index_management.go

package nginx_log

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/event"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/gin-gonic/gin"
	"github.com/uozi-tech/cosy"
	"github.com/uozi-tech/cosy/logger"
)
// rebuildLocks tracks ongoing rebuild operations for specific log groups
var (
	rebuildLocks     = make(map[string]*sync.Mutex)
	rebuildLocksLock sync.RWMutex
)

// acquireRebuildLock gets or creates a mutex for a specific log group
func acquireRebuildLock(logGroupPath string) *sync.Mutex {
	rebuildLocksLock.Lock()
	defer rebuildLocksLock.Unlock()
	if lock, exists := rebuildLocks[logGroupPath]; exists {
		return lock
	}
	lock := &sync.Mutex{}
	rebuildLocks[logGroupPath] = lock
	return lock
}

// releaseRebuildLock removes the mutex for a specific log group after completion
func releaseRebuildLock(logGroupPath string) {
	rebuildLocksLock.Lock()
	defer rebuildLocksLock.Unlock()
	delete(rebuildLocks, logGroupPath)
}
// isRebuildInProgress checks if a rebuild is currently running for a specific log group
func isRebuildInProgress(logGroupPath string) bool {
	rebuildLocksLock.RLock()
	defer rebuildLocksLock.RUnlock()
	if lock, exists := rebuildLocks[logGroupPath]; exists {
		// Probe the lock without blocking: if TryLock fails,
		// a rebuild currently holds it.
		if lock.TryLock() {
			lock.Unlock()
			return false
		}
		return true
	}
	return false
}
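
// A minimal usage sketch (hypothetical, not part of the original file):
// the fallback lock lifecycle as it would be exercised when no task
// scheduler is available. exampleGuardedRebuild and doRebuild are
// illustrative names only.
func exampleGuardedRebuild(path string, doRebuild func()) {
	// Fast-path check; note this probe and the Lock below are not atomic,
	// which is the same trade-off the handler makes.
	if isRebuildInProgress(path) {
		logger.Infof("Rebuild already in progress for %s, skipping", path)
		return
	}
	lock := acquireRebuildLock(path)
	lock.Lock()
	defer func() {
		lock.Unlock()
		releaseRebuildLock(path)
	}()
	doRebuild()
}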
// RebuildIndex rebuilds the log index asynchronously (all files or a specific file).
// The API call returns immediately and the rebuild happens in the background.
func RebuildIndex(c *gin.Context) {
	var request controlStruct
	if err := c.ShouldBindJSON(&request); err != nil {
		// No JSON body means rebuild all indexes
		request.Path = ""
	}

	// Get the modern indexer
	modernIndexer := nginx_log.GetIndexer()
	if modernIndexer == nil {
		cosy.ErrHandler(c, nginx_log.ErrModernIndexerNotAvailable)
		return
	}

	// Check that the modern indexer is healthy
	if !modernIndexer.IsHealthy() {
		cosy.ErrHandler(c, fmt.Errorf("modern indexer is not healthy"))
		return
	}

	// Check if indexing is already in progress
	processingManager := event.GetProcessingStatusManager()
	currentStatus := processingManager.GetCurrentStatus()
	if currentStatus.NginxLogIndexing {
		cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildIndex)
		return
	}

	// Check if a rebuild for this specific log group is already in progress via the task scheduler
	scheduler := nginx_log.GetTaskScheduler()
	if request.Path != "" {
		if scheduler != nil && scheduler.IsTaskInProgress(request.Path) {
			cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildFileIndex)
			return
		}
		// Fall back to the local lock check if the scheduler is not available
		if scheduler == nil && isRebuildInProgress(request.Path) {
			cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildFileIndex)
			return
		}
	}

	// Return an immediate response to the client
	c.JSON(http.StatusOK, IndexRebuildResponse{
		Message: "Index rebuild started in background",
		Status:  "started",
	})

	// Start the async rebuild in a goroutine
	go func() {
		performAsyncRebuild(modernIndexer, request.Path)
	}()
}
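
// Example (hypothetical, not part of the original file): how a client might
// trigger a full rebuild over HTTP. The route path below is an assumption;
// check the router registration for the real endpoint.
func exampleTriggerRebuild(baseURL string) error {
	// A nil body sends no JSON payload, which RebuildIndex treats as
	// "rebuild all indexes".
	resp, err := http.Post(baseURL+"/api/nginx_logs/index/rebuild", "application/json", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	// On success the body carries IndexRebuildResponse with Status "started";
	// the rebuild itself continues in the background.
	return nil
}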
// performAsyncRebuild performs the actual rebuild logic asynchronously.
// For incremental indexing of a specific log group, it preserves existing metadata.
// For full rebuilds (path == ""), it clears all metadata first.
func performAsyncRebuild(modernIndexer interface{}, path string) {
	processingManager := event.GetProcessingStatusManager()

	// Notify that indexing has started
	processingManager.UpdateNginxLogIndexing(true)

	// Create a cancellable context for this rebuild operation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Ensure we always reset the status when done
	defer func() {
		processingManager.UpdateNginxLogIndexing(false)
		if r := recover(); r != nil {
			logger.Errorf("Panic during async rebuild: %v", r)
		}
	}()

	logFileManager := nginx_log.GetLogFileManager()

	// Handle index cleanup based on rebuild scope
	if path != "" {
		// For a single-file rebuild, only delete indexes for that log group
		if err := modernIndexer.(*indexer.ParallelIndexer).DeleteIndexByLogGroup(path, logFileManager); err != nil {
			logger.Errorf("Failed to delete existing indexes for log group %s: %v", path, err)
			return
		}
		logger.Infof("Deleted existing indexes for log group: %s", path)
	} else {
		// For a full rebuild, destroy all existing indexes with the context
		if err := nginx_log.DestroyAllIndexes(ctx); err != nil {
			logger.Errorf("Failed to destroy existing indexes before rebuild: %v", err)
			return
		}
		// Re-initialize the indexer to create new, empty shards
		if err := modernIndexer.(indexer.RestartableIndexer).Start(ctx); err != nil {
			logger.Errorf("Failed to re-initialize indexer after destruction: %v", err)
			return
		}
		logger.Info("Re-initialized indexer after destruction")
	}
	// Create progress tracking callbacks
	progressConfig := &indexer.ProgressConfig{
		NotifyInterval: 1 * time.Second,
		OnProgress: func(progress indexer.ProgressNotification) {
			// Send a progress event to the frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexProgress,
				Data: event.NginxLogIndexProgressData{
					LogPath:         progress.LogGroupPath,
					Progress:        progress.Percentage,
					Stage:           "indexing",
					Status:          "running",
					ElapsedTime:     progress.ElapsedTime.Milliseconds(),
					EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
				},
			})
			logger.Infof("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
				progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
				progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
		},
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Send a completion event to the frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexComplete,
				Data: event.NginxLogIndexCompleteData{
					LogPath:     completion.LogGroupPath,
					Success:     completion.Success,
					Duration:    completion.Duration.Milliseconds(),
					TotalLines:  completion.TotalLines,
					IndexedSize: completion.IndexedSize,
					Error:       completion.Error,
				},
			})
			logger.Infof("Index completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
				completion.LogGroupPath, completion.Success, completion.Duration,
				completion.TotalLines, completion.IndexedSize)
		},
	}
	// Capture the actual time range produced by the rebuild so the ready event can report it
	var globalMinTime, globalMaxTime *time.Time

	// Create a wrapper progress config that captures timing information
	wrapperProgressConfig := &indexer.ProgressConfig{
		NotifyInterval: progressConfig.NotifyInterval,
		OnProgress:     progressConfig.OnProgress,
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Call the original completion callback first
			if progressConfig.OnCompletion != nil {
				progressConfig.OnCompletion(completion)
			}
			// If indexing succeeded, send an index-ready event with the actual time range
			if completion.Success {
				var startTimeUnix, endTimeUnix int64
				// Use the global timing if available, otherwise fall back to the current time
				if globalMinTime != nil {
					startTimeUnix = globalMinTime.Unix()
				} else {
					startTimeUnix = time.Now().Unix()
				}
				if globalMaxTime != nil {
					endTimeUnix = globalMaxTime.Unix()
				} else {
					endTimeUnix = time.Now().Unix()
				}
				event.Publish(event.Event{
					Type: event.TypeNginxLogIndexReady,
					Data: event.NginxLogIndexReadyData{
						LogPath:     completion.LogGroupPath,
						StartTime:   startTimeUnix,
						EndTime:     endTimeUnix,
						Available:   true,
						IndexStatus: "ready",
					},
				})
			}
		},
	}

	if path != "" {
		// Rebuild a specific file
		minTime, maxTime := rebuildSingleFile(modernIndexer, path, logFileManager, wrapperProgressConfig)
		globalMinTime, globalMaxTime = minTime, maxTime
	} else {
		// Rebuild all indexes
		minTime, maxTime := rebuildAllFiles(modernIndexer, logFileManager, wrapperProgressConfig)
		globalMinTime, globalMaxTime = minTime, maxTime
	}
}
// rebuildSingleFile rebuilds the index for a single file (log group)
func rebuildSingleFile(modernIndexer interface{}, path string, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
	// Use the task scheduler lock if available, for unified locking across recovery and manual rebuilds
	scheduler := nginx_log.GetTaskScheduler()
	var unlock func()
	if scheduler != nil {
		// Use the scheduler's unified lock
		_, unlock = scheduler.AcquireTaskLock(path)
		defer unlock()
	} else {
		// Fallback: acquire the local lock for this specific log group
		lock := acquireRebuildLock(path)
		lock.Lock()
		defer func() {
			lock.Unlock()
			releaseRebuildLock(path)
		}()
	}

	// For a single file, we need to check its type first
	allLogsForTypeCheck := nginx_log.GetAllLogsWithIndexGrouped()
	var targetLog *nginx_log.NginxLogWithIndex
	for _, log := range allLogsForTypeCheck {
		if log.Path == path {
			targetLog = log
			break
		}
	}

	var minTime, maxTime *time.Time
	if targetLog != nil && targetLog.Type == "error" {
		logger.Infof("Skipping index rebuild for error log as requested: %s", path)
		if logFileManager != nil {
			if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, 0, time.Now(), 0, nil, nil); err != nil {
				logger.Warnf("Could not reset metadata for skipped error log %s: %v", path, err)
			}
		}
	} else {
		logger.Infof("Starting modern index rebuild for file: %s", path)
		// NOTE: We intentionally do NOT delete existing index metadata here.
		// This allows incremental indexing to work properly with rotated logs;
		// IndexLogGroupWithProgress handles updating existing records.
		startTime := time.Now()
		docsCountMap, docMinTime, docMaxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(path, progressConfig)
		if err != nil {
			logger.Errorf("Failed to rebuild modern index for file group %s: %v", path, err)
			return nil, nil
		}
		minTime, maxTime = docMinTime, docMaxTime
		duration := time.Since(startTime)

		var totalDocsIndexed uint64
		if logFileManager != nil {
			// Calculate the total document count
			for _, docCount := range docsCountMap {
				totalDocsIndexed += docCount
			}
			// Save metadata for the base log path with the total count
			if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
				logger.Errorf("Failed to save index metadata for %s: %v", path, err)
			}
			// Also save individual file metadata if needed
			for filePath, docCount := range docsCountMap {
				if filePath != path { // Don't duplicate the base path
					if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(filePath, docCount, startTime, duration, minTime, maxTime); err != nil {
						logger.Errorf("Failed to save index metadata for %s: %v", filePath, err)
					}
				}
			}
		}
		logger.Infof("Successfully completed modern rebuild for file group: %s, Documents: %d", path, totalDocsIndexed)
	}

	if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
		logger.Errorf("Failed to flush all indexer data for single file: %v", err)
	}
	nginx_log.UpdateSearcherShards()
	return minTime, maxTime
}
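
// For context, a "log group" here means a base path plus its rotated
// siblings, e.g. (hypothetical paths):
//
//	/var/log/nginx/access.log
//	/var/log/nginx/access.log.1
//	/var/log/nginx/access.log.2.gz
//
// IndexLogGroupWithProgress walks the whole group, which is why metadata is
// saved both under the base path (with the group total) and under each
// member file (with its own document count).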
// rebuildAllFiles rebuilds indexes for all files with proper queue management
func rebuildAllFiles(modernIndexer interface{}, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
	// For a full rebuild, we use a special global lock key
	globalLockKey := "__GLOBAL_REBUILD__"

	// Use the task scheduler lock if available for unified locking
	scheduler := nginx_log.GetTaskScheduler()
	var unlock func()
	if scheduler != nil {
		// Use the scheduler's unified lock
		_, unlock = scheduler.AcquireTaskLock(globalLockKey)
		defer unlock()
	} else {
		// Fallback: acquire the local lock
		lock := acquireRebuildLock(globalLockKey)
		lock.Lock()
		defer func() {
			lock.Unlock()
			releaseRebuildLock(globalLockKey)
		}()
	}

	// For a full rebuild, we clear ALL existing metadata to start fresh.
	// This is different from a single file/group rebuild, which preserves metadata for incremental indexing.
	if logFileManager != nil {
		if err := logFileManager.(indexer.MetadataManager).DeleteAllIndexMetadata(); err != nil {
			logger.Errorf("Could not clean up all old DB records before full rebuild: %v", err)
		}
	}

	logger.Info("Starting full modern index rebuild with queue management")
	allLogs := nginx_log.GetAllLogsWithIndexGrouped()

	// Get the persistence manager for queue management
	var persistence *indexer.PersistenceManager
	if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
		persistence = lfm.GetPersistence()
	}

	// First pass: set all access logs to queued status
	queuePosition := 1
	accessLogs := make([]*nginx_log.NginxLogWithIndex, 0)
	for _, log := range allLogs {
		if log.Type == "error" {
			logger.Infof("Skipping indexing for error log: %s", log.Path)
			if logFileManager != nil {
				if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(log.Path, 0, time.Now(), 0, nil, nil); err != nil {
					logger.Warnf("Could not reset metadata for skipped error log %s: %v", log.Path, err)
				}
			}
			continue
		}
		// Set to queued status with a position
		if persistence != nil {
			if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusQueued), queuePosition, ""); err != nil {
				logger.Errorf("Failed to set queued status for %s: %v", log.Path, err)
			}
		}
		accessLogs = append(accessLogs, log)
		queuePosition++
	}

	// Give the frontend a moment to refresh and show the queued status
	time.Sleep(2 * time.Second)

	startTime := time.Now()
	var overallMinTime, overallMaxTime *time.Time
	var timeMu sync.Mutex

	// Second pass: process queued logs in parallel with controlled concurrency
	var wg sync.WaitGroup
	// Get concurrency from the indexer config (FileGroupConcurrency controls both file- and group-level parallelism)
	maxConcurrency := 4 // Default fallback
	if pi, ok := modernIndexer.(*indexer.ParallelIndexer); ok {
		config := pi.GetConfig()
		if config.FileGroupConcurrency > 0 {
			maxConcurrency = config.FileGroupConcurrency
		}
	}
	semaphore := make(chan struct{}, maxConcurrency)
	logger.Infof("Processing %d log groups in parallel with concurrency=%d", len(accessLogs), maxConcurrency)

	for _, log := range accessLogs {
		wg.Add(1)
		go func(logItem *nginx_log.NginxLogWithIndex) {
			defer wg.Done()
			// Acquire the semaphore for controlled concurrency
			semaphore <- struct{}{}
			defer func() { <-semaphore }()

			// Set to indexing status
			if persistence != nil {
				if err := persistence.SetIndexStatus(logItem.Path, string(indexer.IndexStatusIndexing), 0, ""); err != nil {
					logger.Errorf("Failed to set indexing status for %s: %v", logItem.Path, err)
				}
			}

			loopStartTime := time.Now()
			docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(logItem.Path, progressConfig)
			if err != nil {
				logger.Warnf("Failed to index file group: %s, error: %v", logItem.Path, err)
				// Set error status
				if persistence != nil {
					if err := persistence.SetIndexStatus(logItem.Path, string(indexer.IndexStatusError), 0, err.Error()); err != nil {
						logger.Errorf("Failed to set error status for %s: %v", logItem.Path, err)
					}
				}
			} else {
				// Track the overall time range across all log files (thread-safe)
				timeMu.Lock()
				if minTime != nil {
					if overallMinTime == nil || minTime.Before(*overallMinTime) {
						overallMinTime = minTime
					}
				}
				if maxTime != nil {
					if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
						overallMaxTime = maxTime
					}
				}
				timeMu.Unlock()

				if logFileManager != nil {
					duration := time.Since(loopStartTime)
					// Calculate the total document count for the log group
					var totalDocCount uint64
					for _, docCount := range docsCountMap {
						totalDocCount += docCount
					}
					// Save metadata for the base log path with the total count
					if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(logItem.Path, totalDocCount, loopStartTime, duration, minTime, maxTime); err != nil {
						logger.Errorf("Failed to save index metadata for %s: %v", logItem.Path, err)
					}
					// Also save individual file metadata if needed
					for path, docCount := range docsCountMap {
						if path != logItem.Path { // Don't duplicate the base path
							if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, docCount, loopStartTime, duration, minTime, maxTime); err != nil {
								logger.Errorf("Failed to save index metadata for %s: %v", path, err)
							}
						}
					}
				}

				// Set to indexed status
				if persistence != nil {
					if err := persistence.SetIndexStatus(logItem.Path, string(indexer.IndexStatusIndexed), 0, ""); err != nil {
						logger.Errorf("Failed to set indexed status for %s: %v", logItem.Path, err)
					}
				}
			}
		}(log)
	}

	// Wait for all log groups to complete
	wg.Wait()

	totalDuration := time.Since(startTime)
	logger.Infof("Successfully completed full modern index rebuild in %s", totalDuration)

	if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
		logger.Errorf("Failed to flush all indexer data: %v", err)
	}
	nginx_log.UpdateSearcherShards()
	return overallMinTime, overallMaxTime
}
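
// Illustrative sketch (not part of the original file): the bounded-concurrency
// pattern rebuildAllFiles uses, shown in isolation. A buffered channel acts as
// a counting semaphore so at most maxConcurrency goroutines run work at once.
func exampleBoundedParallel(items []string, maxConcurrency int, work func(string)) {
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxConcurrency)
	for _, item := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot
			defer func() { <-semaphore }() // release the slot
			work(it)
		}(item)
	}
	wg.Wait()
}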