// log_indexer_rebuild.go

package nginx_log

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx"
	"github.com/blevesearch/bleve/v2"
	"github.com/uozi-tech/cosy/logger"
)

// RebuildIndex forces a complete rebuild of the index
func (li *LogIndexer) RebuildIndex() error {
	logger.Infof("Starting index rebuild...")

	// Get all files that need to be marked as indexing
	var allLogPaths []string
	if li.persistence != nil {
		indexes, err := li.persistence.GetAllLogIndexes()
		if err != nil {
			logger.Warnf("Failed to get log indexes: %v", err)
		} else {
			for _, logIndex := range indexes {
				allLogPaths = append(allLogPaths, logIndex.Path)
			}
		}
	}

	// Mark all files as being indexed
	for _, path := range allLogPaths {
		SetIndexingStatus(path, true)
	}

	// Update global indexing status
	statusManager := GetIndexingStatusManager()
	statusManager.UpdateIndexingStatus()

	// Close current index
	if err := li.index.Close(); err != nil {
		logger.Warnf("Failed to close index: %v", err)
	}

	// Remove index directory
	if err := os.RemoveAll(li.indexPath); err != nil {
		// Clear indexing status on error
		for _, path := range allLogPaths {
			SetIndexingStatus(path, false)
		}
		statusManager.UpdateIndexingStatus()
		return fmt.Errorf("failed to remove index directory: %w", err)
	}

	// Create new index
	mapping := createIndexMapping()
	index, err := bleve.New(li.indexPath, mapping)
	if err != nil {
		// Clear indexing status on error
		for _, path := range allLogPaths {
			SetIndexingStatus(path, false)
		}
		statusManager.UpdateIndexingStatus()
		return fmt.Errorf("failed to create new index: %w", err)
	}
	li.index = index

	// Reset file tracking
	li.mu.Lock()
	for path := range li.logPaths {
		li.logPaths[path].LastModified = 0
		li.logPaths[path].LastSize = 0
		li.logPaths[path].LastIndexed = 0
		li.logPaths[path].TimeRange = nil // Clear in-memory time range
	}
	li.mu.Unlock()

	// Reset persistence data: clear all index position records
	if li.persistence != nil {
		indexes, err := li.persistence.GetAllLogIndexes()
		if err != nil {
			logger.Warnf("Failed to get log indexes for reset: %v", err)
		} else {
			// Reset each index record
			for _, logIndex := range indexes {
				logIndex.Reset() // Clear position data
				if err := li.persistence.SaveLogIndex(logIndex); err != nil {
					logger.Warnf("Failed to reset log index for %s: %v", logIndex.Path, err)
				}
			}
			logger.Infof("Reset %d persistence records", len(indexes))
		}
	}

	// Clear caches since all data will be reindexed
	li.cache.Clear()
	li.statsCache.Clear()

	// Clear all log group completion flags to allow new notifications
	li.logGroupCompletionSent.Range(func(key, value interface{}) bool {
		li.logGroupCompletionSent.Delete(key)
		return true
	})

	// --- Start of Synchronous Re-indexing Logic ---
	logger.Infof("Starting synchronous re-indexing of all discovered files")

	// 1. Discover all potential log files from the main access log path
	var discoveredFiles []string
	accessLogPath := nginx.GetAccessLogPath()
	if accessLogPath != "" && IsLogPathUnderWhiteList(accessLogPath) {
		logDir := filepath.Dir(accessLogPath)
		baseLogName := filepath.Base(accessLogPath)
		files, err := li.findRelatedLogFiles(logDir, baseLogName)
		if err != nil {
			logger.Errorf("Failed to discover log files, proceeding with tracked files only: %v", err)
		} else {
			discoveredFiles = files
			logger.Infof("Discovered %d log files to consider for re-indexing", len(discoveredFiles))
		}
	}

	// 2. Get all currently tracked files and resolve their log groups
	li.mu.RLock()
	allLogGroups := make(map[string]struct{})
	for path := range li.logPaths {
		mainLogPath := li.getMainLogPath(path)
		allLogGroups[mainLogPath] = struct{}{}
	}
	li.mu.RUnlock()

	// Also add discovered files to the set of log groups
	for _, path := range discoveredFiles {
		mainLogPath := li.getMainLogPath(path)
		allLogGroups[mainLogPath] = struct{}{}
	}

	// 3. Create a WaitGroup and dispatch tasks for each unique log group
	var wg sync.WaitGroup
	logger.Infof("Queueing re-index tasks for %d unique log groups", len(allLogGroups))
	for mainLogPath := range allLogGroups {
		// ForceReindexFileGroup discovers files, adds to the WaitGroup, and queues tasks internally
		if err := li.ForceReindexFileGroup(mainLogPath, &wg); err != nil {
			logger.Warnf("Failed to queue force reindex for log group %s: %v", mainLogPath, err)
		}
	}

	// 4. Wait for all indexing tasks to complete
	logger.Infof("Waiting for all re-indexing tasks for log groups to complete...")
	wg.Wait()

	// 5. Completion notifications are handled automatically by ProgressTracker
	// when each log group finishes indexing; no manual notification is needed
	logger.Infof("All log groups have completed indexing - notifications sent automatically")

	// 6. Finalize status
	statusManager.UpdateIndexingStatus()
	logger.Infof("Synchronous re-indexing completed for all queued files")

	logger.Infof("Index rebuild completed successfully")
	return nil
}

// DiscoverAndIndexFile discovers and indexes a single log file.
// The lastModified and lastSize parameters are not used by this implementation;
// indexing is driven by rediscovering the file's log group.
func (li *LogIndexer) DiscoverAndIndexFile(filePath string, lastModified time.Time, lastSize int64) error {
	logDir := filepath.Dir(filePath)
	baseLogName := filepath.Base(filePath)
	return li.DiscoverLogFiles(logDir, baseLogName)
}

// DeleteFileIndex removes all index entries for a specific file
func (li *LogIndexer) DeleteFileIndex(filePath string) error {
	logger.Infof("Deleting index entries for file: %s", filePath)

	// Create query to find all entries for this file
	query := bleve.NewTermQuery(filePath)
	query.SetField("file_path")

	searchReq := bleve.NewSearchRequest(query)
	searchReq.Size = 10000 // Process in batches

	totalDeleted := 0
	for {
		searchResult, err := li.index.Search(searchReq)
		if err != nil {
			return fmt.Errorf("failed to search for entries to delete: %w", err)
		}
		if len(searchResult.Hits) == 0 {
			break
		}

		// Delete this batch of hits; deleted documents drop out of the next search,
		// so repeating the same query eventually returns no hits and the loop ends
		batch := li.index.NewBatch()
		for _, hit := range searchResult.Hits {
			batch.Delete(hit.ID)
		}
		if err := li.index.Batch(batch); err != nil {
			return fmt.Errorf("failed to delete entries: %w", err)
		}

		totalDeleted += len(searchResult.Hits)
		logger.Infof("Deleted %d entries from %s (total: %d)", len(searchResult.Hits), filePath, totalDeleted)
	}

	// Remove from tracking
	li.mu.Lock()
	delete(li.logPaths, filePath)
	li.mu.Unlock()

	// Remove from persistence
	if li.persistence != nil {
		if err := li.persistence.DeleteLogIndex(filePath); err != nil {
			logger.Warnf("Failed to delete persistence record for %s: %v", filePath, err)
		}
	}

	// Clear related caches
	li.invalidateStatsCache()

	logger.Infof("Successfully deleted %d index entries for %s", totalDeleted, filePath)
	return nil
}

// DeleteAllIndexes removes all index entries
func (li *LogIndexer) DeleteAllIndexes() error {
	logger.Info("Deleting all index entries...")

	// Close current index
	if err := li.index.Close(); err != nil {
		logger.Warnf("Failed to close index during deletion: %v", err)
	}

	// Remove index directory
	if err := os.RemoveAll(li.indexPath); err != nil {
		return fmt.Errorf("failed to remove index directory: %w", err)
	}

	// Create new empty index
	mapping := createIndexMapping()
	index, err := bleve.New(li.indexPath, mapping)
	if err != nil {
		return fmt.Errorf("failed to create new empty index: %w", err)
	}
	li.index = index

	// Clear all tracking
	li.mu.Lock()
	li.logPaths = make(map[string]*LogFileInfo)
	li.mu.Unlock()

	// Clear persistence
	if li.persistence != nil {
		if err := li.persistence.DeleteAllLogIndexes(); err != nil {
			logger.Warnf("Failed to clear persistence: %v", err)
		}
	}

	// Clear caches
	li.cache.Clear()
	li.statsCache.Clear()

	logger.Info("Successfully deleted all index entries")
	return nil
}

// CleanupOrphanedIndexes removes index entries for files that no longer exist
func (li *LogIndexer) CleanupOrphanedIndexes() error {
	logger.Info("Cleaning up orphaned index entries...")

	li.mu.RLock()
	paths := make([]string, 0, len(li.logPaths))
	for path := range li.logPaths {
		paths = append(paths, path)
	}
	li.mu.RUnlock()

	orphanedPaths := make([]string, 0)
	for _, path := range paths {
		if _, err := os.Stat(path); os.IsNotExist(err) {
			orphanedPaths = append(orphanedPaths, path)
		}
	}

	if len(orphanedPaths) == 0 {
		logger.Info("No orphaned index entries found")
		return nil
	}

	logger.Infof("Found %d orphaned files to clean up: %v", len(orphanedPaths), orphanedPaths)
	for _, path := range orphanedPaths {
		if err := li.DeleteFileIndex(path); err != nil {
			logger.Errorf("Failed to delete orphaned index for %s: %v", path, err)
		} else {
			logger.Infof("Cleaned up orphaned index for %s", path)
		}
	}

	logger.Infof("Cleanup completed for %d orphaned files", len(orphanedPaths))
	return nil
}
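
// The sketch below is illustrative only and not part of the original file: it
// shows how the maintenance entry points above might be combined into a single
// pass. It assumes an already constructed *LogIndexer (its constructor lives
// elsewhere in this package); runIndexMaintenance is a hypothetical helper name.
func runIndexMaintenance(li *LogIndexer, fullRebuild bool) error {
	// Drop index entries whose source files no longer exist on disk.
	if err := li.CleanupOrphanedIndexes(); err != nil {
		return fmt.Errorf("orphan cleanup failed: %w", err)
	}
	// Optionally wipe and rebuild the whole index; RebuildIndex blocks until
	// all log groups have been re-indexed.
	if fullRebuild {
		return li.RebuildIndex()
	}
	return nil
}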