indexer_file_management.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/fsnotify/fsnotify"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
  11. // AddLogPath adds a log path to be indexed and monitored
  12. func (li *LogIndexer) AddLogPath(logPath string) error {
  13. li.mu.Lock()
  14. defer li.mu.Unlock()
  15. // Check if file exists using safe method
  16. info, err := li.safeGetFileInfo(logPath)
  17. if err != nil {
  18. return fmt.Errorf("failed to safely stat log file %s: %w", logPath, err)
  19. }
  20. // Determine if file is compressed
  21. isCompressed := strings.HasSuffix(logPath, ".gz") || strings.HasSuffix(logPath, ".bz2")
  22. // Check if this path is already being tracked
  23. if existingInfo, exists := li.logPaths[logPath]; exists {
  24. // Update compressed status but keep existing tracking info
  25. existingInfo.IsCompressed = isCompressed
  26. logger.Debugf("Log path %s already being tracked, updated compressed status to %v", logPath, isCompressed)
  27. } else {
  28. // Add new log path with zero time to trigger initial indexing check
  29. li.logPaths[logPath] = &LogFileInfo{
  30. Path: logPath,
  31. LastModified: time.Time{}, // Will trigger indexing check on first scan
  32. LastSize: 0, // Will trigger indexing check on first scan
  33. IsCompressed: isCompressed,
  34. }
  35. logger.Infof("Added new log path %s (compressed=%v)", logPath, isCompressed)
  36. }
  37. // Add to file watcher if not compressed and watcher is available
  38. if li.watcher != nil && !isCompressed {
  39. if err := li.watcher.Add(logPath); err != nil {
  40. logger.Warnf("Failed to add file watcher for %s: %v", logPath, err)
  41. }
  42. }
  43. // Also watch the directory for compressed files if watcher is available
  44. if li.watcher != nil {
  45. dir := filepath.Dir(logPath)
  46. if err := li.watcher.Add(dir); err != nil {
  47. logger.Warnf("Failed to add directory watcher for %s: %v", dir, err)
  48. }
  49. }
  50. // Check if file needs incremental or full indexing
  51. logIndex, err := li.persistence.GetLogIndex(logPath)
  52. if err != nil {
  53. logger.Warnf("Failed to get log index record for %s: %v", logPath, err)
  54. }
  55. // Calculate total index size of related log files for comparison
  56. totalSize := li.calculateRelatedLogFilesSize(logPath)
  57. needsFullReindex := logIndex == nil || logIndex.ShouldFullReindex(info.ModTime(), totalSize)
  58. // Queue for background indexing
  59. li.queueIndexTask(&IndexTask{
  60. FilePath: logPath,
  61. Priority: 1, // Normal priority
  62. FullReindex: needsFullReindex,
  63. }, nil) // No waitgroup for regular add
  64. return nil
  65. }
  66. // processIndexQueue processes indexing tasks in the background
  67. func (li *LogIndexer) processIndexQueue() {
  68. for {
  69. select {
  70. case <-li.ctx.Done():
  71. logger.Info("Log indexer background processor stopping")
  72. return
  73. case task := <-li.indexQueue:
  74. li.processIndexTask(task)
  75. }
  76. }
  77. }
  78. // queueIndexTask adds a task to the indexing queue with debouncing
  79. func (li *LogIndexer) queueIndexTask(task *IndexTask, wg *sync.WaitGroup) {
  80. task.Wg = wg // Assign WaitGroup to task
  81. // Apply debouncing for file updates (not for manual rebuilds)
  82. if task.Priority < 10 { // Priority 10 is for manual rebuilds, should not be debounced
  83. li.debounceIndexTask(task)
  84. } else {
  85. // Manual rebuilds bypass debouncing
  86. li.executeIndexTask(task)
  87. }
  88. }
  89. // handleCompressedLogFile handles the creation of new compressed log files
  90. func (li *LogIndexer) handleCompressedLogFile(fullPath string) {
  91. li.mu.RLock()
  92. defer li.mu.RUnlock()
  93. fileName := filepath.Base(fullPath)
  94. for logPath := range li.logPaths {
  95. baseLogName := filepath.Base(logPath)
  96. if isLogrotateFile(fileName, baseLogName) {
  97. go func(path string) {
  98. if err := li.AddLogPath(path); err != nil {
  99. logger.Errorf("Failed to add new compressed log file %s: %v", path, err)
  100. return
  101. }
  102. // Queue for full indexing (compressed files need full reindex)
  103. li.queueIndexTask(&IndexTask{
  104. FilePath: path,
  105. Priority: 1, // Normal priority for compressed files
  106. FullReindex: true, // Compressed files need full indexing
  107. }, nil) // No waitgroup for autodetected compressed file
  108. }(fullPath)
  109. return // Found matching log path, no need to continue
  110. }
  111. }
  112. }
  113. // watchFiles watches for file system events
  114. func (li *LogIndexer) watchFiles() {
  115. for {
  116. select {
  117. case <-li.ctx.Done():
  118. logger.Info("Log indexer file watcher stopping")
  119. return
  120. case event, ok := <-li.watcher.Events:
  121. if !ok {
  122. return
  123. }
  124. // Handle file modifications
  125. if event.Op&fsnotify.Write == fsnotify.Write {
  126. li.mu.RLock()
  127. _, exists := li.logPaths[event.Name]
  128. li.mu.RUnlock()
  129. if exists {
  130. // Queue for incremental indexing (debouncing handled by queueIndexTask)
  131. li.queueIndexTask(&IndexTask{
  132. FilePath: event.Name,
  133. Priority: 2, // Higher priority for file updates
  134. FullReindex: false, // Use incremental indexing for file updates
  135. }, nil) // No waitgroup for file updates
  136. }
  137. }
  138. // Handle new compressed files
  139. if event.Op&fsnotify.Create == fsnotify.Create {
  140. if strings.HasSuffix(event.Name, ".gz") {
  141. // Check if this is a rotated log file we should index
  142. li.handleCompressedLogFile(event.Name)
  143. }
  144. }
  145. case err, ok := <-li.watcher.Errors:
  146. if !ok {
  147. return
  148. }
  149. logger.Errorf("File watcher error: %v", err)
  150. }
  151. }
  152. }