indexer_file_utils.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "regexp"
  7. "sort"
  8. "strings"
  9. "time"
  10. "github.com/blevesearch/bleve/v2"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // findRelatedLogFiles finds all log files related to a base log name in a directory
  14. func (li *LogIndexer) findRelatedLogFiles(logDir string, baseLogName string) ([]string, error) {
  15. entries, err := li.safeReadDir(logDir)
  16. if err != nil {
  17. return nil, fmt.Errorf("failed to read log directory %s: %w", logDir, err)
  18. }
  19. var logFiles []string
  20. for _, entry := range entries {
  21. if entry.IsDir() {
  22. continue
  23. }
  24. name := entry.Name()
  25. if isLogrotateFile(name, baseLogName) {
  26. fullPath := filepath.Join(logDir, name)
  27. logFiles = append(logFiles, fullPath)
  28. }
  29. }
  30. return logFiles, nil
  31. }
  32. // Note: isLogrotateFile is now defined in date_patterns.go as a common utility
  33. // RepairFileMetadata repairs file metadata by scanning existing index data
  34. func (li *LogIndexer) RepairFileMetadata() error {
  35. logger.Infof("Starting file metadata repair...")
  36. li.mu.Lock()
  37. defer li.mu.Unlock()
  38. for filePath, fileInfo := range li.logPaths {
  39. logger.Infof("Repairing metadata for: %s", filePath)
  40. // Check if file exists and get current info
  41. currentInfo, err := os.Stat(filePath)
  42. if err != nil {
  43. logger.Warnf("Failed to stat file %s: %v", filePath, err)
  44. continue
  45. }
  46. // Query index for entries from this file to determine time range
  47. query := bleve.NewTermQuery(filePath)
  48. query.SetField("file_path")
  49. searchReq := bleve.NewSearchRequest(query)
  50. searchReq.Size = 1000 // Get a sample to determine time range
  51. searchReq.Fields = []string{"timestamp"}
  52. searchReq.SortBy([]string{"timestamp"}) // Sort by timestamp
  53. searchResult, err := li.index.Search(searchReq)
  54. if err != nil {
  55. logger.Warnf("Failed to search index for file %s: %v", filePath, err)
  56. continue
  57. }
  58. if searchResult.Total == 0 {
  59. logger.Warnf("No indexed entries found for file %s", filePath)
  60. continue
  61. }
  62. // Get time range from search results
  63. var timeRange *TimeRange
  64. for _, hit := range searchResult.Hits {
  65. if timestampField, ok := hit.Fields["timestamp"]; ok {
  66. if timestampStr, ok := timestampField.(string); ok {
  67. timestamp, err := time.Parse(time.RFC3339, timestampStr)
  68. if err != nil {
  69. continue
  70. }
  71. if timeRange == nil {
  72. timeRange = &TimeRange{Start: timestamp, End: timestamp}
  73. } else {
  74. if timestamp.Before(timeRange.Start) {
  75. timeRange.Start = timestamp
  76. }
  77. if timestamp.After(timeRange.End) {
  78. timeRange.End = timestamp
  79. }
  80. }
  81. }
  82. }
  83. }
  84. // Update file info
  85. fileInfo.LastModified = currentInfo.ModTime()
  86. fileInfo.LastSize = currentInfo.Size()
  87. fileInfo.LastIndexed = time.Now()
  88. fileInfo.TimeRange = timeRange
  89. if timeRange != nil {
  90. logger.Infof("Repaired metadata for %s: TimeRange %v to %v, Total entries: %d",
  91. filePath, timeRange.Start, timeRange.End, searchResult.Total)
  92. } else {
  93. logger.Warnf("Could not determine time range for %s", filePath)
  94. }
  95. }
  96. logger.Infof("File metadata repair completed")
  97. return nil
  98. }
  99. // DiscoverLogFiles discovers log files in a directory, including compressed ones
  100. // This function now primarily adds paths to the indexer for tracking.
  101. // The actual indexing is queued by AddLogPath.
  102. func (li *LogIndexer) DiscoverLogFiles(logDir string, baseLogName string) error {
  103. logger.Infof("Auto-discovering log files in %s with base name %s", logDir, baseLogName)
  104. logFiles, err := li.findRelatedLogFiles(logDir, baseLogName)
  105. if err != nil {
  106. return err
  107. }
  108. if len(logFiles) == 0 {
  109. logger.Warnf("No log files found matching pattern %s in directory %s", baseLogName, logDir)
  110. return fmt.Errorf("no log files found matching pattern %s", baseLogName)
  111. }
  112. // Sort files to process them in order (newest first for current log)
  113. sort.Slice(logFiles, func(i, j int) bool {
  114. // Current log file should be processed first
  115. if !strings.Contains(logFiles[i], ".") && strings.Contains(logFiles[j], ".") {
  116. return true
  117. }
  118. return logFiles[i] < logFiles[j]
  119. })
  120. logger.Infof("Found %d log files to process: %v", len(logFiles), logFiles)
  121. // Add all discovered log files for tracking
  122. var addedCount int
  123. for _, logFile := range logFiles {
  124. if err := li.AddLogPath(logFile); err != nil {
  125. logger.Warnf("Failed to add log path %s: %v", logFile, err)
  126. continue
  127. }
  128. addedCount++
  129. }
  130. logger.Infof("Discovered and added %d log files in %s for tracking", addedCount, logDir)
  131. return nil
  132. }
  133. // calculateRelatedLogFilesSize calculates the total processing units for all related log files
  134. // For uncompressed files, returns bytes; for compressed files, estimates equivalent processing units
  135. func (li *LogIndexer) calculateRelatedLogFilesSize(filePath string) int64 {
  136. // Get the main log path for this file to find all related files in the group
  137. mainLogPath := li.getMainLogPath(filePath)
  138. logDir := filepath.Dir(mainLogPath)
  139. baseLogName := filepath.Base(mainLogPath)
  140. entries, err := li.safeReadDir(logDir)
  141. if err != nil {
  142. logger.Warnf("Failed to read log directory %s: %v", logDir, err)
  143. return 0
  144. }
  145. var totalSize int64
  146. var foundFiles []string
  147. for _, entry := range entries {
  148. if entry.IsDir() {
  149. continue
  150. }
  151. name := entry.Name()
  152. if isLogrotateFile(name, baseLogName) {
  153. fullPath := filepath.Join(logDir, name)
  154. foundFiles = append(foundFiles, name)
  155. // Use safe method to get file info for related log files
  156. if info, err := li.safeGetFileInfo(fullPath); err == nil {
  157. fileSize := info.Size()
  158. // For compressed files, use estimated processing units based on compression ratio
  159. if strings.HasSuffix(fullPath, ".gz") || strings.HasSuffix(fullPath, ".bz2") {
  160. // Estimate uncompressed size using 3:1 compression ratio for progress calculation
  161. // This provides a more consistent progress measurement across file types
  162. estimatedUncompressedSize := fileSize * 3
  163. totalSize += estimatedUncompressedSize
  164. } else {
  165. // For uncompressed files, use actual size
  166. totalSize += fileSize
  167. }
  168. }
  169. }
  170. }
  171. return totalSize
  172. }
  173. // getMainLogPath extracts the main log path from a file (including rotated files)
  174. func (li *LogIndexer) getMainLogPath(filePath string) string {
  175. dir := filepath.Dir(filePath)
  176. filename := filepath.Base(filePath)
  177. // Remove .gz compression suffix if present
  178. filename = strings.TrimSuffix(filename, ".gz")
  179. // Handle numbered rotation (access.log.1, access.log.2, etc.)
  180. // Use a more specific pattern to avoid matching date patterns like "20231201"
  181. if match := regexp.MustCompile(`^(.+)\.(\d{1,3})$`).FindStringSubmatch(filename); len(match) > 1 {
  182. // Only match if the number is reasonable for rotation (1-999)
  183. baseFilename := match[1]
  184. return filepath.Join(dir, baseFilename)
  185. }
  186. // Handle date-based rotation (access.20231201, access.2023-12-01, etc.)
  187. datePatterns := []string{
  188. `^\d{8}$`, // YYYYMMDD
  189. `^\d{4}-\d{2}-\d{2}$`, // YYYY-MM-DD
  190. `^\d{4}\.\d{2}\.\d{2}$`, // YYYY.MM.DD
  191. `^\d{4}_\d{2}_\d{2}$`, // YYYY_MM_DD
  192. }
  193. // Check if filename itself contains date patterns that we should strip
  194. // Example: access.2023-12-01 -> access.log, access.20231201 -> access.log
  195. parts := strings.Split(filename, ".")
  196. if len(parts) >= 2 {
  197. lastPart := parts[len(parts)-1]
  198. for _, pattern := range datePatterns {
  199. if matched, _ := regexp.MatchString(pattern, lastPart); matched {
  200. baseFilename := strings.Join(parts[:len(parts)-1], ".")
  201. // If the base doesn't end with .log, add it
  202. if !strings.HasSuffix(baseFilename, ".log") {
  203. baseFilename += ".log"
  204. }
  205. return filepath.Join(dir, baseFilename)
  206. }
  207. }
  208. }
  209. // No rotation pattern found, return as-is
  210. return filePath
  211. }
// clearLogGroupCompletionFlag removes the entry stored under logGroupPath from
// li.logGroupCompletionSent (used during reindex).
// NOTE(review): presumably this lets a later completion notification for the
// group be sent again — confirm against the code that sets the flag.
func (li *LogIndexer) clearLogGroupCompletionFlag(logGroupPath string) {
	li.logGroupCompletionSent.Delete(logGroupPath)
}
  215. }