log_indexer_tasks.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package nginx_log
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // debounceIndexTask implements file-level debouncing for index operations
  9. func (li *LogIndexer) debounceIndexTask(task *IndexTask) {
  10. filePath := task.FilePath
  11. // Check if we need to respect the minimum interval
  12. if lastTime, exists := li.lastIndexTime.Load(filePath); exists {
  13. if lastIndexTime, ok := lastTime.(time.Time); ok {
  14. timeSinceLastIndex := time.Since(lastIndexTime)
  15. if timeSinceLastIndex < MinIndexInterval {
  16. // Calculate remaining wait time
  17. remainingWait := MinIndexInterval - timeSinceLastIndex
  18. // Cancel any existing timer for this file
  19. if timerInterface, exists := li.debounceTimers.Load(filePath); exists {
  20. if timer, ok := timerInterface.(*time.Timer); ok {
  21. timer.Stop()
  22. }
  23. }
  24. // Set new timer
  25. timer := time.AfterFunc(remainingWait, func() {
  26. // Clean up timer
  27. li.debounceTimers.Delete(filePath)
  28. // Execute the actual indexing
  29. li.executeIndexTask(task)
  30. })
  31. li.debounceTimers.Store(filePath, timer)
  32. return
  33. }
  34. }
  35. }
  36. // No debouncing needed, execute immediately
  37. li.executeIndexTask(task)
  38. }
  39. // executeIndexTask executes the actual indexing task and updates last index time
  40. func (li *LogIndexer) executeIndexTask(task *IndexTask) {
  41. // Update last index time before processing
  42. li.lastIndexTime.Store(task.FilePath, time.Now())
  43. // Queue the task for processing
  44. select {
  45. case li.indexQueue <- task:
  46. // Task queued successfully (no debug log to avoid spam)
  47. default:
  48. logger.Warnf("Index queue is full, dropping task for file: %s", task.FilePath)
  49. // If there's a WaitGroup, we must decrement it to avoid deadlock
  50. if task.Wg != nil {
  51. task.Wg.Done()
  52. }
  53. }
  54. }
  55. // processIndexTask processes a single indexing task with file locking
  56. func (li *LogIndexer) processIndexTask(task *IndexTask) {
  57. // Ensure WaitGroup is handled correctly
  58. if task.Wg != nil {
  59. defer task.Wg.Done()
  60. }
  61. // Get or create a mutex for this file
  62. mutexInterface, _ := li.indexingLock.LoadOrStore(task.FilePath, &sync.Mutex{})
  63. fileMutex := mutexInterface.(*sync.Mutex)
  64. // Lock the file for indexing
  65. fileMutex.Lock()
  66. defer fileMutex.Unlock()
  67. logger.Infof("Processing index task for file: %s (priority: %d, full_reindex: %v)", task.FilePath, task.Priority, task.FullReindex)
  68. // Create a context with timeout for this task
  69. ctx, cancel := context.WithTimeout(li.ctx, 10*time.Minute)
  70. defer cancel()
  71. // Check if context is still valid
  72. select {
  73. case <-ctx.Done():
  74. logger.Warnf("Index task cancelled for file: %s", task.FilePath)
  75. return
  76. default:
  77. }
  78. // Perform the actual indexing
  79. if err := li.IndexLogFileWithMode(task.FilePath, task.FullReindex); err != nil {
  80. logger.Errorf("Failed to index file %s: %v", task.FilePath, err)
  81. } else {
  82. logger.Infof("Successfully indexed file: %s", task.FilePath)
  83. // Note: Log group notifications are handled centrally after all files complete
  84. }
  85. }