task_scheduler.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/event"
  9. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // TaskScheduler manages all indexing tasks (recovery, manual rebuild, etc.)
  13. // with unified locking to prevent concurrent execution on the same log group
  14. type TaskScheduler struct {
  15. logFileManager *indexer.LogFileManager
  16. modernIndexer *indexer.ParallelIndexer
  17. activeTasks int32 // Counter for active tasks
  18. ctx context.Context
  19. cancel context.CancelFunc
  20. wg sync.WaitGroup
  21. taskLocks map[string]*sync.Mutex // Per-log-group locks
  22. locksMutex sync.RWMutex // Protects taskLocks map
  23. }
  24. // Global task scheduler instance
  25. var (
  26. globalTaskScheduler *TaskScheduler
  27. taskSchedulerOnce sync.Once
  28. taskSchedulerInitialized bool
  29. taskSchedulerMutex sync.RWMutex
  30. )
  31. // GetTaskScheduler returns the global task scheduler instance
  32. func GetTaskScheduler() *TaskScheduler {
  33. taskSchedulerMutex.RLock()
  34. defer taskSchedulerMutex.RUnlock()
  35. return globalTaskScheduler
  36. }
  37. // InitTaskScheduler initializes the global task scheduler
  38. func InitTaskScheduler(ctx context.Context) {
  39. taskSchedulerMutex.Lock()
  40. defer taskSchedulerMutex.Unlock()
  41. if taskSchedulerInitialized {
  42. logger.Debug("Task scheduler already initialized")
  43. return
  44. }
  45. logger.Debug("Initializing task scheduler")
  46. // Wait a bit for services to fully initialize
  47. time.Sleep(3 * time.Second)
  48. // Check if services are available
  49. if GetLogFileManager() == nil || GetIndexer() == nil {
  50. logger.Debug("Modern services not available, skipping task scheduler initialization")
  51. return
  52. }
  53. globalTaskScheduler = NewTaskScheduler(ctx)
  54. taskSchedulerInitialized = true
  55. // Start task recovery
  56. if err := globalTaskScheduler.RecoverUnfinishedTasks(ctx); err != nil {
  57. logger.Errorf("Failed to recover unfinished tasks: %v", err)
  58. }
  59. // Monitor context for shutdown
  60. go func() {
  61. <-ctx.Done()
  62. if globalTaskScheduler != nil {
  63. globalTaskScheduler.Shutdown()
  64. }
  65. }()
  66. }
  67. // NewTaskScheduler creates a new task scheduler
  68. func NewTaskScheduler(parentCtx context.Context) *TaskScheduler {
  69. ctx, cancel := context.WithCancel(parentCtx)
  70. return &TaskScheduler{
  71. logFileManager: GetLogFileManager(),
  72. modernIndexer: GetIndexer(),
  73. ctx: ctx,
  74. cancel: cancel,
  75. taskLocks: make(map[string]*sync.Mutex),
  76. }
  77. }
  78. // acquireTaskLock gets or creates a mutex for a specific log group
  79. func (ts *TaskScheduler) acquireTaskLock(logPath string) *sync.Mutex {
  80. ts.locksMutex.Lock()
  81. defer ts.locksMutex.Unlock()
  82. if lock, exists := ts.taskLocks[logPath]; exists {
  83. return lock
  84. }
  85. lock := &sync.Mutex{}
  86. ts.taskLocks[logPath] = lock
  87. return lock
  88. }
  89. // releaseTaskLock removes the mutex for a specific log group after completion
  90. func (ts *TaskScheduler) releaseTaskLock(logPath string) {
  91. ts.locksMutex.Lock()
  92. defer ts.locksMutex.Unlock()
  93. delete(ts.taskLocks, logPath)
  94. }
  95. // IsTaskInProgress checks if a task is currently running for a specific log group
  96. func (ts *TaskScheduler) IsTaskInProgress(logPath string) bool {
  97. ts.locksMutex.RLock()
  98. defer ts.locksMutex.RUnlock()
  99. if lock, exists := ts.taskLocks[logPath]; exists {
  100. // Try to acquire the lock with TryLock
  101. // If we can't acquire it, it means task is in progress
  102. if lock.TryLock() {
  103. lock.Unlock()
  104. return false
  105. }
  106. return true
  107. }
  108. return false
  109. }
  110. // AcquireTaskLock acquires a lock for external use (e.g., manual rebuild)
  111. // Returns the lock and a release function
  112. func (ts *TaskScheduler) AcquireTaskLock(logPath string) (*sync.Mutex, func()) {
  113. lock := ts.acquireTaskLock(logPath)
  114. lock.Lock()
  115. releaseFunc := func() {
  116. lock.Unlock()
  117. ts.releaseTaskLock(logPath)
  118. }
  119. return lock, releaseFunc
  120. }
  121. // ScheduleIndexTask schedules an indexing task for a log group
  122. // Returns error if task is already in progress
  123. func (ts *TaskScheduler) ScheduleIndexTask(ctx context.Context, logPath string, progressConfig *indexer.ProgressConfig) error {
  124. // Check if task is already in progress
  125. if ts.IsTaskInProgress(logPath) {
  126. return fmt.Errorf("indexing task already in progress for %s", logPath)
  127. }
  128. // Queue the task asynchronously with proper context and WaitGroup
  129. ts.wg.Add(1)
  130. go ts.executeIndexTask(ctx, logPath, progressConfig)
  131. return nil
  132. }
  133. // executeIndexTask executes an indexing task with proper locking and progress tracking
  134. func (ts *TaskScheduler) executeIndexTask(ctx context.Context, logPath string, progressConfig *indexer.ProgressConfig) {
  135. defer ts.wg.Done() // Always decrement WaitGroup
  136. // Acquire lock for this specific log group to prevent concurrent execution
  137. lock := ts.acquireTaskLock(logPath)
  138. lock.Lock()
  139. defer func() {
  140. lock.Unlock()
  141. ts.releaseTaskLock(logPath)
  142. }()
  143. // Check context before starting
  144. select {
  145. case <-ctx.Done():
  146. logger.Debugf("Context cancelled, skipping task for %s", logPath)
  147. return
  148. default:
  149. }
  150. logger.Debugf("Executing indexing task: %s", logPath)
  151. // Get processing manager for global state updates
  152. processingManager := event.GetProcessingStatusManager()
  153. // Increment active tasks counter and set global status if this is the first task
  154. isFirstTask := atomic.AddInt32(&ts.activeTasks, 1) == 1
  155. if isFirstTask {
  156. processingManager.UpdateNginxLogIndexing(true)
  157. logger.Debug("Set global indexing status to true")
  158. }
  159. // Ensure we always decrement counter and reset global status when no tasks remain
  160. defer func() {
  161. remainingTasks := atomic.AddInt32(&ts.activeTasks, -1)
  162. if remainingTasks == 0 {
  163. processingManager.UpdateNginxLogIndexing(false)
  164. logger.Debug("Set global indexing status to false - all tasks completed")
  165. }
  166. if r := recover(); r != nil {
  167. logger.Errorf("Panic during task execution: %v", r)
  168. }
  169. }()
  170. // Set status to indexing
  171. if err := ts.setTaskStatus(logPath, string(indexer.IndexStatusIndexing), 0); err != nil {
  172. logger.Errorf("Failed to set indexing status for %s: %v", logPath, err)
  173. return
  174. }
  175. // Execute the indexing with progress tracking
  176. startTime := time.Now()
  177. docsCountMap, minTime, maxTime, err := ts.modernIndexer.IndexLogGroupWithProgress(logPath, progressConfig)
  178. if err != nil {
  179. logger.Errorf("Failed to execute indexing task %s: %v", logPath, err)
  180. // Set error status
  181. if statusErr := ts.setTaskStatus(logPath, string(indexer.IndexStatusError), 0); statusErr != nil {
  182. logger.Errorf("Failed to set error status for %s: %v", logPath, statusErr)
  183. }
  184. return
  185. }
  186. // Calculate total documents indexed
  187. var totalDocsIndexed uint64
  188. for _, docCount := range docsCountMap {
  189. totalDocsIndexed += docCount
  190. }
  191. // Save indexing metadata using the log file manager
  192. duration := time.Since(startTime)
  193. if err := ts.logFileManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
  194. logger.Errorf("Failed to save index metadata for %s: %v", logPath, err)
  195. }
  196. // Set status to indexed (completed)
  197. if err := ts.setTaskStatus(logPath, string(indexer.IndexStatusIndexed), 0); err != nil {
  198. logger.Errorf("Failed to set completed status for %s: %v", logPath, err)
  199. }
  200. // Update searcher shards
  201. UpdateSearcherShards()
  202. logger.Debugf("Successfully completed indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
  203. }
  204. // RecoverUnfinishedTasks recovers indexing tasks that were incomplete at last shutdown
  205. func (ts *TaskScheduler) RecoverUnfinishedTasks(ctx context.Context) error {
  206. if ts.logFileManager == nil || ts.modernIndexer == nil {
  207. logger.Warn("Cannot recover tasks: services not available")
  208. return nil
  209. }
  210. logger.Debug("Starting recovery of unfinished indexing tasks")
  211. // Get all logs with their index status
  212. allLogs := GetAllLogsWithIndexGrouped(func(log *NginxLogWithIndex) bool {
  213. // Only process access logs
  214. return log.Type == "access"
  215. })
  216. var incompleteTasksCount int
  217. var queuePosition int = 1
  218. for _, log := range allLogs {
  219. if ts.needsRecovery(log) {
  220. incompleteTasksCount++
  221. // Reset to queued status and assign queue position
  222. if err := ts.recoverTask(ctx, log.Path, queuePosition); err != nil {
  223. logger.Errorf("Failed to recover task for %s: %v", log.Path, err)
  224. } else {
  225. queuePosition++
  226. }
  227. }
  228. }
  229. if incompleteTasksCount > 0 {
  230. logger.Debugf("Recovered %d incomplete indexing tasks", incompleteTasksCount)
  231. } else {
  232. logger.Debug("No incomplete indexing tasks found")
  233. }
  234. return nil
  235. }
  236. // needsRecovery determines if a log file has an incomplete indexing task that needs recovery
  237. func (ts *TaskScheduler) needsRecovery(log *NginxLogWithIndex) bool {
  238. // Check for incomplete states that indicate interrupted operations
  239. switch log.IndexStatus {
  240. case string(indexer.IndexStatusIndexing):
  241. // Task was in progress during last shutdown
  242. logger.Debugf("Found incomplete indexing task: %s", log.Path)
  243. return true
  244. case string(indexer.IndexStatusQueued):
  245. // Task was queued but may not have started
  246. logger.Debugf("Found queued indexing task: %s", log.Path)
  247. return true
  248. case string(indexer.IndexStatusError):
  249. // Check if error is recent (within last hour before restart)
  250. if log.LastIndexed > 0 {
  251. lastIndexTime := time.Unix(log.LastIndexed, 0)
  252. if time.Since(lastIndexTime) < time.Hour {
  253. logger.Debugf("Found recent error task for retry: %s", log.Path)
  254. return true
  255. }
  256. }
  257. }
  258. return false
  259. }
  260. // recoverTask recovers a single indexing task
  261. func (ts *TaskScheduler) recoverTask(ctx context.Context, logPath string, queuePosition int) error {
  262. // Check if task is already in progress
  263. if ts.IsTaskInProgress(logPath) {
  264. logger.Debugf("Skipping recovery for %s - task already in progress", logPath)
  265. return nil
  266. }
  267. logger.Debugf("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)
  268. // Set status to queued with queue position
  269. if err := ts.setTaskStatus(logPath, string(indexer.IndexStatusQueued), queuePosition); err != nil {
  270. return err
  271. }
  272. // Add a small delay to stagger recovery tasks
  273. time.Sleep(time.Second * 2)
  274. // Create recovery progress config
  275. progressConfig := ts.createProgressConfig()
  276. // Schedule the task
  277. return ts.ScheduleIndexTask(ctx, logPath, progressConfig)
  278. }
  279. // createProgressConfig creates a standard progress configuration
  280. func (ts *TaskScheduler) createProgressConfig() *indexer.ProgressConfig {
  281. return &indexer.ProgressConfig{
  282. NotifyInterval: 1 * time.Second,
  283. OnProgress: func(progress indexer.ProgressNotification) {
  284. // Send progress event to frontend
  285. event.Publish(event.Event{
  286. Type: event.TypeNginxLogIndexProgress,
  287. Data: event.NginxLogIndexProgressData{
  288. LogPath: progress.LogGroupPath,
  289. Progress: progress.Percentage,
  290. Stage: "indexing",
  291. Status: "running",
  292. ElapsedTime: progress.ElapsedTime.Milliseconds(),
  293. EstimatedRemain: progress.EstimatedRemain.Milliseconds(),
  294. },
  295. })
  296. logger.Debugf("Indexing progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
  297. progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
  298. progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
  299. },
  300. OnCompletion: func(completion indexer.CompletionNotification) {
  301. // Send completion event to frontend
  302. event.Publish(event.Event{
  303. Type: event.TypeNginxLogIndexComplete,
  304. Data: event.NginxLogIndexCompleteData{
  305. LogPath: completion.LogGroupPath,
  306. Success: completion.Success,
  307. Duration: int64(completion.Duration.Milliseconds()),
  308. TotalLines: completion.TotalLines,
  309. IndexedSize: completion.IndexedSize,
  310. Error: completion.Error,
  311. },
  312. })
  313. logger.Debugf("Indexing completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
  314. completion.LogGroupPath, completion.Success, completion.Duration,
  315. completion.TotalLines, completion.IndexedSize)
  316. // Send index ready event if indexing was successful
  317. if completion.Success {
  318. event.Publish(event.Event{
  319. Type: event.TypeNginxLogIndexReady,
  320. Data: event.NginxLogIndexReadyData{
  321. LogPath: completion.LogGroupPath,
  322. StartTime: time.Now().Unix(),
  323. EndTime: time.Now().Unix(),
  324. Available: true,
  325. IndexStatus: "ready",
  326. },
  327. })
  328. }
  329. },
  330. }
  331. }
  332. // setTaskStatus updates the task status in the database
  333. func (ts *TaskScheduler) setTaskStatus(logPath, status string, queuePosition int) error {
  334. // Get persistence manager
  335. persistence := ts.logFileManager.GetPersistence()
  336. if persistence == nil {
  337. return fmt.Errorf("persistence manager not available")
  338. }
  339. // Use enhanced SetIndexStatus method
  340. return persistence.SetIndexStatus(logPath, status, queuePosition, "")
  341. }
  342. // Shutdown gracefully stops all tasks
  343. func (ts *TaskScheduler) Shutdown() {
  344. logger.Debug("Shutting down task scheduler...")
  345. // Cancel all active tasks
  346. ts.cancel()
  347. // Wait for all tasks to complete with timeout
  348. done := make(chan struct{})
  349. go func() {
  350. ts.wg.Wait()
  351. close(done)
  352. }()
  353. select {
  354. case <-done:
  355. logger.Debug("All tasks completed successfully")
  356. case <-time.After(30 * time.Second):
  357. logger.Warn("Timeout waiting for tasks to complete")
  358. }
  359. logger.Debug("Task scheduler shutdown completed")
  360. }