log_indexer_tasks.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package nginx_log
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // debounceIndexTask implements file-level debouncing for index operations
  9. func (li *LogIndexer) debounceIndexTask(task *IndexTask) {
  10. // First, check if the context is already cancelled
  11. select {
  12. case <-li.ctx.Done():
  13. logger.Debugf("Debounce check cancelled for %s as context is done.", task.FilePath)
  14. if task.Wg != nil {
  15. task.Wg.Done()
  16. }
  17. return
  18. default:
  19. }
  20. filePath := task.FilePath
  21. // Check if we need to respect the minimum interval
  22. if lastTime, exists := li.lastIndexTime.Load(filePath); exists {
  23. if lastIndexTime, ok := lastTime.(time.Time); ok {
  24. timeSinceLastIndex := time.Since(lastIndexTime)
  25. if timeSinceLastIndex < MinIndexInterval {
  26. // Calculate remaining wait time
  27. remainingWait := MinIndexInterval - timeSinceLastIndex
  28. // Cancel any existing timer for this file
  29. if timerInterface, exists := li.debounceTimers.Load(filePath); exists {
  30. if timer, ok := timerInterface.(*time.Timer); ok {
  31. timer.Stop()
  32. }
  33. }
  34. // Set new timer
  35. timer := time.AfterFunc(remainingWait, func() {
  36. // Clean up timer
  37. li.debounceTimers.Delete(filePath)
  38. // Execute the actual indexing
  39. li.executeIndexTask(task)
  40. })
  41. li.debounceTimers.Store(filePath, timer)
  42. return
  43. }
  44. }
  45. }
  46. // No debouncing needed, execute immediately
  47. li.executeIndexTask(task)
  48. }
  49. // executeIndexTask executes the actual indexing task and updates last index time
  50. func (li *LogIndexer) executeIndexTask(task *IndexTask) {
  51. // Update last index time before processing
  52. li.lastIndexTime.Store(task.FilePath, time.Now())
  53. // Check if context is still valid
  54. select {
  55. case <-li.ctx.Done():
  56. logger.Warnf("Index task cancelled for file: %s", task.FilePath)
  57. return
  58. default:
  59. }
  60. // Queue the task for processing
  61. select {
  62. case li.indexQueue <- task:
  63. // Task queued successfully (no debug log to avoid spam)
  64. default:
  65. logger.Warnf("Index queue is full, dropping task for file: %s", task.FilePath)
  66. // If there's a WaitGroup, we must decrement it to avoid deadlock
  67. if task.Wg != nil {
  68. task.Wg.Done()
  69. }
  70. }
  71. }
  72. // processIndexTask processes a single indexing task with file locking
  73. func (li *LogIndexer) processIndexTask(task *IndexTask) {
  74. // Ensure WaitGroup is handled correctly
  75. if task.Wg != nil {
  76. defer task.Wg.Done()
  77. }
  78. // Get or create a mutex for this file
  79. mutexInterface, _ := li.indexingLock.LoadOrStore(task.FilePath, &sync.Mutex{})
  80. fileMutex := mutexInterface.(*sync.Mutex)
  81. // Lock the file for indexing
  82. fileMutex.Lock()
  83. defer fileMutex.Unlock()
  84. logger.Infof("Processing index task for file: %s (priority: %d, full_reindex: %v)", task.FilePath, task.Priority, task.FullReindex)
  85. // Create a context with timeout for this task
  86. ctx, cancel := context.WithTimeout(li.ctx, 10*time.Minute)
  87. defer cancel()
  88. // Check if context is still valid
  89. select {
  90. case <-ctx.Done():
  91. logger.Warnf("Index task cancelled for file: %s", task.FilePath)
  92. return
  93. default:
  94. }
  95. // Perform the actual indexing
  96. if err := li.IndexLogFileWithMode(task.FilePath, task.FullReindex); err != nil {
  97. logger.Errorf("Failed to index file %s: %v", task.FilePath, err)
  98. } else {
  99. logger.Infof("Successfully indexed file: %s", task.FilePath)
  100. // Note: Log group notifications are handled centrally after all files complete
  101. }
  102. }