indexer_file_utils.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "regexp"
  7. "sort"
  8. "strings"
  9. "time"
  10. "github.com/blevesearch/bleve/v2"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // findRelatedLogFiles finds all log files related to a base log name in a directory
  14. func (li *LogIndexer) findRelatedLogFiles(logDir string, baseLogName string) ([]string, error) {
  15. entries, err := li.safeReadDir(logDir)
  16. if err != nil {
  17. return nil, fmt.Errorf("failed to read log directory %s: %w", logDir, err)
  18. }
  19. var logFiles []string
  20. for _, entry := range entries {
  21. if entry.IsDir() {
  22. continue
  23. }
  24. name := entry.Name()
  25. if isLogrotateFile(name, baseLogName) {
  26. fullPath := filepath.Join(logDir, name)
  27. logFiles = append(logFiles, fullPath)
  28. }
  29. }
  30. return logFiles, nil
  31. }
  32. // Note: isLogrotateFile is now defined in date_patterns.go as a common utility
  33. // RepairFileMetadata repairs file metadata by scanning existing index data
  34. func (li *LogIndexer) RepairFileMetadata() error {
  35. logger.Infof("Starting file metadata repair...")
  36. li.mu.Lock()
  37. defer li.mu.Unlock()
  38. for filePath, fileInfo := range li.logPaths {
  39. logger.Infof("Repairing metadata for: %s", filePath)
  40. // Check if file exists and get current info
  41. currentInfo, err := os.Stat(filePath)
  42. if err != nil {
  43. logger.Warnf("Failed to stat file %s: %v", filePath, err)
  44. continue
  45. }
  46. // Query index for entries from this file to determine time range
  47. query := bleve.NewTermQuery(filePath)
  48. query.SetField("file_path")
  49. searchReq := bleve.NewSearchRequest(query)
  50. searchReq.Size = 1000 // Get a sample to determine time range
  51. searchReq.Fields = []string{"timestamp"}
  52. searchReq.SortBy([]string{"timestamp"}) // Sort by timestamp
  53. searchResult, err := li.index.Search(searchReq)
  54. if err != nil {
  55. logger.Warnf("Failed to search index for file %s: %v", filePath, err)
  56. continue
  57. }
  58. if searchResult.Total == 0 {
  59. logger.Warnf("No indexed entries found for file %s", filePath)
  60. continue
  61. }
  62. // Get time range from search results
  63. var timeRange *TimeRange
  64. for _, hit := range searchResult.Hits {
  65. if timestampField, ok := hit.Fields["timestamp"]; ok {
  66. var timestamp int64
  67. switch v := timestampField.(type) {
  68. case float64:
  69. timestamp = int64(v)
  70. case int64:
  71. timestamp = v
  72. default:
  73. continue
  74. }
  75. if timeRange == nil {
  76. timeRange = &TimeRange{Start: timestamp, End: timestamp}
  77. } else {
  78. if timestamp < timeRange.Start {
  79. timeRange.Start = timestamp
  80. }
  81. if timestamp > timeRange.End {
  82. timeRange.End = timestamp
  83. }
  84. }
  85. }
  86. }
  87. // Update file info
  88. fileInfo.LastModified = currentInfo.ModTime().Unix()
  89. fileInfo.LastSize = currentInfo.Size()
  90. fileInfo.LastIndexed = time.Now().Unix()
  91. fileInfo.TimeRange = timeRange
  92. if timeRange != nil {
  93. logger.Infof("Repaired metadata for %s: TimeRange %v to %v, Total entries: %d",
  94. filePath, timeRange.Start, timeRange.End, searchResult.Total)
  95. } else {
  96. logger.Warnf("Could not determine time range for %s", filePath)
  97. }
  98. }
  99. logger.Infof("File metadata repair completed")
  100. return nil
  101. }
  102. // DiscoverLogFiles discovers log files in a directory, including compressed ones
  103. // This function now primarily adds paths to the indexer for tracking.
  104. // The actual indexing is queued by AddLogPath.
  105. func (li *LogIndexer) DiscoverLogFiles(logDir string, baseLogName string) error {
  106. logger.Infof("Auto-discovering log files in %s with base name %s", logDir, baseLogName)
  107. logFiles, err := li.findRelatedLogFiles(logDir, baseLogName)
  108. if err != nil {
  109. return err
  110. }
  111. if len(logFiles) == 0 {
  112. logger.Warnf("No log files found matching pattern %s in directory %s", baseLogName, logDir)
  113. return fmt.Errorf("no log files found matching pattern %s", baseLogName)
  114. }
  115. // Sort files to process them in order (newest first for current log)
  116. sort.Slice(logFiles, func(i, j int) bool {
  117. // Current log file should be processed first
  118. if !strings.Contains(logFiles[i], ".") && strings.Contains(logFiles[j], ".") {
  119. return true
  120. }
  121. return logFiles[i] < logFiles[j]
  122. })
  123. logger.Infof("Found %d log files to process: %v", len(logFiles), logFiles)
  124. // Add all discovered log files for tracking
  125. var addedCount int
  126. for _, logFile := range logFiles {
  127. if err := li.AddLogPath(logFile); err != nil {
  128. logger.Warnf("Failed to add log path %s: %v", logFile, err)
  129. continue
  130. }
  131. addedCount++
  132. }
  133. logger.Infof("Discovered and added %d log files in %s for tracking", addedCount, logDir)
  134. return nil
  135. }
  136. // calculateRelatedLogFilesSize calculates the total processing units for all related log files
  137. // For uncompressed files, returns bytes; for compressed files, estimates equivalent processing units
  138. func (li *LogIndexer) calculateRelatedLogFilesSize(filePath string) int64 {
  139. // Get the main log path for this file to find all related files in the group
  140. mainLogPath := li.getMainLogPath(filePath)
  141. logDir := filepath.Dir(mainLogPath)
  142. baseLogName := filepath.Base(mainLogPath)
  143. entries, err := li.safeReadDir(logDir)
  144. if err != nil {
  145. logger.Warnf("Failed to read log directory %s: %v", logDir, err)
  146. return 0
  147. }
  148. var totalSize int64
  149. var foundFiles []string
  150. for _, entry := range entries {
  151. if entry.IsDir() {
  152. continue
  153. }
  154. name := entry.Name()
  155. if isLogrotateFile(name, baseLogName) {
  156. fullPath := filepath.Join(logDir, name)
  157. foundFiles = append(foundFiles, name)
  158. // Use safe method to get file info for related log files
  159. if info, err := li.safeGetFileInfo(fullPath); err == nil {
  160. fileSize := info.Size()
  161. // For compressed files, use estimated processing units based on compression ratio
  162. if strings.HasSuffix(fullPath, ".gz") || strings.HasSuffix(fullPath, ".bz2") {
  163. // Estimate uncompressed size using 3:1 compression ratio for progress calculation
  164. // This provides a more consistent progress measurement across file types
  165. estimatedUncompressedSize := fileSize * 3
  166. totalSize += estimatedUncompressedSize
  167. } else {
  168. // For uncompressed files, use actual size
  169. totalSize += fileSize
  170. }
  171. }
  172. }
  173. }
  174. return totalSize
  175. }
  176. // getMainLogPath extracts the main log path from a file (including rotated files)
  177. func (li *LogIndexer) getMainLogPath(filePath string) string {
  178. dir := filepath.Dir(filePath)
  179. filename := filepath.Base(filePath)
  180. // Remove .gz compression suffix if present
  181. filename = strings.TrimSuffix(filename, ".gz")
  182. // Handle numbered rotation (access.log.1, access.log.2, etc.)
  183. // Use a more specific pattern to avoid matching date patterns like "20231201"
  184. if match := regexp.MustCompile(`^(.+)\.(\d{1,3})$`).FindStringSubmatch(filename); len(match) > 1 {
  185. // Only match if the number is reasonable for rotation (1-999)
  186. baseFilename := match[1]
  187. return filepath.Join(dir, baseFilename)
  188. }
  189. // Handle date-based rotation (access.20231201, access.2023-12-01, etc.)
  190. datePatterns := []string{
  191. `^\d{8}$`, // YYYYMMDD
  192. `^\d{4}-\d{2}-\d{2}$`, // YYYY-MM-DD
  193. `^\d{4}\.\d{2}\.\d{2}$`, // YYYY.MM.DD
  194. `^\d{4}_\d{2}_\d{2}$`, // YYYY_MM_DD
  195. }
  196. // Check if filename itself contains date patterns that we should strip
  197. // Example: access.2023-12-01 -> access.log, access.20231201 -> access.log
  198. parts := strings.Split(filename, ".")
  199. if len(parts) >= 2 {
  200. lastPart := parts[len(parts)-1]
  201. for _, pattern := range datePatterns {
  202. if matched, _ := regexp.MatchString(pattern, lastPart); matched {
  203. baseFilename := strings.Join(parts[:len(parts)-1], ".")
  204. // If the base doesn't end with .log, add it
  205. if !strings.HasSuffix(baseFilename, ".log") {
  206. baseFilename += ".log"
  207. }
  208. return filepath.Join(dir, baseFilename)
  209. }
  210. }
  211. }
  212. // No rotation pattern found, return as-is
  213. return filePath
  214. }
// clearLogGroupCompletionFlag clears the completion flag for a log group (used during reindex).
// It does so by deleting the entry keyed by logGroupPath from
// li.logGroupCompletionSent.
func (li *LogIndexer) clearLogGroupCompletionFlag(logGroupPath string) {
	li.logGroupCompletionSent.Delete(logGroupPath)
}