1
0

index_management.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/event"
  9. "github.com/0xJacky/Nginx-UI/internal/nginx_log"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  11. "github.com/gin-gonic/gin"
  12. "github.com/uozi-tech/cosy"
  13. "github.com/uozi-tech/cosy/logger"
  14. )
  15. // rebuildLocks tracks ongoing rebuild operations for specific log groups
  16. var (
  17. rebuildLocks = make(map[string]*sync.Mutex)
  18. rebuildLocksLock sync.RWMutex
  19. )
  20. // acquireRebuildLock gets or creates a mutex for a specific log group
  21. func acquireRebuildLock(logGroupPath string) *sync.Mutex {
  22. rebuildLocksLock.Lock()
  23. defer rebuildLocksLock.Unlock()
  24. if lock, exists := rebuildLocks[logGroupPath]; exists {
  25. return lock
  26. }
  27. lock := &sync.Mutex{}
  28. rebuildLocks[logGroupPath] = lock
  29. return lock
  30. }
  31. // releaseRebuildLock removes the mutex for a specific log group after completion
  32. func releaseRebuildLock(logGroupPath string) {
  33. rebuildLocksLock.Lock()
  34. defer rebuildLocksLock.Unlock()
  35. delete(rebuildLocks, logGroupPath)
  36. }
  37. // isRebuildInProgress checks if a rebuild is currently running for a specific log group
  38. func isRebuildInProgress(logGroupPath string) bool {
  39. rebuildLocksLock.RLock()
  40. defer rebuildLocksLock.RUnlock()
  41. if lock, exists := rebuildLocks[logGroupPath]; exists {
  42. // Try to acquire the lock with a short timeout
  43. // If we can't acquire it, it means rebuild is in progress
  44. if lock.TryLock() {
  45. lock.Unlock()
  46. return false
  47. }
  48. return true
  49. }
  50. return false
  51. }
  52. // RebuildIndex rebuilds the log index asynchronously (all files or specific file)
  53. // The API call returns immediately and the rebuild happens in background
  54. func RebuildIndex(c *gin.Context) {
  55. var request controlStruct
  56. if err := c.ShouldBindJSON(&request); err != nil {
  57. // No JSON body means rebuild all indexes
  58. request.Path = ""
  59. }
  60. // Get modern indexer
  61. modernIndexer := nginx_log.GetModernIndexer()
  62. if modernIndexer == nil {
  63. cosy.ErrHandler(c, nginx_log.ErrModernIndexerNotAvailable)
  64. return
  65. }
  66. // Check if modern indexer is healthy
  67. if !modernIndexer.IsHealthy() {
  68. cosy.ErrHandler(c, fmt.Errorf("modern indexer is not healthy"))
  69. return
  70. }
  71. // Check if indexing is already in progress
  72. processingManager := event.GetProcessingStatusManager()
  73. currentStatus := processingManager.GetCurrentStatus()
  74. if currentStatus.NginxLogIndexing {
  75. cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildIndex)
  76. return
  77. }
  78. // Check if specific log group rebuild is already in progress
  79. if request.Path != "" && isRebuildInProgress(request.Path) {
  80. cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildFileIndex)
  81. return
  82. }
  83. // Return immediate response to client
  84. c.JSON(http.StatusOK, IndexRebuildResponse{
  85. Message: "Index rebuild started in background",
  86. Status: "started",
  87. })
  88. // Start async rebuild in goroutine
  89. go func() {
  90. performAsyncRebuild(modernIndexer, request.Path)
  91. }()
  92. }
  93. // performAsyncRebuild performs the actual rebuild logic asynchronously
  94. // For incremental indexing of a specific log group, it preserves existing metadata
  95. // For full rebuilds (path == ""), it clears all metadata first
  96. func performAsyncRebuild(modernIndexer interface{}, path string) {
  97. processingManager := event.GetProcessingStatusManager()
  98. // Notify that indexing has started
  99. processingManager.UpdateNginxLogIndexing(true)
  100. // Create a context for this rebuild operation that can be cancelled
  101. ctx, cancel := context.WithCancel(context.Background())
  102. defer cancel()
  103. // Ensure we always reset status when done
  104. defer func() {
  105. processingManager.UpdateNginxLogIndexing(false)
  106. if r := recover(); r != nil {
  107. logger.Errorf("Panic during async rebuild: %v", r)
  108. }
  109. }()
  110. logFileManager := nginx_log.GetLogFileManager()
  111. // Handle index cleanup based on rebuild scope
  112. if path != "" {
  113. // For single file rebuild, only delete indexes for that log group
  114. if err := modernIndexer.(*indexer.ParallelIndexer).DeleteIndexByLogGroup(path, logFileManager); err != nil {
  115. logger.Errorf("Failed to delete existing indexes for log group %s: %v", path, err)
  116. return
  117. }
  118. logger.Infof("Deleted existing indexes for log group: %s", path)
  119. } else {
  120. // For full rebuild, destroy all existing indexes with context
  121. if err := nginx_log.DestroyAllIndexes(ctx); err != nil {
  122. logger.Errorf("Failed to destroy existing indexes before rebuild: %v", err)
  123. return
  124. }
  125. // Re-initialize the indexer to create new, empty shards
  126. if err := modernIndexer.(indexer.RestartableIndexer).Start(ctx); err != nil {
  127. logger.Errorf("Failed to re-initialize indexer after destruction: %v", err)
  128. return
  129. }
  130. logger.Info("Re-initialized indexer after destruction")
  131. }
  132. // Create progress tracking callbacks
  133. progressConfig := &indexer.ProgressConfig{
  134. NotifyInterval: 1 * time.Second,
  135. OnProgress: func(progress indexer.ProgressNotification) {
  136. // Send progress event to frontend
  137. event.Publish(event.Event{
  138. Type: event.TypeNginxLogIndexProgress,
  139. Data: event.NginxLogIndexProgressData{
  140. LogPath: progress.LogGroupPath,
  141. Progress: progress.Percentage,
  142. Stage: "indexing",
  143. Status: "running",
  144. ElapsedTime: progress.ElapsedTime.Milliseconds(),
  145. EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
  146. },
  147. })
  148. logger.Infof("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  149. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  150. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  151. },
  152. OnCompletion: func(completion indexer.CompletionNotification) {
  153. // Send completion event to frontend
  154. event.Publish(event.Event{
  155. Type: event.TypeNginxLogIndexComplete,
  156. Data: event.NginxLogIndexCompleteData{
  157. LogPath: completion.LogGroupPath,
  158. Success: completion.Success,
  159. Duration: int64(completion.Duration.Milliseconds()),
  160. TotalLines: completion.TotalLines,
  161. IndexedSize: completion.IndexedSize,
  162. Error: completion.Error,
  163. },
  164. })
  165. logger.Infof("Index completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
  166. completion.LogGroupPath, completion.Success, completion.Duration,
  167. completion.TotalLines, completion.IndexedSize)
  168. },
  169. }
  170. // Store the progress config to access from rebuild functions
  171. var globalMinTime, globalMaxTime *time.Time
  172. // Create a wrapper progress config that captures timing information
  173. wrapperProgressConfig := &indexer.ProgressConfig{
  174. NotifyInterval: progressConfig.NotifyInterval,
  175. OnProgress: progressConfig.OnProgress,
  176. OnCompletion: func(completion indexer.CompletionNotification) {
  177. // Call the original completion callback first
  178. if progressConfig.OnCompletion != nil {
  179. progressConfig.OnCompletion(completion)
  180. }
  181. // Send index ready event if indexing was successful with actual time range
  182. if completion.Success {
  183. var startTimeUnix, endTimeUnix int64
  184. // Use global timing if available, otherwise use current time
  185. if globalMinTime != nil {
  186. startTimeUnix = globalMinTime.Unix()
  187. } else {
  188. startTimeUnix = time.Now().Unix()
  189. }
  190. if globalMaxTime != nil {
  191. endTimeUnix = globalMaxTime.Unix()
  192. } else {
  193. endTimeUnix = time.Now().Unix()
  194. }
  195. event.Publish(event.Event{
  196. Type: event.TypeNginxLogIndexReady,
  197. Data: event.NginxLogIndexReadyData{
  198. LogPath: completion.LogGroupPath,
  199. StartTime: startTimeUnix,
  200. EndTime: endTimeUnix,
  201. Available: true,
  202. IndexStatus: "ready",
  203. },
  204. })
  205. }
  206. },
  207. }
  208. if path != "" {
  209. // Rebuild specific file
  210. minTime, maxTime := rebuildSingleFile(modernIndexer, path, logFileManager, wrapperProgressConfig)
  211. globalMinTime, globalMaxTime = minTime, maxTime
  212. } else {
  213. // Rebuild all indexes
  214. minTime, maxTime := rebuildAllFiles(modernIndexer, logFileManager, wrapperProgressConfig)
  215. globalMinTime, globalMaxTime = minTime, maxTime
  216. }
  217. }
  218. // rebuildSingleFile rebuilds index for a single file
  219. func rebuildSingleFile(modernIndexer interface{}, path string, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  220. // Acquire lock for this specific log group
  221. lock := acquireRebuildLock(path)
  222. lock.Lock()
  223. defer func() {
  224. lock.Unlock()
  225. releaseRebuildLock(path)
  226. }()
  227. // For a single file, we need to check its type first
  228. allLogsForTypeCheck := nginx_log.GetAllLogsWithIndexGrouped()
  229. var targetLog *nginx_log.NginxLogWithIndex
  230. for _, log := range allLogsForTypeCheck {
  231. if log.Path == path {
  232. targetLog = log
  233. break
  234. }
  235. }
  236. var minTime, maxTime *time.Time
  237. if targetLog != nil && targetLog.Type == "error" {
  238. logger.Infof("Skipping index rebuild for error log as requested: %s", path)
  239. if logFileManager != nil {
  240. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, 0, time.Now(), 0, nil, nil); err != nil {
  241. logger.Warnf("Could not reset metadata for skipped error log %s: %v", path, err)
  242. }
  243. }
  244. } else {
  245. logger.Infof("Starting modern index rebuild for file: %s", path)
  246. // NOTE: We intentionally do NOT delete existing index metadata here
  247. // This allows incremental indexing to work properly with rotated logs
  248. // The IndexLogGroupWithProgress method will handle updating existing records
  249. startTime := time.Now()
  250. docsCountMap, docMinTime, docMaxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(path, progressConfig)
  251. if err != nil {
  252. logger.Errorf("Failed to index modern index for file group %s: %v", path, err)
  253. return nil, nil
  254. }
  255. minTime, maxTime = docMinTime, docMaxTime
  256. duration := time.Since(startTime)
  257. var totalDocsIndexed uint64
  258. if logFileManager != nil {
  259. // Calculate total document count
  260. for _, docCount := range docsCountMap {
  261. totalDocsIndexed += docCount
  262. }
  263. // Save metadata for the base log path with total count
  264. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
  265. logger.Errorf("Failed to save index metadata for %s: %v", path, err)
  266. }
  267. // Also save individual file metadata if needed
  268. for filePath, docCount := range docsCountMap {
  269. if filePath != path { // Don't duplicate the base path
  270. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(filePath, docCount, startTime, duration, minTime, maxTime); err != nil {
  271. logger.Errorf("Failed to save index metadata for %s: %v", filePath, err)
  272. }
  273. }
  274. }
  275. }
  276. logger.Infof("Successfully completed modern rebuild for file group: %s, Documents: %d", path, totalDocsIndexed)
  277. }
  278. if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
  279. logger.Errorf("Failed to flush all indexer data for single file: %v", err)
  280. }
  281. nginx_log.UpdateSearcherShards()
  282. return minTime, maxTime
  283. }
  284. // rebuildAllFiles rebuilds indexes for all files with proper queue management
  285. func rebuildAllFiles(modernIndexer interface{}, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  286. // For full rebuild, we use a special global lock key
  287. globalLockKey := "__GLOBAL_REBUILD__"
  288. lock := acquireRebuildLock(globalLockKey)
  289. lock.Lock()
  290. defer func() {
  291. lock.Unlock()
  292. releaseRebuildLock(globalLockKey)
  293. }()
  294. // For full rebuild, we clear ALL existing metadata to start fresh
  295. // This is different from single file/group rebuild which preserves metadata for incremental indexing
  296. if logFileManager != nil {
  297. if err := logFileManager.(indexer.MetadataManager).DeleteAllIndexMetadata(); err != nil {
  298. logger.Errorf("Could not clean up all old DB records before full rebuild: %v", err)
  299. }
  300. }
  301. logger.Info("Starting full modern index rebuild with queue management")
  302. allLogs := nginx_log.GetAllLogsWithIndexGrouped()
  303. // Get persistence manager for queue management
  304. var persistence *indexer.PersistenceManager
  305. if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
  306. persistence = lfm.GetPersistence()
  307. }
  308. // First pass: Set all access logs to queued status
  309. queuePosition := 1
  310. accessLogs := make([]*nginx_log.NginxLogWithIndex, 0)
  311. for _, log := range allLogs {
  312. if log.Type == "error" {
  313. logger.Infof("Skipping indexing for error log: %s", log.Path)
  314. if logFileManager != nil {
  315. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(log.Path, 0, time.Now(), 0, nil, nil); err != nil {
  316. logger.Warnf("Could not reset metadata for skipped error log %s: %v", log.Path, err)
  317. }
  318. }
  319. continue
  320. }
  321. // Set to queued status with position
  322. if persistence != nil {
  323. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusQueued), queuePosition, ""); err != nil {
  324. logger.Errorf("Failed to set queued status for %s: %v", log.Path, err)
  325. }
  326. }
  327. accessLogs = append(accessLogs, log)
  328. queuePosition++
  329. }
  330. // Give the frontend a moment to refresh and show queued status
  331. time.Sleep(2 * time.Second)
  332. startTime := time.Now()
  333. var overallMinTime, overallMaxTime *time.Time
  334. // Second pass: Process each queued log and set to indexing, then indexed
  335. for _, log := range accessLogs {
  336. // Set to indexing status
  337. if persistence != nil {
  338. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusIndexing), 0, ""); err != nil {
  339. logger.Errorf("Failed to set indexing status for %s: %v", log.Path, err)
  340. }
  341. }
  342. loopStartTime := time.Now()
  343. docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(log.Path, progressConfig)
  344. if err != nil {
  345. logger.Warnf("Failed to index file group: %s, error: %v", log.Path, err)
  346. // Set error status
  347. if persistence != nil {
  348. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusError), 0, err.Error()); err != nil {
  349. logger.Errorf("Failed to set error status for %s: %v", log.Path, err)
  350. }
  351. }
  352. } else {
  353. // Track overall time range across all log files
  354. if minTime != nil {
  355. if overallMinTime == nil || minTime.Before(*overallMinTime) {
  356. overallMinTime = minTime
  357. }
  358. }
  359. if maxTime != nil {
  360. if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
  361. overallMaxTime = maxTime
  362. }
  363. }
  364. if logFileManager != nil {
  365. duration := time.Since(loopStartTime)
  366. // Calculate total document count for the log group
  367. var totalDocCount uint64
  368. for _, docCount := range docsCountMap {
  369. totalDocCount += docCount
  370. }
  371. // Save metadata for the base log path with total count
  372. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(log.Path, totalDocCount, loopStartTime, duration, minTime, maxTime); err != nil {
  373. logger.Errorf("Failed to save index metadata for %s: %v", log.Path, err)
  374. }
  375. // Also save individual file metadata if needed
  376. for path, docCount := range docsCountMap {
  377. if path != log.Path { // Don't duplicate the base path
  378. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, docCount, loopStartTime, duration, minTime, maxTime); err != nil {
  379. logger.Errorf("Failed to save index metadata for %s: %v", path, err)
  380. }
  381. }
  382. }
  383. }
  384. // Set to indexed status
  385. if persistence != nil {
  386. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusIndexed), 0, ""); err != nil {
  387. logger.Errorf("Failed to set indexed status for %s: %v", log.Path, err)
  388. }
  389. }
  390. }
  391. }
  392. totalDuration := time.Since(startTime)
  393. logger.Infof("Successfully completed full modern index rebuild in %s", totalDuration)
  394. if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
  395. logger.Errorf("Failed to flush all indexer data: %v", err)
  396. }
  397. nginx_log.UpdateSearcherShards()
  398. return overallMinTime, overallMaxTime
  399. }