task_recovery.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "sync/atomic"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/event"
  8. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
  11. // TaskRecovery handles the recovery of incomplete indexing tasks after restart
  12. type TaskRecovery struct {
  13. logFileManager *indexer.LogFileManager
  14. modernIndexer *indexer.ParallelIndexer
  15. activeTasks int32 // Counter for active recovery tasks
  16. }
  17. // NewTaskRecovery creates a new task recovery manager
  18. func NewTaskRecovery() *TaskRecovery {
  19. return &TaskRecovery{
  20. logFileManager: GetLogFileManager(),
  21. modernIndexer: GetModernIndexer(),
  22. }
  23. }
  24. // RecoverUnfinishedTasks recovers indexing tasks that were incomplete at last shutdown
  25. func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
  26. if tr.logFileManager == nil || tr.modernIndexer == nil {
  27. logger.Warn("Cannot recover tasks: services not available")
  28. return nil
  29. }
  30. logger.Info("Starting recovery of unfinished indexing tasks")
  31. // Get all logs with their index status
  32. allLogs := GetAllLogsWithIndexGrouped(func(log *NginxLogWithIndex) bool {
  33. // Only process access logs
  34. return log.Type == "access"
  35. })
  36. var incompleteTasksCount int
  37. var queuePosition int = 1
  38. for _, log := range allLogs {
  39. if tr.needsRecovery(log) {
  40. incompleteTasksCount++
  41. // Reset to queued status and assign queue position
  42. if err := tr.recoverTask(ctx, log.Path, queuePosition); err != nil {
  43. logger.Errorf("Failed to recover task for %s: %v", log.Path, err)
  44. } else {
  45. queuePosition++
  46. }
  47. }
  48. }
  49. if incompleteTasksCount > 0 {
  50. logger.Infof("Recovered %d incomplete indexing tasks", incompleteTasksCount)
  51. } else {
  52. logger.Info("No incomplete indexing tasks found")
  53. }
  54. return nil
  55. }
  56. // needsRecovery determines if a log file has an incomplete indexing task that needs recovery
  57. func (tr *TaskRecovery) needsRecovery(log *NginxLogWithIndex) bool {
  58. // Check for incomplete states that indicate interrupted operations
  59. switch log.IndexStatus {
  60. case string(indexer.IndexStatusIndexing):
  61. // Task was in progress during last shutdown
  62. logger.Debugf("Found incomplete indexing task: %s", log.Path)
  63. return true
  64. case string(indexer.IndexStatusQueued):
  65. // Task was queued but may not have started
  66. logger.Debugf("Found queued indexing task: %s", log.Path)
  67. return true
  68. case string(indexer.IndexStatusError):
  69. // Check if error is recent (within last hour before restart)
  70. if log.LastIndexed > 0 {
  71. lastIndexTime := time.Unix(log.LastIndexed, 0)
  72. if time.Since(lastIndexTime) < time.Hour {
  73. logger.Debugf("Found recent error task for retry: %s", log.Path)
  74. return true
  75. }
  76. }
  77. }
  78. return false
  79. }
  80. // recoverTask recovers a single indexing task
  81. func (tr *TaskRecovery) recoverTask(ctx context.Context, logPath string, queuePosition int) error {
  82. logger.Infof("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)
  83. // Set status to queued with queue position
  84. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusQueued), queuePosition); err != nil {
  85. return err
  86. }
  87. // Queue the recovery task asynchronously
  88. go tr.executeRecoveredTask(ctx, logPath)
  89. return nil
  90. }
  91. // executeRecoveredTask executes a recovered indexing task with proper global state and progress tracking
  92. func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string) {
  93. // Add a small delay to stagger recovery tasks
  94. time.Sleep(time.Second * 2)
  95. logger.Infof("Executing recovered indexing task: %s", logPath)
  96. // Get processing manager for global state updates
  97. processingManager := event.GetProcessingStatusManager()
  98. // Increment active tasks counter and set global status if this is the first task
  99. isFirstTask := atomic.AddInt32(&tr.activeTasks, 1) == 1
  100. if isFirstTask {
  101. processingManager.UpdateNginxLogIndexing(true)
  102. logger.Info("Set global indexing status to true for recovery tasks")
  103. }
  104. // Ensure we always decrement counter and reset global status when no tasks remain
  105. defer func() {
  106. remainingTasks := atomic.AddInt32(&tr.activeTasks, -1)
  107. if remainingTasks == 0 {
  108. processingManager.UpdateNginxLogIndexing(false)
  109. logger.Info("Set global indexing status to false - all recovery tasks completed")
  110. }
  111. if r := recover(); r != nil {
  112. logger.Errorf("Panic during recovered task execution: %v", r)
  113. }
  114. }()
  115. // Set status to indexing
  116. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexing), 0); err != nil {
  117. logger.Errorf("Failed to set indexing status for recovered task %s: %v", logPath, err)
  118. return
  119. }
  120. // Create progress tracking configuration for recovery task
  121. progressConfig := &indexer.ProgressConfig{
  122. NotifyInterval: 1 * time.Second,
  123. OnProgress: func(progress indexer.ProgressNotification) {
  124. // Send progress event to frontend
  125. event.Publish(event.Event{
  126. Type: event.TypeNginxLogIndexProgress,
  127. Data: event.NginxLogIndexProgressData{
  128. LogPath: progress.LogGroupPath,
  129. Progress: progress.Percentage,
  130. Stage: "indexing",
  131. Status: "running",
  132. ElapsedTime: progress.ElapsedTime.Milliseconds(),
  133. EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
  134. },
  135. })
  136. logger.Infof("Recovery progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  137. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  138. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  139. },
  140. OnCompletion: func(completion indexer.CompletionNotification) {
  141. // Send completion event to frontend
  142. event.Publish(event.Event{
  143. Type: event.TypeNginxLogIndexComplete,
  144. Data: event.NginxLogIndexCompleteData{
  145. LogPath: completion.LogGroupPath,
  146. Success: completion.Success,
  147. Duration: int64(completion.Duration.Milliseconds()),
  148. TotalLines: completion.TotalLines,
  149. IndexedSize: completion.IndexedSize,
  150. Error: completion.Error,
  151. },
  152. })
  153. logger.Infof("Recovery completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
  154. completion.LogGroupPath, completion.Success, completion.Duration,
  155. completion.TotalLines, completion.IndexedSize)
  156. // Send index ready event if recovery was successful
  157. if completion.Success {
  158. event.Publish(event.Event{
  159. Type: event.TypeNginxLogIndexReady,
  160. Data: event.NginxLogIndexReadyData{
  161. LogPath: completion.LogGroupPath,
  162. StartTime: time.Now().Unix(),
  163. EndTime: time.Now().Unix(),
  164. Available: true,
  165. IndexStatus: "ready",
  166. },
  167. })
  168. }
  169. },
  170. }
  171. // Execute the indexing with progress tracking
  172. startTime := time.Now()
  173. docsCountMap, minTime, maxTime, err := tr.modernIndexer.IndexLogGroupWithProgress(logPath, progressConfig)
  174. if err != nil {
  175. logger.Errorf("Failed to execute recovered indexing task %s: %v", logPath, err)
  176. // Set error status
  177. if statusErr := tr.setTaskStatus(logPath, string(indexer.IndexStatusError), 0); statusErr != nil {
  178. logger.Errorf("Failed to set error status for recovered task %s: %v", logPath, statusErr)
  179. }
  180. return
  181. }
  182. // Calculate total documents indexed
  183. var totalDocsIndexed uint64
  184. for _, docCount := range docsCountMap {
  185. totalDocsIndexed += docCount
  186. }
  187. // Save indexing metadata using the log file manager
  188. duration := time.Since(startTime)
  189. if err := tr.logFileManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
  190. logger.Errorf("Failed to save recovered index metadata for %s: %v", logPath, err)
  191. }
  192. // Set status to indexed (completed)
  193. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexed), 0); err != nil {
  194. logger.Errorf("Failed to set completed status for recovered task %s: %v", logPath, err)
  195. }
  196. // Update searcher shards
  197. UpdateSearcherShards()
  198. logger.Infof("Successfully completed recovered indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
  199. }
  200. // setTaskStatus updates the task status in the database using the enhanced persistence layer
  201. func (tr *TaskRecovery) setTaskStatus(logPath, status string, queuePosition int) error {
  202. // Get persistence manager
  203. persistence := tr.logFileManager.GetPersistence()
  204. if persistence == nil {
  205. return fmt.Errorf("persistence manager not available")
  206. }
  207. // Use enhanced SetIndexStatus method
  208. return persistence.SetIndexStatus(logPath, status, queuePosition, "")
  209. }
  210. // InitTaskRecovery initializes the task recovery system - called during application startup
  211. func InitTaskRecovery(ctx context.Context) {
  212. logger.Info("Initializing task recovery system")
  213. // Wait a bit for services to fully initialize
  214. time.Sleep(3 * time.Second)
  215. recoveryManager := NewTaskRecovery()
  216. if err := recoveryManager.RecoverUnfinishedTasks(ctx); err != nil {
  217. logger.Errorf("Failed to recover unfinished tasks: %v", err)
  218. }
  219. }