index_management.go

package nginx_log

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/event"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/gin-gonic/gin"
	"github.com/uozi-tech/cosy"
	"github.com/uozi-tech/cosy/logger"
)

// RebuildIndex rebuilds the log index asynchronously (all files or a specific file).
// The API call returns immediately and the rebuild happens in the background.
func RebuildIndex(c *gin.Context) {
	var request controlStruct
	if err := c.ShouldBindJSON(&request); err != nil {
		// No JSON body means rebuild all indexes
		request.Path = ""
	}

	// Get the modern indexer
	modernIndexer := nginx_log.GetModernIndexer()
	if modernIndexer == nil {
		cosy.ErrHandler(c, nginx_log.ErrModernIndexerNotAvailable)
		return
	}

	// Check that the modern indexer is healthy
	if !modernIndexer.IsHealthy() {
		cosy.ErrHandler(c, fmt.Errorf("modern indexer is not healthy"))
		return
	}

	// Check whether indexing is already in progress
	processingManager := event.GetProcessingStatusManager()
	currentStatus := processingManager.GetCurrentStatus()
	if currentStatus.NginxLogIndexing {
		cosy.ErrHandler(c, fmt.Errorf("index rebuild is already in progress"))
		return
	}

	// Return an immediate response to the client
	c.JSON(http.StatusOK, gin.H{
		"message": "Index rebuild started in background",
		"status":  "started",
	})

	// Start the async rebuild in a goroutine
	go performAsyncRebuild(modernIndexer, request.Path)
}
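
// Example wiring (illustrative sketch; the route path and router group shown
// here are assumptions, not defined in this file):
//
//	// g is a *gin.RouterGroup configured elsewhere in the project
//	g.POST("/nginx_logs/index/rebuild", RebuildIndex)
//
// A successful call returns {"message": "Index rebuild started in background",
// "status": "started"} right away; progress, completion, and readiness are then
// reported through the event bus (see the callbacks below).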

// performAsyncRebuild performs the actual rebuild logic asynchronously
func performAsyncRebuild(modernIndexer interface{}, path string) {
	processingManager := event.GetProcessingStatusManager()

	// Notify that indexing has started
	processingManager.UpdateNginxLogIndexing(true)

	// Ensure we always reset status when done
	defer func() {
		processingManager.UpdateNginxLogIndexing(false)
		if r := recover(); r != nil {
			logger.Errorf("Panic during async rebuild: %v", r)
		}
	}()

	// First, destroy all existing indexes to ensure a clean slate
	if err := nginx_log.DestroyAllIndexes(); err != nil {
		logger.Errorf("Failed to destroy existing indexes before rebuild: %v", err)
		return
	}

	// Re-initialize the indexer to create new, empty shards
	if err := modernIndexer.(interface {
		Start(context.Context) error
	}).Start(context.Background()); err != nil {
		logger.Errorf("Failed to re-initialize indexer after destruction: %v", err)
		return
	}

	logFileManager := nginx_log.GetLogFileManager()

	// Create progress tracking callbacks
	progressConfig := &indexer.ProgressConfig{
		NotifyInterval: 2 * time.Second,
		OnProgress: func(progress indexer.ProgressNotification) {
			// Send progress event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexProgress,
				Data: event.NginxLogIndexProgressData{
					LogPath:         progress.LogGroupPath,
					Progress:        progress.Percentage,
					Stage:           "indexing",
					Status:          "running",
					ElapsedTime:     progress.ElapsedTime.Milliseconds(),
					EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
				},
			})
			logger.Infof("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
				progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
				progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
		},
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Send completion event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexComplete,
				Data: event.NginxLogIndexCompleteData{
					LogPath:     completion.LogGroupPath,
					Success:     completion.Success,
					Duration:    int64(completion.Duration.Milliseconds()),
					TotalLines:  completion.TotalLines,
					IndexedSize: completion.IndexedSize,
					Error:       completion.Error,
				},
			})
			logger.Infof("Index completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
				completion.LogGroupPath, completion.Success, completion.Duration,
				completion.TotalLines, completion.IndexedSize)
		},
	}
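
	// With NotifyInterval set to 2s, OnProgress is expected to fire at most once
	// every two seconds per log group (assumed semantics of indexer.ProgressConfig),
	// producing log lines such as (illustrative values only):
	//
	//	Index progress: /var/log/nginx/access.log - 42.0% (Files: 1/3, Lines: 84210/200000)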

	// Capture the overall time range reported by the rebuild helpers below
	var globalMinTime, globalMaxTime *time.Time

	// Wrap the progress config so the completion callback can also publish the
	// index-ready event with the captured time range
	wrapperProgressConfig := &indexer.ProgressConfig{
		NotifyInterval: progressConfig.NotifyInterval,
		OnProgress:     progressConfig.OnProgress,
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Call the original completion callback first
			if progressConfig.OnCompletion != nil {
				progressConfig.OnCompletion(completion)
			}

			// If indexing succeeded, send an index-ready event with the actual time range
			if completion.Success {
				var startTimeUnix, endTimeUnix int64
				// Use the captured timing if available, otherwise fall back to the current time
				if globalMinTime != nil {
					startTimeUnix = globalMinTime.Unix()
				} else {
					startTimeUnix = time.Now().Unix()
				}
				if globalMaxTime != nil {
					endTimeUnix = globalMaxTime.Unix()
				} else {
					endTimeUnix = time.Now().Unix()
				}

				event.Publish(event.Event{
					Type: event.TypeNginxLogIndexReady,
					Data: event.NginxLogIndexReadyData{
						LogPath:     completion.LogGroupPath,
						StartTime:   startTimeUnix,
						EndTime:     endTimeUnix,
						Available:   true,
						IndexStatus: "ready",
					},
				})
			}
		},
	}

	if path != "" {
		// Rebuild a specific file
		globalMinTime, globalMaxTime = rebuildSingleFile(modernIndexer, path, logFileManager, wrapperProgressConfig)
	} else {
		// Rebuild all indexes
		globalMinTime, globalMaxTime = rebuildAllFiles(modernIndexer, logFileManager, wrapperProgressConfig)
	}
}
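
// For each log group, the frontend is expected to receive TypeNginxLogIndexProgress
// events while indexing runs, followed by a TypeNginxLogIndexComplete event, and,
// on success, a TypeNginxLogIndexReady event carrying the indexed time range
// (falling back to the current time when no range was captured).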

// rebuildSingleFile rebuilds the index for a single log group and returns the
// minimum and maximum timestamps found in the indexed documents
func rebuildSingleFile(modernIndexer interface{}, path string, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
	// For a single file, we need to check its type first
	allLogsForTypeCheck := nginx_log.GetAllLogsWithIndexGrouped()
	var targetLog *nginx_log.NginxLogWithIndex
	for _, log := range allLogsForTypeCheck {
		if log.Path == path {
			targetLog = log
			break
		}
	}

	var minTime, maxTime *time.Time
	if targetLog != nil && targetLog.Type == "error" {
		logger.Infof("Skipping index rebuild for error log as requested: %s", path)
		if logFileManager != nil {
			if err := logFileManager.(interface {
				SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
			}).SaveIndexMetadata(path, 0, time.Now(), 0, nil, nil); err != nil {
				logger.Warnf("Could not reset metadata for skipped error log %s: %v", path, err)
			}
		}
	} else {
		logger.Infof("Starting modern index rebuild for file: %s", path)

		// Clear existing database records for this log group before rebuilding
		if logFileManager != nil {
			if err := logFileManager.(interface {
				DeleteIndexMetadataByGroup(string) error
			}).DeleteIndexMetadataByGroup(path); err != nil {
				logger.Warnf("Could not clean up existing DB records for log group %s: %v", path, err)
			}
		}

		startTime := time.Now()
		docsCountMap, docMinTime, docMaxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(path, progressConfig)
		if err != nil {
			logger.Errorf("Failed to build modern index for file group %s: %v", path, err)
			return nil, nil
		}
		minTime, maxTime = docMinTime, docMaxTime
		duration := time.Since(startTime)

		var totalDocsIndexed uint64
		if logFileManager != nil {
			for filePath, docCount := range docsCountMap {
				totalDocsIndexed += docCount
				if err := logFileManager.(interface {
					SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
				}).SaveIndexMetadata(filePath, docCount, startTime, duration, minTime, maxTime); err != nil {
					logger.Errorf("Failed to save index metadata for %s: %v", filePath, err)
				}
			}
		}
		logger.Infof("Successfully completed modern rebuild for file group: %s, Documents: %d", path, totalDocsIndexed)
	}

	if err := modernIndexer.(interface {
		FlushAll() error
	}).FlushAll(); err != nil {
		logger.Errorf("Failed to flush all indexer data for single file: %v", err)
	}
	nginx_log.UpdateSearcherShards()

	return minTime, maxTime
}
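
// Note: IndexLogGroupWithProgress returns a per-file document count map covering
// the whole log group. The paths below are illustrative assumptions of what a
// group may contain (a base file plus its rotations), not values from this code:
//
//	map[string]uint64{
//		"/var/log/nginx/access.log":   120000,
//		"/var/log/nginx/access.log.1": 80000,
//	}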

// rebuildAllFiles rebuilds the indexes for all log files and returns the overall
// minimum and maximum timestamps across every indexed log group
func rebuildAllFiles(modernIndexer interface{}, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
	if logFileManager != nil {
		if err := logFileManager.(interface {
			DeleteAllIndexMetadata() error
		}).DeleteAllIndexMetadata(); err != nil {
			logger.Errorf("Could not clean up all old DB records before full rebuild: %v", err)
		}
	}

	logger.Info("Starting full modern index rebuild")
	allLogs := nginx_log.GetAllLogsWithIndexGrouped()
	startTime := time.Now()

	var overallMinTime, overallMaxTime *time.Time
	for _, log := range allLogs {
		if log.Type == "error" {
			logger.Infof("Skipping indexing for error log: %s", log.Path)
			if logFileManager != nil {
				if err := logFileManager.(interface {
					SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
				}).SaveIndexMetadata(log.Path, 0, time.Now(), 0, nil, nil); err != nil {
					logger.Warnf("Could not reset metadata for skipped error log %s: %v", log.Path, err)
				}
			}
			continue
		}

		loopStartTime := time.Now()
		docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(log.Path, progressConfig)
		if err != nil {
			logger.Warnf("Failed to index file group, skipping: %s, error: %v", log.Path, err)
		} else {
			// Track the overall time range across all log files
			if minTime != nil {
				if overallMinTime == nil || minTime.Before(*overallMinTime) {
					overallMinTime = minTime
				}
			}
			if maxTime != nil {
				if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
					overallMaxTime = maxTime
				}
			}

			if logFileManager != nil {
				duration := time.Since(loopStartTime)
				for path, docCount := range docsCountMap {
					if err := logFileManager.(interface {
						SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
					}).SaveIndexMetadata(path, docCount, loopStartTime, duration, minTime, maxTime); err != nil {
						logger.Errorf("Failed to save index metadata for %s: %v", path, err)
					}
				}
			}
		}
	}

	totalDuration := time.Since(startTime)
	logger.Infof("Successfully completed full modern index rebuild in %s", totalDuration)

	if err := modernIndexer.(interface {
		FlushAll() error
	}).FlushAll(); err != nil {
		logger.Errorf("Failed to flush all indexer data: %v", err)
	}
	nginx_log.UpdateSearcherShards()

	return overallMinTime, overallMaxTime
}
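
// The modernIndexer and logFileManager parameters above are interface{} values
// narrowed through ad-hoc type assertions. For reference, the method sets
// asserted in this file are, as a sketch (these named interfaces are
// illustrative and do not exist in the codebase):
//
//	type rebuildIndexer interface {
//		Start(context.Context) error
//		FlushAll() error
//	}
//
//	type indexMetadataStore interface {
//		SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
//		DeleteIndexMetadataByGroup(string) error
//		DeleteAllIndexMetadata() error
//	}
//
// IndexLogGroupWithProgress is reached via a concrete assertion to
// *indexer.ParallelIndexer rather than through an interface.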