indexer_file_indexing.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "sync"
  6. "time"
  7. "github.com/uozi-tech/cosy/logger"
  8. )
  9. // IndexLogFileWithMode indexes a log file with specified mode (full or incremental)
  10. func (li *LogIndexer) IndexLogFileWithMode(filePath string, fullReindex bool) error {
  11. if fullReindex {
  12. return li.IndexLogFileFull(filePath)
  13. }
  14. return li.IndexLogFileIncremental(filePath)
  15. }
  16. // IndexLogFile indexes a specific log file (backward compatibility)
  17. func (li *LogIndexer) IndexLogFile(filePath string) error {
  18. return li.IndexLogFileWithMode(filePath, false)
  19. }
  20. // IndexLogFileIncremental performs incremental indexing of a log file
  21. func (li *LogIndexer) IndexLogFileIncremental(filePath string) error {
  22. logger.Debugf("Starting incremental index of log file: %s", filePath)
  23. // Note: Global indexing status is managed at the rebuild level
  24. // Individual file notifications are not needed as they cause excessive status changes
  25. defer SetIndexingStatus(filePath, false) // Clear individual file status when done
  26. // Get log index record
  27. logIndex, err := li.persistence.GetLogIndex(filePath)
  28. if err != nil {
  29. return fmt.Errorf("failed to get log index record: %w", err)
  30. }
  31. // Get current file info using safe method
  32. currentInfo, err := li.safeGetFileInfo(filePath)
  33. if err != nil {
  34. return fmt.Errorf("failed to safely stat file %s: %w", filePath, err)
  35. }
  36. // Calculate total index size of related log files for comparison
  37. totalSize := li.calculateRelatedLogFilesSize(filePath)
  38. // Check if file needs indexing
  39. if !logIndex.NeedsIndexing(currentInfo.ModTime(), totalSize) {
  40. logger.Debugf("Skipping %s - file group hasn't changed since last index", filePath)
  41. return nil
  42. }
  43. // Check if we need full reindex instead
  44. if logIndex.ShouldFullReindex(currentInfo.ModTime(), totalSize) {
  45. logger.Debugf("File %s needs full reindex instead of incremental", filePath)
  46. return li.IndexLogFileFull(filePath)
  47. }
  48. logger.Debugf("Incremental indexing log file: %s from position %d", filePath, logIndex.LastPosition)
  49. // Index from last position
  50. return li.indexFileFromPosition(filePath, logIndex.LastPosition, logIndex)
  51. }
  52. // ForceReindexFileGroup cleans, discovers, and queues all files for a log group.
  53. func (li *LogIndexer) ForceReindexFileGroup(mainLogPath string, wg *sync.WaitGroup) error {
  54. logger.Infof("Force reindexing log group: %s", mainLogPath)
  55. // 1. Delete all existing index data for this entire log group
  56. if err := li.DeleteLogGroupFromIndex(mainLogPath); err != nil {
  57. return fmt.Errorf("failed to delete log group %s before reindexing: %w", mainLogPath, err)
  58. }
  59. // 2. Discover all files belonging to this log group
  60. logDir := filepath.Dir(mainLogPath)
  61. baseLogName := filepath.Base(mainLogPath)
  62. relatedFiles, err := li.findRelatedLogFiles(logDir, baseLogName)
  63. if err != nil {
  64. return fmt.Errorf("failed to find related files for log group %s: %w", mainLogPath, err)
  65. }
  66. logger.Infof("Found %d files to reindex for log group %s", len(relatedFiles), mainLogPath)
  67. // 3. Clear completion flag to allow new notifications
  68. li.clearLogGroupCompletionFlag(mainLogPath)
  69. // 4. Record log group start time (use the main log path to store group-level timing)
  70. groupStartTime := time.Now()
  71. if mainLogIndex, err := li.persistence.GetLogIndex(mainLogPath); err == nil {
  72. mainLogIndex.SetIndexStartTime(groupStartTime)
  73. if err := li.persistence.SaveLogIndex(mainLogIndex); err != nil {
  74. logger.Warnf("Failed to save group start time for %s: %v", mainLogPath, err)
  75. }
  76. }
  77. // 5. Reset persistence for all files and queue a single task for the entire log group
  78. for _, file := range relatedFiles {
  79. if logIndex, err := li.persistence.GetLogIndex(file); err == nil {
  80. logIndex.Reset()
  81. if err := li.persistence.SaveLogIndex(logIndex); err != nil {
  82. logger.Warnf("Failed to reset persistence for %s: %v", file, err)
  83. }
  84. }
  85. SetIndexingStatus(file, true)
  86. }
  87. // Queue a single task for the entire log group (not per file)
  88. wg.Add(1)
  89. li.queueIndexTask(&IndexTask{
  90. FilePath: mainLogPath, // Use main log path to represent the entire group
  91. Priority: 10,
  92. FullReindex: true,
  93. Wg: wg,
  94. }, wg)
  95. statusManager := GetIndexingStatusManager()
  96. statusManager.UpdateIndexingStatus()
  97. return nil
  98. }