indexer_file_management.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "strings"
  6. "sync"
  7. "github.com/fsnotify/fsnotify"
  8. "github.com/uozi-tech/cosy/logger"
  9. )
  10. // AddLogPath adds a log path to be indexed and monitored
  11. func (li *LogIndexer) AddLogPath(logPath string) error {
  12. li.mu.Lock()
  13. defer li.mu.Unlock()
  14. // Check if file exists using safe method
  15. info, err := li.safeGetFileInfo(logPath)
  16. if err != nil {
  17. return fmt.Errorf("failed to safely stat log file %s: %w", logPath, err)
  18. }
  19. // Determine if file is compressed
  20. isCompressed := strings.HasSuffix(logPath, ".gz") || strings.HasSuffix(logPath, ".bz2")
  21. // Check if this path is already being tracked
  22. if existingInfo, exists := li.logPaths[logPath]; exists {
  23. // Update compressed status but keep existing tracking info
  24. existingInfo.IsCompressed = isCompressed
  25. logger.Debugf("Log path %s already being tracked, updated compressed status to %v", logPath, isCompressed)
  26. } else {
  27. // Add new log path with zero time to trigger initial indexing check
  28. li.logPaths[logPath] = &LogFileInfo{
  29. Path: logPath,
  30. LastModified: 0, // Will trigger indexing check on first scan
  31. LastSize: 0, // Will trigger indexing check on first scan
  32. IsCompressed: isCompressed,
  33. }
  34. logger.Infof("Added new log path %s (compressed=%v)", logPath, isCompressed)
  35. }
  36. // Add to file watcher if not compressed and watcher is available
  37. if li.watcher != nil && !isCompressed {
  38. if err := li.watcher.Add(logPath); err != nil {
  39. logger.Warnf("Failed to add file watcher for %s: %v", logPath, err)
  40. }
  41. }
  42. // Also watch the directory for compressed files if watcher is available
  43. if li.watcher != nil {
  44. dir := filepath.Dir(logPath)
  45. if err := li.watcher.Add(dir); err != nil {
  46. logger.Warnf("Failed to add directory watcher for %s: %v", dir, err)
  47. }
  48. }
  49. // Check if file needs incremental or full indexing
  50. logIndex, err := li.persistence.GetLogIndex(logPath)
  51. if err != nil {
  52. logger.Warnf("Failed to get log index record for %s: %v", logPath, err)
  53. }
  54. // Calculate total index size of related log files for comparison
  55. totalSize := li.calculateRelatedLogFilesSize(logPath)
  56. needsFullReindex := logIndex == nil || logIndex.ShouldFullReindex(info.ModTime(), totalSize)
  57. // Queue for background indexing
  58. li.queueIndexTask(&IndexTask{
  59. FilePath: logPath,
  60. Priority: 1, // Normal priority
  61. FullReindex: needsFullReindex,
  62. }, nil) // No waitgroup for regular add
  63. return nil
  64. }
  65. // processIndexQueue processes indexing tasks in the background
  66. func (li *LogIndexer) processIndexQueue() {
  67. for {
  68. select {
  69. case <-li.ctx.Done():
  70. logger.Info("Log indexer background processor stopping")
  71. return
  72. case task := <-li.indexQueue:
  73. li.processIndexTask(task)
  74. }
  75. }
  76. }
  77. // queueIndexTask adds a task to the indexing queue with debouncing
  78. func (li *LogIndexer) queueIndexTask(task *IndexTask, wg *sync.WaitGroup) {
  79. task.Wg = wg // Assign WaitGroup to task
  80. // Apply debouncing for file updates (not for manual rebuilds)
  81. if task.Priority < 10 { // Priority 10 is for manual rebuilds, should not be debounced
  82. li.debounceIndexTask(task)
  83. } else {
  84. // Manual rebuilds bypass debouncing
  85. li.executeIndexTask(task)
  86. }
  87. }
  88. // handleCompressedLogFile handles the creation of new compressed log files
  89. func (li *LogIndexer) handleCompressedLogFile(fullPath string) {
  90. li.mu.RLock()
  91. defer li.mu.RUnlock()
  92. fileName := filepath.Base(fullPath)
  93. for logPath := range li.logPaths {
  94. baseLogName := filepath.Base(logPath)
  95. if isLogrotateFile(fileName, baseLogName) {
  96. go func(path string) {
  97. if err := li.AddLogPath(path); err != nil {
  98. logger.Errorf("Failed to add new compressed log file %s: %v", path, err)
  99. return
  100. }
  101. // Queue for full indexing (compressed files need full reindex)
  102. li.queueIndexTask(&IndexTask{
  103. FilePath: path,
  104. Priority: 1, // Normal priority for compressed files
  105. FullReindex: true, // Compressed files need full indexing
  106. }, nil) // No waitgroup for autodetected compressed file
  107. }(fullPath)
  108. return // Found matching log path, no need to continue
  109. }
  110. }
  111. }
  112. // watchFiles watches for file system events
  113. func (li *LogIndexer) watchFiles() {
  114. for {
  115. select {
  116. case <-li.ctx.Done():
  117. logger.Info("Log indexer file watcher stopping")
  118. return
  119. case event, ok := <-li.watcher.Events:
  120. if !ok {
  121. return
  122. }
  123. // Handle file modifications
  124. if event.Op&fsnotify.Write == fsnotify.Write {
  125. li.mu.RLock()
  126. _, exists := li.logPaths[event.Name]
  127. li.mu.RUnlock()
  128. if exists {
  129. // Queue for incremental indexing (debouncing handled by queueIndexTask)
  130. li.queueIndexTask(&IndexTask{
  131. FilePath: event.Name,
  132. Priority: 2, // Higher priority for file updates
  133. FullReindex: false, // Use incremental indexing for file updates
  134. }, nil) // No waitgroup for file updates
  135. }
  136. }
  137. // Handle new compressed files
  138. if event.Op&fsnotify.Create == fsnotify.Create {
  139. if strings.HasSuffix(event.Name, ".gz") {
  140. // Check if this is a rotated log file we should index
  141. li.handleCompressedLogFile(event.Name)
  142. }
  143. }
  144. case err, ok := <-li.watcher.Errors:
  145. if !ok {
  146. return
  147. }
  148. logger.Errorf("File watcher error: %v", err)
  149. }
  150. }
  151. }