1
0

indexer_file_full.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package nginx_log
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "strings"
  6. "github.com/blevesearch/bleve/v2"
  7. "github.com/uozi-tech/cosy/logger"
  8. )
  9. // IndexLogFileFull performs full reindexing of a log file and its related log group
  10. func (li *LogIndexer) IndexLogFileFull(filePath string) error {
  11. logger.Infof("Starting full reindex of log file and related group: %s", filePath)
  12. // Determine main log path for group operations
  13. mainLogPath := li.getMainLogPath(filePath)
  14. logDir := filepath.Dir(mainLogPath)
  15. baseLogName := filepath.Base(mainLogPath)
  16. // Get or create progress tracker for this log group
  17. progressTracker := GetProgressTracker(mainLogPath)
  18. // Find all related log files in the group
  19. relatedFiles, err := li.findRelatedLogFiles(logDir, baseLogName)
  20. if err != nil {
  21. // Fallback to single file if related file discovery fails
  22. logger.Warnf("Failed to find related files for %s, processing single file: %v", filePath, err)
  23. relatedFiles = []string{filePath}
  24. }
  25. logger.Infof("Full reindexing log group %s with %d files: %v", mainLogPath, len(relatedFiles), relatedFiles)
  26. // Initialize progress tracker with all files
  27. for _, file := range relatedFiles {
  28. info, err := li.safeGetFileInfo(file)
  29. if err != nil {
  30. logger.Warnf("Failed to get file info for %s: %v", file, err)
  31. continue
  32. }
  33. isCompressed := strings.HasSuffix(file, ".gz") || strings.HasSuffix(file, ".bz2")
  34. progressTracker.AddFile(file, isCompressed)
  35. // Estimate lines in this file
  36. estimatedLines := EstimateFileLines(file, info.Size(), isCompressed)
  37. progressTracker.SetFileEstimate(file, estimatedLines)
  38. }
  39. // Delete existing index data for the entire log group
  40. if err := li.DeleteLogGroupFromIndex(mainLogPath); err != nil {
  41. logger.Warnf("Failed to delete existing index data for log group %s: %v", mainLogPath, err)
  42. }
  43. // Index all files in the group
  44. for _, file := range relatedFiles {
  45. if err := li.indexSingleFileForGroup(file, mainLogPath, progressTracker); err != nil {
  46. logger.Errorf("Failed to index file %s in group %s: %v", file, mainLogPath, err)
  47. // Continue with other files rather than failing completely
  48. } else {
  49. logger.Infof("Successfully indexed file %s in group %s", file, mainLogPath)
  50. }
  51. }
  52. // Clean up progress tracker
  53. RemoveProgressTracker(mainLogPath)
  54. // Clear indexing status for all files in the group
  55. for _, file := range relatedFiles {
  56. SetIndexingStatus(file, false)
  57. }
  58. logger.Infof("Completed full reindex of log group %s with %d files", mainLogPath, len(relatedFiles))
  59. return nil
  60. }
  61. // indexSingleFileForGroup indexes a single file as part of a log group
  62. func (li *LogIndexer) indexSingleFileForGroup(filePath, mainLogPath string, progressTracker *ProgressTracker) error {
  63. // Get or create log index record
  64. logIndex, err := li.persistence.GetLogIndex(filePath)
  65. if err != nil {
  66. return fmt.Errorf("failed to get log index record: %w", err)
  67. }
  68. // Get current file info using safe method
  69. currentInfo, err := li.safeGetFileInfo(filePath)
  70. if err != nil {
  71. return fmt.Errorf("failed to safely stat file %s: %w", filePath, err)
  72. }
  73. logger.Infof("Indexing file in group: %s -> %s (size: %d, mod: %v)", filePath, mainLogPath, currentInfo.Size(), currentInfo.ModTime())
  74. // Start file processing in progress tracker
  75. progressTracker.StartFile(filePath)
  76. // Reset log index position for full reindex
  77. logIndex.Reset()
  78. // Index the entire file with specified mainLogPath for grouping
  79. return li.indexFileFromPositionWithMainLogPath(filePath, mainLogPath, 0, logIndex, progressTracker)
  80. }
  81. // DeleteLogGroupFromIndex removes all index entries for a given log group
  82. func (li *LogIndexer) DeleteLogGroupFromIndex(mainLogPath string) error {
  83. logger.Infof("Deleting all index entries for log group: %s", mainLogPath)
  84. query := bleve.NewTermQuery(mainLogPath)
  85. query.SetField("file_path")
  86. searchReq := bleve.NewSearchRequest(query)
  87. searchReq.Size = 10000 // Process in batches
  88. for {
  89. searchResult, err := li.index.Search(searchReq)
  90. if err != nil {
  91. return fmt.Errorf("failed to search existing entries for log group %s: %w", mainLogPath, err)
  92. }
  93. if len(searchResult.Hits) == 0 {
  94. break
  95. }
  96. batch := li.index.NewBatch()
  97. for _, hit := range searchResult.Hits {
  98. batch.Delete(hit.ID)
  99. }
  100. if err := li.index.Batch(batch); err != nil {
  101. return fmt.Errorf("failed to delete entries for log group %s: %w", mainLogPath, err)
  102. }
  103. logger.Infof("Deleted %d entries for log group %s", len(searchResult.Hits), mainLogPath)
  104. }
  105. return nil
  106. }