index_management.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/event"
  9. "github.com/0xJacky/Nginx-UI/internal/nginx_log"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  11. "github.com/gin-gonic/gin"
  12. "github.com/uozi-tech/cosy"
  13. "github.com/uozi-tech/cosy/logger"
  14. )
  // rebuildLocks maps a rebuild key (a log group path, or a special key for a
// full rebuild) to the mutex that serializes rebuilds for that key.
// rebuildLockRefs counts, per key, how many callers currently hold a
// reference obtained from acquireRebuildLock (whether holding the mutex or
// waiting on it), so an entry is removed only when nobody uses it anymore.
// Both maps are guarded by rebuildLocksLock.
var (
	rebuildLocks     = make(map[string]*sync.Mutex)
	rebuildLockRefs  = make(map[string]int)
	rebuildLocksLock sync.RWMutex
)

// acquireRebuildLock returns the mutex for logGroupPath, creating it on first
// use, and registers the caller as a user of that entry. It does not lock the
// mutex; the caller must do so. Every call must be paired with exactly one
// releaseRebuildLock(logGroupPath), made after the caller has unlocked.
func acquireRebuildLock(logGroupPath string) *sync.Mutex {
	rebuildLocksLock.Lock()
	defer rebuildLocksLock.Unlock()

	lock, exists := rebuildLocks[logGroupPath]
	if !exists {
		lock = &sync.Mutex{}
		rebuildLocks[logGroupPath] = lock
	}
	rebuildLockRefs[logGroupPath]++
	return lock
}

// releaseRebuildLock drops one reference taken by acquireRebuildLock and
// removes the mutex for logGroupPath once the last reference is gone.
//
// Removing the entry while other callers still reference it would let a
// later acquireRebuildLock create a second mutex for the same key, so two
// rebuilds of the same log group could run concurrently.
func releaseRebuildLock(logGroupPath string) {
	rebuildLocksLock.Lock()
	defer rebuildLocksLock.Unlock()

	if rebuildLockRefs[logGroupPath] > 1 {
		rebuildLockRefs[logGroupPath]--
		return
	}
	delete(rebuildLockRefs, logGroupPath)
	delete(rebuildLocks, logGroupPath)
}

// isRebuildInProgress reports whether the rebuild mutex for logGroupPath is
// currently held.
func isRebuildInProgress(logGroupPath string) bool {
	rebuildLocksLock.RLock()
	defer rebuildLocksLock.RUnlock()

	lock, exists := rebuildLocks[logGroupPath]
	if !exists {
		return false
	}
	// TryLock does not block. If it succeeds, nobody holds the mutex, so
	// release it again immediately. If it fails, a rebuild owns it.
	if lock.TryLock() {
		lock.Unlock()
		return false
	}
	return true
}
  52. // RebuildIndex rebuilds the log index asynchronously (all files or specific file)
  53. // The API call returns immediately and the rebuild happens in background
  54. func RebuildIndex(c *gin.Context) {
  55. var request controlStruct
  56. if err := c.ShouldBindJSON(&request); err != nil {
  57. // No JSON body means rebuild all indexes
  58. request.Path = ""
  59. }
  60. // Get modern indexer
  61. modernIndexer := nginx_log.GetModernIndexer()
  62. if modernIndexer == nil {
  63. cosy.ErrHandler(c, nginx_log.ErrModernIndexerNotAvailable)
  64. return
  65. }
  66. // Check if modern indexer is healthy
  67. if !modernIndexer.IsHealthy() {
  68. cosy.ErrHandler(c, fmt.Errorf("modern indexer is not healthy"))
  69. return
  70. }
  71. // Check if indexing is already in progress
  72. processingManager := event.GetProcessingStatusManager()
  73. currentStatus := processingManager.GetCurrentStatus()
  74. if currentStatus.NginxLogIndexing {
  75. cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildIndex)
  76. return
  77. }
  78. // Check if specific log group rebuild is already in progress
  79. if request.Path != "" && isRebuildInProgress(request.Path) {
  80. cosy.ErrHandler(c, nginx_log.ErrFailedToRebuildFileIndex)
  81. return
  82. }
  83. // Return immediate response to client
  84. c.JSON(http.StatusOK, IndexRebuildResponse{
  85. Message: "Index rebuild started in background",
  86. Status: "started",
  87. })
  88. // Start async rebuild in goroutine
  89. go func() {
  90. performAsyncRebuild(modernIndexer, request.Path)
  91. }()
  92. }
  93. // performAsyncRebuild performs the actual rebuild logic asynchronously
  94. func performAsyncRebuild(modernIndexer interface{}, path string) {
  95. processingManager := event.GetProcessingStatusManager()
  96. // Notify that indexing has started
  97. processingManager.UpdateNginxLogIndexing(true)
  98. // Create a context for this rebuild operation that can be cancelled
  99. ctx, cancel := context.WithCancel(context.Background())
  100. defer cancel()
  101. // Ensure we always reset status when done
  102. defer func() {
  103. processingManager.UpdateNginxLogIndexing(false)
  104. if r := recover(); r != nil {
  105. logger.Errorf("Panic during async rebuild: %v", r)
  106. }
  107. }()
  108. logFileManager := nginx_log.GetLogFileManager()
  109. // Handle index cleanup based on rebuild scope
  110. if path != "" {
  111. // For single file rebuild, only delete indexes for that log group
  112. if err := modernIndexer.(*indexer.ParallelIndexer).DeleteIndexByLogGroup(path, logFileManager); err != nil {
  113. logger.Errorf("Failed to delete existing indexes for log group %s: %v", path, err)
  114. return
  115. }
  116. logger.Infof("Deleted existing indexes for log group: %s", path)
  117. } else {
  118. // For full rebuild, destroy all existing indexes with context
  119. if err := nginx_log.DestroyAllIndexes(ctx); err != nil {
  120. logger.Errorf("Failed to destroy existing indexes before rebuild: %v", err)
  121. return
  122. }
  123. // Re-initialize the indexer to create new, empty shards
  124. if err := modernIndexer.(indexer.RestartableIndexer).Start(ctx); err != nil {
  125. logger.Errorf("Failed to re-initialize indexer after destruction: %v", err)
  126. return
  127. }
  128. logger.Info("Re-initialized indexer after destruction")
  129. }
  130. // Create progress tracking callbacks
  131. progressConfig := &indexer.ProgressConfig{
  132. NotifyInterval: 1 * time.Second,
  133. OnProgress: func(progress indexer.ProgressNotification) {
  134. // Send progress event to frontend
  135. event.Publish(event.Event{
  136. Type: event.TypeNginxLogIndexProgress,
  137. Data: event.NginxLogIndexProgressData{
  138. LogPath: progress.LogGroupPath,
  139. Progress: progress.Percentage,
  140. Stage: "indexing",
  141. Status: "running",
  142. ElapsedTime: progress.ElapsedTime.Milliseconds(),
  143. EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
  144. },
  145. })
  146. logger.Infof("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  147. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  148. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  149. },
  150. OnCompletion: func(completion indexer.CompletionNotification) {
  151. // Send completion event to frontend
  152. event.Publish(event.Event{
  153. Type: event.TypeNginxLogIndexComplete,
  154. Data: event.NginxLogIndexCompleteData{
  155. LogPath: completion.LogGroupPath,
  156. Success: completion.Success,
  157. Duration: int64(completion.Duration.Milliseconds()),
  158. TotalLines: completion.TotalLines,
  159. IndexedSize: completion.IndexedSize,
  160. Error: completion.Error,
  161. },
  162. })
  163. logger.Infof("Index completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
  164. completion.LogGroupPath, completion.Success, completion.Duration,
  165. completion.TotalLines, completion.IndexedSize)
  166. },
  167. }
  168. // Store the progress config to access from rebuild functions
  169. var globalMinTime, globalMaxTime *time.Time
  170. // Create a wrapper progress config that captures timing information
  171. wrapperProgressConfig := &indexer.ProgressConfig{
  172. NotifyInterval: progressConfig.NotifyInterval,
  173. OnProgress: progressConfig.OnProgress,
  174. OnCompletion: func(completion indexer.CompletionNotification) {
  175. // Call the original completion callback first
  176. if progressConfig.OnCompletion != nil {
  177. progressConfig.OnCompletion(completion)
  178. }
  179. // Send index ready event if indexing was successful with actual time range
  180. if completion.Success {
  181. var startTimeUnix, endTimeUnix int64
  182. // Use global timing if available, otherwise use current time
  183. if globalMinTime != nil {
  184. startTimeUnix = globalMinTime.Unix()
  185. } else {
  186. startTimeUnix = time.Now().Unix()
  187. }
  188. if globalMaxTime != nil {
  189. endTimeUnix = globalMaxTime.Unix()
  190. } else {
  191. endTimeUnix = time.Now().Unix()
  192. }
  193. event.Publish(event.Event{
  194. Type: event.TypeNginxLogIndexReady,
  195. Data: event.NginxLogIndexReadyData{
  196. LogPath: completion.LogGroupPath,
  197. StartTime: startTimeUnix,
  198. EndTime: endTimeUnix,
  199. Available: true,
  200. IndexStatus: "ready",
  201. },
  202. })
  203. }
  204. },
  205. }
  206. if path != "" {
  207. // Rebuild specific file
  208. minTime, maxTime := rebuildSingleFile(modernIndexer, path, logFileManager, wrapperProgressConfig)
  209. globalMinTime, globalMaxTime = minTime, maxTime
  210. } else {
  211. // Rebuild all indexes
  212. minTime, maxTime := rebuildAllFiles(modernIndexer, logFileManager, wrapperProgressConfig)
  213. globalMinTime, globalMaxTime = minTime, maxTime
  214. }
  215. }
  216. // rebuildSingleFile rebuilds index for a single file
  217. func rebuildSingleFile(modernIndexer interface{}, path string, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  218. // Acquire lock for this specific log group
  219. lock := acquireRebuildLock(path)
  220. lock.Lock()
  221. defer func() {
  222. lock.Unlock()
  223. releaseRebuildLock(path)
  224. }()
  225. // For a single file, we need to check its type first
  226. allLogsForTypeCheck := nginx_log.GetAllLogsWithIndexGrouped()
  227. var targetLog *nginx_log.NginxLogWithIndex
  228. for _, log := range allLogsForTypeCheck {
  229. if log.Path == path {
  230. targetLog = log
  231. break
  232. }
  233. }
  234. var minTime, maxTime *time.Time
  235. if targetLog != nil && targetLog.Type == "error" {
  236. logger.Infof("Skipping index rebuild for error log as requested: %s", path)
  237. if logFileManager != nil {
  238. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, 0, time.Now(), 0, nil, nil); err != nil {
  239. logger.Warnf("Could not reset metadata for skipped error log %s: %v", path, err)
  240. }
  241. }
  242. } else {
  243. logger.Infof("Starting modern index rebuild for file: %s", path)
  244. // Clear existing database records for this log group before rebuilding
  245. if logFileManager != nil {
  246. if err := logFileManager.(indexer.MetadataManager).DeleteIndexMetadataByGroup(path); err != nil {
  247. logger.Warnf("Could not clean up existing DB records for log group %s: %v", path, err)
  248. }
  249. }
  250. startTime := time.Now()
  251. docsCountMap, docMinTime, docMaxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(path, progressConfig)
  252. if err != nil {
  253. logger.Errorf("Failed to index modern index for file group %s: %v", path, err)
  254. return nil, nil
  255. }
  256. minTime, maxTime = docMinTime, docMaxTime
  257. duration := time.Since(startTime)
  258. var totalDocsIndexed uint64
  259. if logFileManager != nil {
  260. // Calculate total document count
  261. for _, docCount := range docsCountMap {
  262. totalDocsIndexed += docCount
  263. }
  264. // Save metadata for the base log path with total count
  265. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
  266. logger.Errorf("Failed to save index metadata for %s: %v", path, err)
  267. }
  268. // Also save individual file metadata if needed
  269. for filePath, docCount := range docsCountMap {
  270. if filePath != path { // Don't duplicate the base path
  271. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(filePath, docCount, startTime, duration, minTime, maxTime); err != nil {
  272. logger.Errorf("Failed to save index metadata for %s: %v", filePath, err)
  273. }
  274. }
  275. }
  276. }
  277. logger.Infof("Successfully completed modern rebuild for file group: %s, Documents: %d", path, totalDocsIndexed)
  278. }
  279. if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
  280. logger.Errorf("Failed to flush all indexer data for single file: %v", err)
  281. }
  282. nginx_log.UpdateSearcherShards()
  283. return minTime, maxTime
  284. }
  285. // rebuildAllFiles rebuilds indexes for all files with proper queue management
  286. func rebuildAllFiles(modernIndexer interface{}, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  287. // For full rebuild, we use a special global lock key
  288. globalLockKey := "__GLOBAL_REBUILD__"
  289. lock := acquireRebuildLock(globalLockKey)
  290. lock.Lock()
  291. defer func() {
  292. lock.Unlock()
  293. releaseRebuildLock(globalLockKey)
  294. }()
  295. if logFileManager != nil {
  296. if err := logFileManager.(indexer.MetadataManager).DeleteAllIndexMetadata(); err != nil {
  297. logger.Errorf("Could not clean up all old DB records before full rebuild: %v", err)
  298. }
  299. }
  300. logger.Info("Starting full modern index rebuild with queue management")
  301. allLogs := nginx_log.GetAllLogsWithIndexGrouped()
  302. // Get persistence manager for queue management
  303. var persistence *indexer.PersistenceManager
  304. if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
  305. persistence = lfm.GetPersistence()
  306. }
  307. // First pass: Set all access logs to queued status
  308. queuePosition := 1
  309. accessLogs := make([]*nginx_log.NginxLogWithIndex, 0)
  310. for _, log := range allLogs {
  311. if log.Type == "error" {
  312. logger.Infof("Skipping indexing for error log: %s", log.Path)
  313. if logFileManager != nil {
  314. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(log.Path, 0, time.Now(), 0, nil, nil); err != nil {
  315. logger.Warnf("Could not reset metadata for skipped error log %s: %v", log.Path, err)
  316. }
  317. }
  318. continue
  319. }
  320. // Set to queued status with position
  321. if persistence != nil {
  322. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusQueued), queuePosition, ""); err != nil {
  323. logger.Errorf("Failed to set queued status for %s: %v", log.Path, err)
  324. }
  325. }
  326. accessLogs = append(accessLogs, log)
  327. queuePosition++
  328. }
  329. // Give the frontend a moment to refresh and show queued status
  330. time.Sleep(2 * time.Second)
  331. startTime := time.Now()
  332. var overallMinTime, overallMaxTime *time.Time
  333. // Second pass: Process each queued log and set to indexing, then indexed
  334. for _, log := range accessLogs {
  335. // Set to indexing status
  336. if persistence != nil {
  337. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusIndexing), 0, ""); err != nil {
  338. logger.Errorf("Failed to set indexing status for %s: %v", log.Path, err)
  339. }
  340. }
  341. loopStartTime := time.Now()
  342. docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(log.Path, progressConfig)
  343. if err != nil {
  344. logger.Warnf("Failed to index file group: %s, error: %v", log.Path, err)
  345. // Set error status
  346. if persistence != nil {
  347. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusError), 0, err.Error()); err != nil {
  348. logger.Errorf("Failed to set error status for %s: %v", log.Path, err)
  349. }
  350. }
  351. } else {
  352. // Track overall time range across all log files
  353. if minTime != nil {
  354. if overallMinTime == nil || minTime.Before(*overallMinTime) {
  355. overallMinTime = minTime
  356. }
  357. }
  358. if maxTime != nil {
  359. if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
  360. overallMaxTime = maxTime
  361. }
  362. }
  363. if logFileManager != nil {
  364. duration := time.Since(loopStartTime)
  365. // Calculate total document count for the log group
  366. var totalDocCount uint64
  367. for _, docCount := range docsCountMap {
  368. totalDocCount += docCount
  369. }
  370. // Save metadata for the base log path with total count
  371. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(log.Path, totalDocCount, loopStartTime, duration, minTime, maxTime); err != nil {
  372. logger.Errorf("Failed to save index metadata for %s: %v", log.Path, err)
  373. }
  374. // Also save individual file metadata if needed
  375. for path, docCount := range docsCountMap {
  376. if path != log.Path { // Don't duplicate the base path
  377. if err := logFileManager.(indexer.MetadataManager).SaveIndexMetadata(path, docCount, loopStartTime, duration, minTime, maxTime); err != nil {
  378. logger.Errorf("Failed to save index metadata for %s: %v", path, err)
  379. }
  380. }
  381. }
  382. }
  383. // Set to indexed status
  384. if persistence != nil {
  385. if err := persistence.SetIndexStatus(log.Path, string(indexer.IndexStatusIndexed), 0, ""); err != nil {
  386. logger.Errorf("Failed to set indexed status for %s: %v", log.Path, err)
  387. }
  388. }
  389. }
  390. }
  391. totalDuration := time.Since(startTime)
  392. logger.Infof("Successfully completed full modern index rebuild in %s", totalDuration)
  393. if err := modernIndexer.(indexer.FlushableIndexer).FlushAll(); err != nil {
  394. logger.Errorf("Failed to flush all indexer data: %v", err)
  395. }
  396. nginx_log.UpdateSearcherShards()
  397. return overallMinTime, overallMaxTime
  398. }