task_recovery.go

package nginx_log

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/event"
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
	"github.com/uozi-tech/cosy/logger"
)

// TaskRecovery handles the recovery of incomplete indexing tasks after restart
type TaskRecovery struct {
	logFileManager *indexer.LogFileManager
	modernIndexer  *indexer.ParallelIndexer
	activeTasks    int32 // Counter for active recovery tasks
	ctx            context.Context
	cancel         context.CancelFunc
	wg             sync.WaitGroup
}

// NewTaskRecovery creates a new task recovery manager
func NewTaskRecovery(parentCtx context.Context) *TaskRecovery {
	ctx, cancel := context.WithCancel(parentCtx)
	return &TaskRecovery{
		logFileManager: GetLogFileManager(),
		modernIndexer:  GetModernIndexer(),
		ctx:            ctx,
		cancel:         cancel,
	}
}

// RecoverUnfinishedTasks recovers indexing tasks that were incomplete at last shutdown
func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
	if tr.logFileManager == nil || tr.modernIndexer == nil {
		logger.Warn("Cannot recover tasks: services not available")
		return nil
	}

	logger.Debug("Starting recovery of unfinished indexing tasks")

	// Get all logs with their index status
	allLogs := GetAllLogsWithIndexGrouped(func(log *NginxLogWithIndex) bool {
		// Only process access logs
		return log.Type == "access"
	})

	var incompleteTasksCount int
	queuePosition := 1
	for _, log := range allLogs {
		if tr.needsRecovery(log) {
			incompleteTasksCount++
			// Reset to queued status and assign queue position
			if err := tr.recoverTask(ctx, log.Path, queuePosition); err != nil {
				logger.Errorf("Failed to recover task for %s: %v", log.Path, err)
			} else {
				queuePosition++
			}
		}
	}

	if incompleteTasksCount > 0 {
		logger.Debugf("Recovered %d incomplete indexing tasks", incompleteTasksCount)
	} else {
		logger.Debug("No incomplete indexing tasks found")
	}
	return nil
}

// needsRecovery determines if a log file has an incomplete indexing task that needs recovery
func (tr *TaskRecovery) needsRecovery(log *NginxLogWithIndex) bool {
	// Check for incomplete states that indicate interrupted operations
	switch log.IndexStatus {
	case string(indexer.IndexStatusIndexing):
		// Task was in progress during last shutdown
		logger.Debugf("Found incomplete indexing task: %s", log.Path)
		return true
	case string(indexer.IndexStatusQueued):
		// Task was queued but may not have started
		logger.Debugf("Found queued indexing task: %s", log.Path)
		return true
	case string(indexer.IndexStatusError):
		// Check if error is recent (within last hour before restart)
		if log.LastIndexed > 0 {
			lastIndexTime := time.Unix(log.LastIndexed, 0)
			if time.Since(lastIndexTime) < time.Hour {
				logger.Debugf("Found recent error task for retry: %s", log.Path)
				return true
			}
		}
	}
	return false
}

// recoverTask recovers a single indexing task
func (tr *TaskRecovery) recoverTask(ctx context.Context, logPath string, queuePosition int) error {
	logger.Debugf("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)

	// Set status to queued with queue position
	if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusQueued), queuePosition); err != nil {
		return err
	}

	// Queue the recovery task asynchronously with proper context and WaitGroup
	tr.wg.Add(1)
	go tr.executeRecoveredTask(tr.ctx, logPath)
	return nil
}

// executeRecoveredTask executes a recovered indexing task with proper global state and progress tracking
func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string) {
	defer tr.wg.Done() // Always decrement WaitGroup

	// Check context before starting
	select {
	case <-ctx.Done():
		logger.Debugf("Context cancelled, skipping recovery task for %s", logPath)
		return
	default:
	}

	// Add a small delay to stagger recovery tasks, but check context
	select {
	case <-time.After(time.Second * 2):
	case <-ctx.Done():
		logger.Debugf("Context cancelled during delay, skipping recovery task for %s", logPath)
		return
	}

	logger.Debugf("Executing recovered indexing task: %s", logPath)

	// Get processing manager for global state updates
	processingManager := event.GetProcessingStatusManager()

	// Increment active tasks counter and set global status if this is the first task
	isFirstTask := atomic.AddInt32(&tr.activeTasks, 1) == 1
	if isFirstTask {
		processingManager.UpdateNginxLogIndexing(true)
		logger.Debug("Set global indexing status to true for recovery tasks")
	}

	// Ensure we always decrement counter and reset global status when no tasks remain
	defer func() {
		remainingTasks := atomic.AddInt32(&tr.activeTasks, -1)
		if remainingTasks == 0 {
			processingManager.UpdateNginxLogIndexing(false)
			logger.Debug("Set global indexing status to false - all recovery tasks completed")
		}
		if r := recover(); r != nil {
			logger.Errorf("Panic during recovered task execution: %v", r)
		}
	}()

	// Set status to indexing
	if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexing), 0); err != nil {
		logger.Errorf("Failed to set indexing status for recovered task %s: %v", logPath, err)
		return
	}

	// Create progress tracking configuration for recovery task
	progressConfig := &indexer.ProgressConfig{
		NotifyInterval: 1 * time.Second,
		OnProgress: func(progress indexer.ProgressNotification) {
			// Send progress event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexProgress,
				Data: event.NginxLogIndexProgressData{
					LogPath:         progress.LogGroupPath,
					Progress:        progress.Percentage,
					Stage:           "indexing",
					Status:          "running",
					ElapsedTime:     progress.ElapsedTime.Milliseconds(),
					EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
				},
			})
			logger.Debugf("Recovery progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
				progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
				progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
		},
		OnCompletion: func(completion indexer.CompletionNotification) {
			// Send completion event to frontend
			event.Publish(event.Event{
				Type: event.TypeNginxLogIndexComplete,
				Data: event.NginxLogIndexCompleteData{
					LogPath:     completion.LogGroupPath,
					Success:     completion.Success,
					Duration:    int64(completion.Duration.Milliseconds()),
					TotalLines:  completion.TotalLines,
					IndexedSize: completion.IndexedSize,
					Error:       completion.Error,
				},
			})
			logger.Debugf("Recovery completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
				completion.LogGroupPath, completion.Success, completion.Duration,
				completion.TotalLines, completion.IndexedSize)

			// Send index ready event if recovery was successful
			if completion.Success {
				event.Publish(event.Event{
					Type: event.TypeNginxLogIndexReady,
					Data: event.NginxLogIndexReadyData{
						LogPath:     completion.LogGroupPath,
						StartTime:   time.Now().Unix(),
						EndTime:     time.Now().Unix(),
						Available:   true,
						IndexStatus: "ready",
					},
				})
			}
		},
	}

	// Check context before starting indexing
	select {
	case <-ctx.Done():
		logger.Debugf("Context cancelled before indexing, stopping recovery task for %s", logPath)
		return
	default:
	}

	// Execute the indexing with progress tracking
	startTime := time.Now()
	docsCountMap, minTime, maxTime, err := tr.modernIndexer.IndexLogGroupWithProgress(logPath, progressConfig)
	if err != nil {
		logger.Errorf("Failed to execute recovered indexing task %s: %v", logPath, err)
		// Set error status
		if statusErr := tr.setTaskStatus(logPath, string(indexer.IndexStatusError), 0); statusErr != nil {
			logger.Errorf("Failed to set error status for recovered task %s: %v", logPath, statusErr)
		}
		return
	}

	// Calculate total documents indexed
	var totalDocsIndexed uint64
	for _, docCount := range docsCountMap {
		totalDocsIndexed += docCount
	}

	// Save indexing metadata using the log file manager
	duration := time.Since(startTime)
	if err := tr.logFileManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
		logger.Errorf("Failed to save recovered index metadata for %s: %v", logPath, err)
	}

	// Set status to indexed (completed)
	if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexed), 0); err != nil {
		logger.Errorf("Failed to set completed status for recovered task %s: %v", logPath, err)
	}

	// Update searcher shards
	UpdateSearcherShards()

	logger.Debugf("Successfully completed recovered indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
}

// setTaskStatus updates the task status in the database using the enhanced persistence layer
func (tr *TaskRecovery) setTaskStatus(logPath, status string, queuePosition int) error {
	// Get persistence manager
	persistence := tr.logFileManager.GetPersistence()
	if persistence == nil {
		return fmt.Errorf("persistence manager not available")
	}

	// Use enhanced SetIndexStatus method
	return persistence.SetIndexStatus(logPath, status, queuePosition, "")
}

// Shutdown gracefully stops all recovery tasks
func (tr *TaskRecovery) Shutdown() {
	logger.Debug("Shutting down task recovery system...")

	// Cancel all active tasks
	tr.cancel()

	// Wait for all tasks to complete with timeout
	done := make(chan struct{})
	go func() {
		tr.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		logger.Debug("All recovery tasks completed successfully")
	case <-time.After(30 * time.Second):
		logger.Warn("Timeout waiting for recovery tasks to complete")
	}

	logger.Debug("Task recovery system shutdown completed")
}

// Global task recovery manager
var globalTaskRecovery *TaskRecovery

// InitTaskRecovery initializes the task recovery system - called during application startup
func InitTaskRecovery(ctx context.Context) {
	logger.Debug("Initializing task recovery system")

	// Wait a bit for services to fully initialize
	time.Sleep(3 * time.Second)

	globalTaskRecovery = NewTaskRecovery(ctx)
	if err := globalTaskRecovery.RecoverUnfinishedTasks(ctx); err != nil {
		logger.Errorf("Failed to recover unfinished tasks: %v", err)
	}

	// Monitor context for shutdown
	go func() {
		<-ctx.Done()
		if globalTaskRecovery != nil {
			globalTaskRecovery.Shutdown()
		}
	}()
}