|
|
@@ -40,7 +40,7 @@ func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- logger.Info("Starting recovery of unfinished indexing tasks")
|
|
|
+ logger.Debug("Starting recovery of unfinished indexing tasks")
|
|
|
|
|
|
// Get all logs with their index status
|
|
|
allLogs := GetAllLogsWithIndexGrouped(func(log *NginxLogWithIndex) bool {
|
|
|
@@ -54,7 +54,7 @@ func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
|
|
|
for _, log := range allLogs {
|
|
|
if tr.needsRecovery(log) {
|
|
|
incompleteTasksCount++
|
|
|
-
|
|
|
+
|
|
|
// Reset to queued status and assign queue position
|
|
|
if err := tr.recoverTask(ctx, log.Path, queuePosition); err != nil {
|
|
|
logger.Errorf("Failed to recover task for %s: %v", log.Path, err)
|
|
|
@@ -65,9 +65,9 @@ func (tr *TaskRecovery) RecoverUnfinishedTasks(ctx context.Context) error {
|
|
|
}
|
|
|
|
|
|
if incompleteTasksCount > 0 {
|
|
|
- logger.Infof("Recovered %d incomplete indexing tasks", incompleteTasksCount)
|
|
|
+ logger.Debugf("Recovered %d incomplete indexing tasks", incompleteTasksCount)
|
|
|
} else {
|
|
|
- logger.Info("No incomplete indexing tasks found")
|
|
|
+ logger.Debug("No incomplete indexing tasks found")
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -81,12 +81,12 @@ func (tr *TaskRecovery) needsRecovery(log *NginxLogWithIndex) bool {
|
|
|
// Task was in progress during last shutdown
|
|
|
logger.Debugf("Found incomplete indexing task: %s", log.Path)
|
|
|
return true
|
|
|
-
|
|
|
+
|
|
|
case string(indexer.IndexStatusQueued):
|
|
|
// Task was queued but may not have started
|
|
|
logger.Debugf("Found queued indexing task: %s", log.Path)
|
|
|
return true
|
|
|
-
|
|
|
+
|
|
|
case string(indexer.IndexStatusError):
|
|
|
// Check if error is recent (within last hour before restart)
|
|
|
if log.LastIndexed > 0 {
|
|
|
@@ -97,76 +97,76 @@ func (tr *TaskRecovery) needsRecovery(log *NginxLogWithIndex) bool {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
// recoverTask recovers a single indexing task
|
|
|
func (tr *TaskRecovery) recoverTask(ctx context.Context, logPath string, queuePosition int) error {
|
|
|
- logger.Infof("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)
|
|
|
-
|
|
|
+ logger.Debugf("Recovering indexing task for: %s (queue position: %d)", logPath, queuePosition)
|
|
|
+
|
|
|
// Set status to queued with queue position
|
|
|
if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusQueued), queuePosition); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Queue the recovery task asynchronously with proper context and WaitGroup
|
|
|
tr.wg.Add(1)
|
|
|
go tr.executeRecoveredTask(tr.ctx, logPath)
|
|
|
-
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// executeRecoveredTask executes a recovered indexing task with proper global state and progress tracking
|
|
|
func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string) {
|
|
|
defer tr.wg.Done() // Always decrement WaitGroup
|
|
|
-
|
|
|
+
|
|
|
// Check context before starting
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
- logger.Infof("Context cancelled, skipping recovery task for %s", logPath)
|
|
|
+ logger.Debugf("Context cancelled, skipping recovery task for %s", logPath)
|
|
|
return
|
|
|
default:
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Add a small delay to stagger recovery tasks, but check context
|
|
|
select {
|
|
|
case <-time.After(time.Second * 2):
|
|
|
case <-ctx.Done():
|
|
|
- logger.Infof("Context cancelled during delay, skipping recovery task for %s", logPath)
|
|
|
+ logger.Debugf("Context cancelled during delay, skipping recovery task for %s", logPath)
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
- logger.Infof("Executing recovered indexing task: %s", logPath)
|
|
|
-
|
|
|
+
|
|
|
+ logger.Debugf("Executing recovered indexing task: %s", logPath)
|
|
|
+
|
|
|
// Get processing manager for global state updates
|
|
|
processingManager := event.GetProcessingStatusManager()
|
|
|
-
|
|
|
+
|
|
|
// Increment active tasks counter and set global status if this is the first task
|
|
|
isFirstTask := atomic.AddInt32(&tr.activeTasks, 1) == 1
|
|
|
if isFirstTask {
|
|
|
processingManager.UpdateNginxLogIndexing(true)
|
|
|
- logger.Info("Set global indexing status to true for recovery tasks")
|
|
|
+ logger.Debug("Set global indexing status to true for recovery tasks")
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Ensure we always decrement counter and reset global status when no tasks remain
|
|
|
defer func() {
|
|
|
remainingTasks := atomic.AddInt32(&tr.activeTasks, -1)
|
|
|
if remainingTasks == 0 {
|
|
|
processingManager.UpdateNginxLogIndexing(false)
|
|
|
- logger.Info("Set global indexing status to false - all recovery tasks completed")
|
|
|
+ logger.Debug("Set global indexing status to false - all recovery tasks completed")
|
|
|
}
|
|
|
if r := recover(); r != nil {
|
|
|
logger.Errorf("Panic during recovered task execution: %v", r)
|
|
|
}
|
|
|
}()
|
|
|
-
|
|
|
+
|
|
|
// Set status to indexing
|
|
|
if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexing), 0); err != nil {
|
|
|
logger.Errorf("Failed to set indexing status for recovered task %s: %v", logPath, err)
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Create progress tracking configuration for recovery task
|
|
|
progressConfig := &indexer.ProgressConfig{
|
|
|
NotifyInterval: 1 * time.Second,
|
|
|
@@ -184,7 +184,7 @@ func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string
|
|
|
},
|
|
|
})
|
|
|
|
|
|
- logger.Infof("Recovery progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
|
|
|
+ logger.Debugf("Recovery progress: %s - %.1f%% (Files: %d/%d, Lines: %d/%d)",
|
|
|
progress.LogGroupPath, progress.Percentage, progress.CompletedFiles,
|
|
|
progress.TotalFiles, progress.ProcessedLines, progress.EstimatedLines)
|
|
|
},
|
|
|
@@ -202,10 +202,10 @@ func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string
|
|
|
},
|
|
|
})
|
|
|
|
|
|
- logger.Infof("Recovery completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
|
|
|
+ logger.Debugf("Recovery completion: %s - Success: %t, Duration: %s, Lines: %d, Size: %d bytes",
|
|
|
completion.LogGroupPath, completion.Success, completion.Duration,
|
|
|
completion.TotalLines, completion.IndexedSize)
|
|
|
-
|
|
|
+
|
|
|
// Send index ready event if recovery was successful
|
|
|
if completion.Success {
|
|
|
event.Publish(event.Event{
|
|
|
@@ -221,19 +221,19 @@ func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string
|
|
|
}
|
|
|
},
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Check context before starting indexing
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
- logger.Infof("Context cancelled before indexing, stopping recovery task for %s", logPath)
|
|
|
+ logger.Debugf("Context cancelled before indexing, stopping recovery task for %s", logPath)
|
|
|
return
|
|
|
default:
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Execute the indexing with progress tracking
|
|
|
startTime := time.Now()
|
|
|
docsCountMap, minTime, maxTime, err := tr.modernIndexer.IndexLogGroupWithProgress(logPath, progressConfig)
|
|
|
-
|
|
|
+
|
|
|
if err != nil {
|
|
|
logger.Errorf("Failed to execute recovered indexing task %s: %v", logPath, err)
|
|
|
// Set error status
|
|
|
@@ -242,28 +242,28 @@ func (tr *TaskRecovery) executeRecoveredTask(ctx context.Context, logPath string
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Calculate total documents indexed
|
|
|
var totalDocsIndexed uint64
|
|
|
for _, docCount := range docsCountMap {
|
|
|
totalDocsIndexed += docCount
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Save indexing metadata using the log file manager
|
|
|
duration := time.Since(startTime)
|
|
|
if err := tr.logFileManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
|
|
|
logger.Errorf("Failed to save recovered index metadata for %s: %v", logPath, err)
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Set status to indexed (completed)
|
|
|
if err := tr.setTaskStatus(logPath, string(indexer.IndexStatusIndexed), 0); err != nil {
|
|
|
logger.Errorf("Failed to set completed status for recovered task %s: %v", logPath, err)
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Update searcher shards
|
|
|
UpdateSearcherShards()
|
|
|
-
|
|
|
- logger.Infof("Successfully completed recovered indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
|
|
|
+
|
|
|
+ logger.Debugf("Successfully completed recovered indexing task: %s, Documents: %d", logPath, totalDocsIndexed)
|
|
|
}
|
|
|
|
|
|
// setTaskStatus updates the task status in the database using the enhanced persistence layer
|
|
|
@@ -273,33 +273,33 @@ func (tr *TaskRecovery) setTaskStatus(logPath, status string, queuePosition int)
|
|
|
if persistence == nil {
|
|
|
return fmt.Errorf("persistence manager not available")
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Use enhanced SetIndexStatus method
|
|
|
return persistence.SetIndexStatus(logPath, status, queuePosition, "")
|
|
|
}
|
|
|
|
|
|
// Shutdown gracefully stops all recovery tasks
|
|
|
func (tr *TaskRecovery) Shutdown() {
|
|
|
- logger.Info("Shutting down task recovery system...")
|
|
|
-
|
|
|
+ logger.Debug("Shutting down task recovery system...")
|
|
|
+
|
|
|
// Cancel all active tasks
|
|
|
tr.cancel()
|
|
|
-
|
|
|
+
|
|
|
// Wait for all tasks to complete with timeout
|
|
|
done := make(chan struct{})
|
|
|
go func() {
|
|
|
tr.wg.Wait()
|
|
|
close(done)
|
|
|
}()
|
|
|
-
|
|
|
+
|
|
|
select {
|
|
|
case <-done:
|
|
|
- logger.Info("All recovery tasks completed successfully")
|
|
|
+ logger.Debug("All recovery tasks completed successfully")
|
|
|
case <-time.After(30 * time.Second):
|
|
|
logger.Warn("Timeout waiting for recovery tasks to complete")
|
|
|
}
|
|
|
-
|
|
|
- logger.Info("Task recovery system shutdown completed")
|
|
|
+
|
|
|
+ logger.Debug("Task recovery system shutdown completed")
|
|
|
}
|
|
|
|
|
|
// Global task recovery manager
|
|
|
@@ -307,16 +307,16 @@ var globalTaskRecovery *TaskRecovery
|
|
|
|
|
|
// InitTaskRecovery initializes the task recovery system - called during application startup
|
|
|
func InitTaskRecovery(ctx context.Context) {
|
|
|
- logger.Info("Initializing task recovery system")
|
|
|
-
|
|
|
+ logger.Debug("Initializing task recovery system")
|
|
|
+
|
|
|
// Wait a bit for services to fully initialize
|
|
|
time.Sleep(3 * time.Second)
|
|
|
-
|
|
|
+
|
|
|
globalTaskRecovery = NewTaskRecovery(ctx)
|
|
|
if err := globalTaskRecovery.RecoverUnfinishedTasks(ctx); err != nil {
|
|
|
logger.Errorf("Failed to recover unfinished tasks: %v", err)
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Monitor context for shutdown
|
|
|
go func() {
|
|
|
<-ctx.Done()
|
|
|
@@ -324,4 +324,4 @@ func InitTaskRecovery(ctx context.Context) {
|
|
|
globalTaskRecovery.Shutdown()
|
|
|
}
|
|
|
}()
|
|
|
-}
|
|
|
+}
|