index_management.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/event"
  8. "github.com/0xJacky/Nginx-UI/internal/nginx_log"
  9. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  10. "github.com/gin-gonic/gin"
  11. "github.com/uozi-tech/cosy"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
  14. // RebuildIndex rebuilds the log index asynchronously (all files or specific file)
  15. // The API call returns immediately and the rebuild happens in background
  16. func RebuildIndex(c *gin.Context) {
  17. var request controlStruct
  18. if err := c.ShouldBindJSON(&request); err != nil {
  19. // No JSON body means rebuild all indexes
  20. request.Path = ""
  21. }
  22. // Get modern indexer
  23. modernIndexer := nginx_log.GetModernIndexer()
  24. if modernIndexer == nil {
  25. cosy.ErrHandler(c, nginx_log.ErrModernIndexerNotAvailable)
  26. return
  27. }
  28. // Check if modern indexer is healthy
  29. if !modernIndexer.IsHealthy() {
  30. cosy.ErrHandler(c, fmt.Errorf("modern indexer is not healthy"))
  31. return
  32. }
  33. // Check if indexing is already in progress
  34. processingManager := event.GetProcessingStatusManager()
  35. currentStatus := processingManager.GetCurrentStatus()
  36. if currentStatus.NginxLogIndexing {
  37. cosy.ErrHandler(c, fmt.Errorf("index rebuild is already in progress"))
  38. return
  39. }
  40. // Return immediate response to client
  41. c.JSON(http.StatusOK, gin.H{
  42. "message": "Index rebuild started in background",
  43. "status": "started",
  44. })
  45. // Start async rebuild in goroutine
  46. go func() {
  47. performAsyncRebuild(modernIndexer, request.Path)
  48. }()
  49. }
// performAsyncRebuild performs the actual rebuild logic asynchronously.
// It owns the shared "NginxLogIndexing" status flag for the whole run: the
// flag is set on entry and reset in a deferred func even if the rebuild
// panics. modernIndexer is an interface{} and is type-asserted below; an
// unexpected concrete type panics, which is caught by the deferred recover.
func performAsyncRebuild(modernIndexer interface{}, path string) {
	processingManager := event.GetProcessingStatusManager()
	// Notify that indexing has started
	processingManager.UpdateNginxLogIndexing(true)
	// Create a context for this rebuild operation that can be cancelled;
	// cancel fires automatically when this function returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Ensure we always reset status when done; also log (and swallow) any
	// panic so this background goroutine cannot crash the process.
	defer func() {
		processingManager.UpdateNginxLogIndexing(false)
		if r := recover(); r != nil {
			logger.Errorf("Panic during async rebuild: %v", r)
		}
	}()
	logFileManager := nginx_log.GetLogFileManager()
	// Handle index cleanup based on rebuild scope
	if path != "" {
		// For single file rebuild, only delete indexes for that log group
		if err := modernIndexer.(*indexer.ParallelIndexer).DeleteIndexByLogGroup(path, logFileManager); err != nil {
			logger.Errorf("Failed to delete existing indexes for log group %s: %v", path, err)
			return
		}
		logger.Infof("Deleted existing indexes for log group: %s", path)
	} else {
		// For full rebuild, destroy all existing indexes with context
		if err := nginx_log.DestroyAllIndexes(ctx); err != nil {
			logger.Errorf("Failed to destroy existing indexes before rebuild: %v", err)
			return
		}
		// Re-initialize the indexer to create new, empty shards
		if err := modernIndexer.(interface {
			Start(context.Context) error
		}).Start(ctx); err != nil {
			logger.Errorf("Failed to re-initialize indexer after destruction: %v", err)
			return
		}
		logger.Info("Re-initialized indexer after destruction")
	}
	// Create progress tracking callbacks that forward indexer notifications
	// to the frontend through the event bus.
	progressConfig := &indexer.ProgressConfig{
		NotifyInterval: 1 * time.Second,
		OnProgress: func(progress indexer.ProgressNotification) {
			// Send progress event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexProgress,
				Data: event.NginxLogIndexProgressData{
					LogPath:         progress.LogGroupPath,
					Progress:        progress.Percentage,
					Stage:           "indexing",
					Status:          "running",
					ElapsedTime:     progress.ElapsedTime.Milliseconds(),
					EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
				},
			})
			logger.Infof("Index progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
				progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
				progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
		},
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Send completion event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexComplete,
				Data: event.NginxLogIndexCompleteData{
					LogPath:     completion.LogGroupPath,
					Success:     completion.Success,
					Duration:    int64(completion.Duration.Milliseconds()),
					TotalLines:  completion.TotalLines,
					IndexedSize: completion.IndexedSize,
					Error:       completion.Error,
				},
			})
			logger.Infof("Index completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
				completion.LogGroupPath, completion.Success, completion.Duration,
				completion.TotalLines, completion.IndexedSize)
		},
	}
	// Overall indexed time range; captured by the wrapper's OnCompletion
	// closure below and assigned after the rebuild helpers return.
	var globalMinTime, globalMaxTime *time.Time
	// Create a wrapper progress config that chains the original completion
	// callback and then publishes an "index ready" event with the time range.
	wrapperProgressConfig := &indexer.ProgressConfig{
		NotifyInterval: progressConfig.NotifyInterval,
		OnProgress:     progressConfig.OnProgress,
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Call the original completion callback first
			if progressConfig.OnCompletion != nil {
				progressConfig.OnCompletion(completion)
			}
			// Send index ready event if indexing was successful with actual time range
			if completion.Success {
				var startTimeUnix, endTimeUnix int64
				// Use global timing if available, otherwise use current time.
				// NOTE(review): if the indexer invokes OnCompletion before
				// rebuildSingleFile/rebuildAllFiles have returned, the globals
				// are still nil and the time.Now() fallback applies — confirm
				// the callback's timing against the indexer's contract.
				if globalMinTime != nil {
					startTimeUnix = globalMinTime.Unix()
				} else {
					startTimeUnix = time.Now().Unix()
				}
				if globalMaxTime != nil {
					endTimeUnix = globalMaxTime.Unix()
				} else {
					endTimeUnix = time.Now().Unix()
				}
				event.Publish(event.Event{
					Type: event.TypeNginxLogIndexReady,
					Data: event.NginxLogIndexReadyData{
						LogPath:     completion.LogGroupPath,
						StartTime:   startTimeUnix,
						EndTime:     endTimeUnix,
						Available:   true,
						IndexStatus: "ready",
					},
				})
			}
		},
	}
	if path != "" {
		// Rebuild specific file
		minTime, maxTime := rebuildSingleFile(modernIndexer, path, logFileManager, wrapperProgressConfig)
		globalMinTime, globalMaxTime = minTime, maxTime
	} else {
		// Rebuild all indexes
		minTime, maxTime := rebuildAllFiles(modernIndexer, logFileManager, wrapperProgressConfig)
		globalMinTime, globalMaxTime = minTime, maxTime
	}
}
  175. // rebuildSingleFile rebuilds index for a single file
  176. func rebuildSingleFile(modernIndexer interface{}, path string, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  177. // For a single file, we need to check its type first
  178. allLogsForTypeCheck := nginx_log.GetAllLogsWithIndexGrouped()
  179. var targetLog *nginx_log.NginxLogWithIndex
  180. for _, log := range allLogsForTypeCheck {
  181. if log.Path == path {
  182. targetLog = log
  183. break
  184. }
  185. }
  186. var minTime, maxTime *time.Time
  187. if targetLog != nil && targetLog.Type == "error" {
  188. logger.Infof("Skipping index rebuild for error log as requested: %s", path)
  189. if logFileManager != nil {
  190. if err := logFileManager.(interface {
  191. SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
  192. }).SaveIndexMetadata(path, 0, time.Now(), 0, nil, nil); err != nil {
  193. logger.Warnf("Could not reset metadata for skipped error log %s: %v", path, err)
  194. }
  195. }
  196. } else {
  197. logger.Infof("Starting modern index rebuild for file: %s", path)
  198. // Clear existing database records for this log group before rebuilding
  199. if logFileManager != nil {
  200. if err := logFileManager.(interface {
  201. DeleteIndexMetadataByGroup(string) error
  202. }).DeleteIndexMetadataByGroup(path); err != nil {
  203. logger.Warnf("Could not clean up existing DB records for log group %s: %v", path, err)
  204. }
  205. }
  206. startTime := time.Now()
  207. docsCountMap, docMinTime, docMaxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(path, progressConfig)
  208. if err != nil {
  209. logger.Errorf("Failed to index modern index for file group %s: %v", path, err)
  210. return nil, nil
  211. }
  212. minTime, maxTime = docMinTime, docMaxTime
  213. duration := time.Since(startTime)
  214. var totalDocsIndexed uint64
  215. if logFileManager != nil {
  216. for filePath, docCount := range docsCountMap {
  217. totalDocsIndexed += docCount
  218. if err := logFileManager.(interface {
  219. SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
  220. }).SaveIndexMetadata(filePath, docCount, startTime, duration, minTime, maxTime); err != nil {
  221. logger.Errorf("Failed to save index metadata for %s: %v", filePath, err)
  222. }
  223. }
  224. }
  225. logger.Infof("Successfully completed modern rebuild for file group: %s, Documents: %d", path, totalDocsIndexed)
  226. }
  227. if err := modernIndexer.(interface {
  228. FlushAll() error
  229. }).FlushAll(); err != nil {
  230. logger.Errorf("Failed to flush all indexer data for single file: %v", err)
  231. }
  232. nginx_log.UpdateSearcherShards()
  233. return minTime, maxTime
  234. }
  235. // rebuildAllFiles rebuilds indexes for all files
  236. func rebuildAllFiles(modernIndexer interface{}, logFileManager interface{}, progressConfig *indexer.ProgressConfig) (*time.Time, *time.Time) {
  237. if logFileManager != nil {
  238. if err := logFileManager.(interface {
  239. DeleteAllIndexMetadata() error
  240. }).DeleteAllIndexMetadata(); err != nil {
  241. logger.Errorf("Could not clean up all old DB records before full rebuild: %v", err)
  242. }
  243. }
  244. logger.Info("Starting full modern index rebuild")
  245. allLogs := nginx_log.GetAllLogsWithIndexGrouped()
  246. startTime := time.Now()
  247. var overallMinTime, overallMaxTime *time.Time
  248. for _, log := range allLogs {
  249. if log.Type == "error" {
  250. logger.Infof("Skipping indexing for error log: %s", log.Path)
  251. if logFileManager != nil {
  252. if err := logFileManager.(interface {
  253. SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
  254. }).SaveIndexMetadata(log.Path, 0, time.Now(), 0, nil, nil); err != nil {
  255. logger.Warnf("Could not reset metadata for skipped error log %s: %v", log.Path, err)
  256. }
  257. }
  258. continue
  259. }
  260. loopStartTime := time.Now()
  261. docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(log.Path, progressConfig)
  262. if err != nil {
  263. logger.Warnf("Failed to index file group, skipping: %s, error: %v", log.Path, err)
  264. } else {
  265. // Track overall time range across all log files
  266. if minTime != nil {
  267. if overallMinTime == nil || minTime.Before(*overallMinTime) {
  268. overallMinTime = minTime
  269. }
  270. }
  271. if maxTime != nil {
  272. if overallMaxTime == nil || maxTime.After(*overallMaxTime) {
  273. overallMaxTime = maxTime
  274. }
  275. }
  276. if logFileManager != nil {
  277. duration := time.Since(loopStartTime)
  278. for path, docCount := range docsCountMap {
  279. if err := logFileManager.(interface {
  280. SaveIndexMetadata(string, uint64, time.Time, time.Duration, *time.Time, *time.Time) error
  281. }).SaveIndexMetadata(path, docCount, loopStartTime, duration, minTime, maxTime); err != nil {
  282. logger.Errorf("Failed to save index metadata for %s: %v", path, err)
  283. }
  284. }
  285. }
  286. }
  287. }
  288. totalDuration := time.Since(startTime)
  289. logger.Infof("Successfully completed full modern index rebuild in %s", totalDuration)
  290. if err := modernIndexer.(interface {
  291. FlushAll() error
  292. }).FlushAll(); err != nil {
  293. logger.Errorf("Failed to flush all indexer data: %v", err)
  294. }
  295. nginx_log.UpdateSearcherShards()
  296. return overallMinTime, overallMaxTime
  297. }