task_recovery.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/event"
  9. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // TaskRecovery handles the recovery of incomplete indexing tasks after restart
  13. type TaskRecovery struct {
  14. logFileManager *indexer.LogFileManager
  15. modernIndexer *indexer.ParallelIndexer
  16. activeTasks int32 // Counter for active recovery tasks
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. wg sync.WaitGroup
  20. taskLocks map[string]*sync.Mutex // Per-log-group locks
  21. locksMutex sync.RWMutex // Protects taskLocks map
  22. }
  23. // NewTaskRecovery creates a new task recovery manager
  24. func NewTaskRecovery(parentCtx context.Context) *TaskRecovery {
  25. ctx, cancel := context.WithCancel(parentCtx)
  26. return &TaskRecovery{
  27. logFileManager: GetLogFileManager(),
  28. modernIndexer: GetIndexer(),
  29. ctx: ctx,
  30. cancel: cancel,
  31. taskLocks: make(map[string]*sync.Mutex),
  32. }
  33. }
  34. // RecoverUnfinishedTasks recovers indexing tasks that were incomplete at last shutdown
  35. func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
  36. if tr.logFileManager == nil || tr.modernIndexer == nil {
  37. return nil
  38. }
  39. logger.Debug("Starting recovery of unfinished indexing tasks")
  40. // Get all logs with their index status
  41. allLogs := GetAllLogsWithIndexGrouped(func(log *NginxLogWithIndex) bool {
  42. // Only process access logs
  43. return log.Type == "access"
  44. })
  45. var incompleteTasksCount int
  46. var queuePosition int = 1
  47. for _, log := range allLogs {
  48. if tr.needsRecovery(log) {
  49. incompleteTasksCount++
  50. // Reset to queued status and assign queue position
  51. if err := tr.recoverTask(ctx, log.Path, queuePosition); err != nil {
  52. logger.Errorf("Failed to recover task for %s: %v", log.Path, err)
  53. } else {
  54. queuePosition++
  55. }
  56. }
  57. }
  58. if incompleteTasksCount > 0 {
  59. logger.Debugf("Recovered %d incomplete indexing tasks", incompleteTasksCount)
  60. } else {
  61. logger.Debug("No incomplete indexing tasks found")
  62. }
  63. return nil
  64. }
  65. // acquireTaskLock gets or creates a mutex for a specific log group
  66. func (tr *TaskRecovery) acquireTaskLock(logPath string) *sync.Mutex {
  67. tr.locksMutex.Lock()
  68. defer tr.locksMutex.Unlock()
  69. if lock, exists := tr.taskLocks[logPath]; exists {
  70. return lock
  71. }
  72. lock := &sync.Mutex{}
  73. tr.taskLocks[logPath] = lock
  74. return lock
  75. }
  76. // releaseTaskLock removes the mutex for a specific log group after completion
  77. func (tr *TaskRecovery) releaseTaskLock(logPath string) {
  78. tr.locksMutex.Lock()
  79. defer tr.locksMutex.Unlock()
  80. delete(tr.taskLocks, logPath)
  81. }
  82. // isTaskInProgress checks if a task is currently running for a specific log group
  83. func (tr *TaskRecovery) isTaskInProgress(logPath string) bool {
  84. tr.locksMutex.RLock()
  85. defer tr.locksMutex.RUnlock()
  86. if lock, exists := tr.taskLocks[logPath]; exists {
  87. // Try to acquire the lock with TryLock
  88. // If we can't acquire it, it means task is in progress
  89. if lock.TryLock() {
  90. lock.Unlock()
  91. return false
  92. }
  93. return true
  94. }
  95. return false
  96. }
  97. // needsRecovery determines if a log file has an incomplete indexing task that needs recovery
  98. func (tr *TaskRecovery) needsRecovery(log *NginxLogWithIndex) bool {
  99. // Check for incomplete states that indicate interrupted operations
  100. switch log.IndexStatus {
  101. case string(indexer.IndexStatusIndexing):
  102. // Task was in progress during last shutdown
  103. logger.Debugf("Found incomplete indexing task: %s", log.Path)
  104. return true
  105. case string(indexer.IndexStatusQueued):
  106. // Task was queued but may not have started
  107. logger.Debugf("Found queued indexing task: %s", log.Path)
  108. return true
  109. case string(indexer.IndexStatusError):
  110. // Check if error is recent (within last hour before restart)
  111. if log.LastIndexed > 0 {
  112. lastIndexTime := time.Unix(log.LastIndexed, 0)
  113. if time.Since(lastIndexTime) < time.Hour {
  114. logger.Debugf("Found recent error task for retry: %s", log.Path)
  115. return true
  116. }
  117. }
  118. }
  119. return false
  120. }
  121. // recoverTask recovers a single indexing task
  122. func (tr *TaskRecovery) recoverTask(ctx context.Context, logPath string, queuePosition int) error {
  123. // Check if task is already in progress
  124. if tr.isTaskInProgress(logPath) {
  125. logger.Debugf("Skipping recovery for %s - task already in progress", logPath)
  126. return nil
  127. }
  128. logger.Debugf("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)
  129. // Set status to queued with queue position
  130. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusQueued), queuePosition); err != nil {
  131. return err
  132. }
  133. // Queue the recovery task asynchronously with proper context and WaitGroup
  134. tr.wg.Add(1)
  135. go tr.executeRecoveredTask(tr.ctx, logPath)
  136. return nil
  137. }
  138. // executeRecoveredTask executes a recovered indexing task with proper global state and progress tracking
  139. func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string) {
  140. defer tr.wg.Done() // Always decrement WaitGroup
  141. // Acquire lock for this specific log group to prevent concurrent execution
  142. lock := tr.acquireTaskLock(logPath)
  143. lock.Lock()
  144. defer func() {
  145. lock.Unlock()
  146. tr.releaseTaskLock(logPath)
  147. }()
  148. // Check context before starting
  149. select {
  150. case <-ctx.Done():
  151. logger.Debugf("Context cancelled, skipping recovery task for %s", logPath)
  152. return
  153. default:
  154. }
  155. // Add a small delay to stagger recovery tasks, but check context
  156. select {
  157. case <-time.After(time.Second * 2):
  158. case <-ctx.Done():
  159. logger.Debugf("Context cancelled during delay, skipping recovery task for %s", logPath)
  160. return
  161. }
  162. logger.Debugf("Executing recovered indexing task: %s", logPath)
  163. // Get processing manager for global state updates
  164. processingManager := event.GetProcessingStatusManager()
  165. // Increment active tasks counter and set global status if this is the first task
  166. isFirstTask := atomic.AddInt32(&tr.activeTasks, 1) == 1
  167. if isFirstTask {
  168. processingManager.UpdateNginxLogIndexing(true)
  169. logger.Debug("Set global indexing status to true for recovery tasks")
  170. }
  171. // Ensure we always decrement counter and reset global status when no tasks remain
  172. defer func() {
  173. remainingTasks := atomic.AddInt32(&tr.activeTasks, -1)
  174. if remainingTasks == 0 {
  175. processingManager.UpdateNginxLogIndexing(false)
  176. logger.Debug("Set global indexing status to false - all recovery tasks completed")
  177. }
  178. if r := recover(); r != nil {
  179. logger.Errorf("Panic during recovered task execution: %v", r)
  180. }
  181. }()
  182. // Set status to indexing
  183. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexing), 0); err != nil {
  184. logger.Errorf("Failed to set indexing status for recovered task %s: %v", logPath, err)
  185. return
  186. }
  187. // Create progress tracking configuration for recovery task
  188. progressConfig := &indexer.ProgressConfig{
  189. NotifyInterval: 1 * time.Second,
  190. OnProgress: func(progress indexer.ProgressNotification) {
  191. // Send progress event to frontend
  192. event.Publish(event.Event{
  193. Type: event.TypeNginxLogIndexProgress,
  194. Data: event.NginxLogIndexProgressData{
  195. LogPath: progress.LogGroupPath,
  196. Progress: progress.Percentage,
  197. Stage: "indexing",
  198. Status: "running",
  199. ElapsedTime: progress.ElapsedTime.Milliseconds(),
  200. EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
  201. },
  202. })
  203. logger.Debugf("Recovery progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  204. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  205. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  206. },
  207. OnCompletion: func(completion indexer.CompletionNotification) {
  208. // Send completion event to frontend
  209. event.Publish(event.Event{
  210. Type: event.TypeNginxLogIndexComplete,
  211. Data: event.NginxLogIndexCompleteData{
  212. LogPath: completion.LogGroupPath,
  213. Success: completion.Success,
  214. Duration: int64(completion.Duration.Milliseconds()),
  215. TotalLines: completion.TotalLines,
  216. IndexedSize: completion.IndexedSize,
  217. Error: completion.Error,
  218. },
  219. })
  220. logger.Debugf("Recovery completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
  221. completion.LogGroupPath, completion.Success, completion.Duration,
  222. completion.TotalLines, completion.IndexedSize)
  223. // Send index ready event if recovery was successful
  224. if completion.Success {
  225. event.Publish(event.Event{
  226. Type: event.TypeNginxLogIndexReady,
  227. Data: event.NginxLogIndexReadyData{
  228. LogPath: completion.LogGroupPath,
  229. StartTime: time.Now().Unix(),
  230. EndTime: time.Now().Unix(),
  231. Available: true,
  232. IndexStatus: "ready",
  233. },
  234. })
  235. }
  236. },
  237. }
  238. // Check context before starting indexing
  239. select {
  240. case <-ctx.Done():
  241. logger.Debugf("Context cancelled before indexing, stopping recovery task for %s", logPath)
  242. return
  243. default:
  244. }
  245. // Execute the indexing with progress tracking
  246. startTime := time.Now()
  247. docsCountMap, minTime, maxTime, err := tr.modernIndexer.IndexLogGroupWithProgress(logPath, progressConfig)
  248. if err != nil {
  249. logger.Errorf("Failed to execute recovered indexing task %s: %v", logPath, err)
  250. // Set error status
  251. if statusErr := tr.setTaskStatus(logPath, string(indexer.IndexStatusError), 0); statusErr != nil {
  252. logger.Errorf("Failed to set error status for recovered task %s: %v", logPath, statusErr)
  253. }
  254. return
  255. }
  256. // Calculate total documents indexed
  257. var totalDocsIndexed uint64
  258. for _, docCount := range docsCountMap {
  259. totalDocsIndexed += docCount
  260. }
  261. // Save indexing metadata using the log file manager
  262. duration := time.Since(startTime)
  263. if err := tr.logFileManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
  264. logger.Errorf("Failed to save recovered index metadata for %s: %v", logPath, err)
  265. }
  266. // Set status to indexed (completed)
  267. if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexed), 0); err != nil {
  268. logger.Errorf("Failed to set completed status for recovered task %s: %v", logPath, err)
  269. }
  270. // Update searcher shards
  271. UpdateSearcherShards()
  272. logger.Debugf("Successfully completed recovered indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
  273. }
  274. // setTaskStatus updates the task status in the database using the enhanced persistence layer
  275. func (tr *TaskRecovery) setTaskStatus(logPath, status string, queuePosition int) error {
  276. // Get persistence manager
  277. persistence := tr.logFileManager.GetPersistence()
  278. if persistence == nil {
  279. return fmt.Errorf("persistence manager not available")
  280. }
  281. // Use enhanced SetIndexStatus method
  282. return persistence.SetIndexStatus(logPath, status, queuePosition, "")
  283. }
  284. // Shutdown gracefully stops all recovery tasks
  285. func (tr *TaskRecovery) Shutdown() {
  286. logger.Debug("Shutting down task recovery system...")
  287. // Cancel all active tasks
  288. tr.cancel()
  289. // Wait for all tasks to complete with timeout
  290. done := make(chan struct{})
  291. go func() {
  292. tr.wg.Wait()
  293. close(done)
  294. }()
  295. select {
  296. case <-done:
  297. logger.Debug("All recovery tasks completed successfully")
  298. case <-time.After(30 * time.Second):
  299. logger.Warn("Timeout waiting for recovery tasks to complete")
  300. }
  301. logger.Debug("Task recovery system shutdown completed")
  302. }
  303. // Global task recovery manager
  304. var globalTaskRecovery *TaskRecovery
  305. // InitTaskRecovery initializes the task recovery system - called during application startup
  306. func InitTaskRecovery(ctx context.Context) {
  307. logger.Debug("Initializing task recovery system")
  308. // Wait a bit for services to fully initialize
  309. time.Sleep(3 * time.Second)
  310. // Check if services are available
  311. if GetLogFileManager() == nil || GetIndexer() == nil {
  312. logger.Debug("Modern services not available, skipping task recovery")
  313. return
  314. }
  315. globalTaskRecovery = NewTaskRecovery(ctx)
  316. if err := globalTaskRecovery.RecoverUnfinishedTasks(ctx); err != nil {
  317. logger.Errorf("Failed to recover unfinished tasks: %v", err)
  318. }
  319. // Monitor context for shutdown
  320. go func() {
  321. <-ctx.Done()
  322. if globalTaskRecovery != nil {
  323. globalTaskRecovery.Shutdown()
  324. }
  325. }()
  326. }