package indexer

import (
	"bytes"
	"compress/gzip"
	"context"
	"errors"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// ProgressTracker tracks per-file and aggregate line-count progress while the
// files of one log group are indexed, and reports it through the optional
// onProgress / onCompletion callbacks.
//
// Mutable state is guarded by mu; helper methods whose names end in "Locked"
// expect the caller to already hold it.
type ProgressTracker struct {
	mu                 sync.RWMutex
	logGroupPath       string
	startTime          time.Time                // When the tracker was created; base for elapsed/remaining time
	files              map[string]*FileProgress // Per-file progress, keyed by file path
	totalEstimate      int64                    // Total estimated lines across all files
	totalActual        int64                    // Total actual lines processed
	isCompleted        bool                     // Set once every file is completed/failed, or on Cancel
	completionNotified bool                     // Flag to prevent duplicate completion notifications
	lastNotify         time.Time                // Time of the last progress notification (for throttling)
	notifyInterval     time.Duration            // Minimum gap between progress notifications

	// Callback functions for notifications; either may be nil.
	onProgress   func(ProgressNotification)
	onCompletion func(CompletionNotification)
}

// FileProgress tracks progress for an individual file. The tracker hands out
// copies of this struct, never its internal pointers.
type FileProgress struct {
	FilePath       string    `json:"file_path"`
	State          FileState `json:"state"`
	EstimatedLines int64     `json:"estimated_lines"` // Estimated total lines in this file
	ProcessedLines int64     `json:"processed_lines"` // Lines actually processed so far
	FileSize       int64     `json:"file_size"`       // Total file size in bytes
	CurrentPos     int64     `json:"current_pos"`     // Current read position in bytes (only updated for uncompressed files)
	AvgLineSize    int64     `json:"avg_line_size"`   // Average bytes per line; starts at a fixed guess, refreshed from progress updates for compressed files
	SampleCount    int64     `json:"sample_count"`    // Number of progress updates that refreshed AvgLineSize (not a line count)
	IsCompressed   bool      `json:"is_compressed"`
	StartTime      time.Time `json:"start_time"`          // When processing of the file started
	CompletedTime  time.Time `json:"completed_time"`      // When the file completed or failed
	ErrorMsg       string    `json:"error_msg,omitempty"` // Error message if processing failed
}

// FileState represents the current state of file processing. Files start as
// pending, move to processing, and end as either completed or failed.
type FileState int

const (
	FileStatePending FileState = iota
	FileStateProcessing
	FileStateCompleted
	FileStateFailed
)

// String returns the lowercase name of the state, or "unknown" for values
// outside the defined constants.
func (fs FileState) String() string {
	switch fs {
	case FileStatePending:
		return "pending"
	case FileStateProcessing:
		return "processing"
	case FileStateCompleted:
		return "completed"
	case FileStateFailed:
		return "failed"
	default:
		return "unknown"
	}
}
contains progress update information type ProgressNotification struct { LogGroupPath string `json:"log_group_path"` Percentage float64 `json:"percentage"` TotalFiles int `json:"total_files"` CompletedFiles int `json:"completed_files"` ProcessingFiles int `json:"processing_files"` FailedFiles int `json:"failed_files"` ProcessedLines int64 `json:"processed_lines"` EstimatedLines int64 `json:"estimated_lines"` ElapsedTime time.Duration `json:"elapsed_time"` EstimatedRemain time.Duration `json:"estimated_remain"` IsCompleted bool `json:"is_completed"` } // CompletionNotification contains completion information type CompletionNotification struct { LogGroupPath string `json:"log_group_path"` Success bool `json:"success"` Duration time.Duration `json:"duration"` TotalLines int64 `json:"total_lines"` TotalFiles int `json:"total_files"` FailedFiles int `json:"failed_files"` IndexedSize int64 `json:"indexed_size"` Error string `json:"error,omitempty"` } // ProgressConfig contains configuration for progress tracking type ProgressConfig struct { NotifyInterval time.Duration // Minimum time between progress notifications OnProgress func(ProgressNotification) OnCompletion func(CompletionNotification) } // NewProgressTracker creates a new progress tracker for indexing operations func NewProgressTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker { pt := &ProgressTracker{ logGroupPath: logGroupPath, startTime: time.Now(), files: make(map[string]*FileProgress), completionNotified: false, } if config != nil { if config.NotifyInterval == 0 { config.NotifyInterval = 1 * time.Second // Default notify interval } pt.notifyInterval = config.NotifyInterval pt.onProgress = config.OnProgress pt.onCompletion = config.OnCompletion } else { pt.notifyInterval = 1 * time.Second // Default when no config provided } return pt } // AddFile adds a file to the progress tracker func (pt *ProgressTracker) AddFile(filePath string, isCompressed bool) { pt.mu.Lock() defer pt.mu.Unlock() 
pt.files[filePath] = &FileProgress{ FilePath: filePath, State: FileStatePending, IsCompressed: isCompressed, AvgLineSize: 120, // Initial estimate: 120 bytes per line SampleCount: 0, } } // SetFileEstimate sets the estimated line count for a file func (pt *ProgressTracker) SetFileEstimate(filePath string, estimatedLines int64) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { oldEstimate := progress.EstimatedLines progress.EstimatedLines = estimatedLines // Update total estimate pt.totalEstimate = pt.totalEstimate - oldEstimate + estimatedLines } } // SetFileSize sets the file size for a file func (pt *ProgressTracker) SetFileSize(filePath string, fileSize int64) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { progress.FileSize = fileSize } } // UpdateFileProgress updates the processed line count and position for a file func (pt *ProgressTracker) UpdateFileProgress(filePath string, processedLines int64, currentPos ...int64) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { // Update total actual processed, ensuring not to double-count on completion if progress.State != FileStateCompleted { pt.totalActual = pt.totalActual - progress.ProcessedLines + processedLines } progress.ProcessedLines = processedLines // Update position if provided if len(currentPos) > 0 && !progress.IsCompressed { progress.CurrentPos = currentPos[0] } // Update average line size for compressed files if progress.IsCompressed && processedLines > 0 && currentPos != nil && len(currentPos) > 0 { progress.SampleCount++ if progress.SampleCount > 0 { progress.AvgLineSize = currentPos[0] / processedLines } } // Notify progress if enough time has passed pt.notifyProgressLocked() } } // StartFile marks a file as started processing func (pt *ProgressTracker) StartFile(filePath string) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { progress.State = 
FileStateProcessing progress.StartTime = time.Now() pt.notifyProgressLocked() } } // CompleteFile marks a file as completed successfully func (pt *ProgressTracker) CompleteFile(filePath string, finalProcessedLines int64) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { // Prevent marking as completed multiple times if progress.State == FileStateCompleted || progress.State == FileStateFailed { return } // Ensure final processed lines are correctly accounted for in total pt.totalActual = pt.totalActual - progress.ProcessedLines + finalProcessedLines progress.ProcessedLines = finalProcessedLines progress.State = FileStateCompleted progress.CompletedTime = time.Now() pt.checkCompletionLocked() } } // FailFile marks a file as failed processing func (pt *ProgressTracker) FailFile(filePath string, errorMsg string) { pt.mu.Lock() defer pt.mu.Unlock() if progress, exists := pt.files[filePath]; exists { // Prevent marking as failed multiple times if progress.State == FileStateCompleted || progress.State == FileStateFailed { return } progress.State = FileStateFailed progress.ErrorMsg = errorMsg progress.CompletedTime = time.Now() pt.checkCompletionLocked() } } // GetProgress returns the current progress percentage and stats func (pt *ProgressTracker) GetProgress() ProgressNotification { pt.mu.RLock() defer pt.mu.RUnlock() return pt.getProgressLocked() } // GetFileProgress returns progress for a specific file func (pt *ProgressTracker) GetFileProgress(filePath string) (*FileProgress, bool) { pt.mu.RLock() defer pt.mu.RUnlock() progress, exists := pt.files[filePath] if !exists { return nil, false } // Return a copy to avoid race conditions copy := *progress return ©, true } // GetAllFiles returns all file progress information func (pt *ProgressTracker) GetAllFiles() map[string]*FileProgress { pt.mu.RLock() defer pt.mu.RUnlock() result := make(map[string]*FileProgress) for path, progress := range pt.files { copy := *progress result[path] = 
© } return result } // IsCompleted returns whether all files have been processed func (pt *ProgressTracker) IsCompleted() bool { pt.mu.RLock() defer pt.mu.RUnlock() return pt.isCompleted } // Cancel marks the tracker as cancelled and stops processing func (pt *ProgressTracker) Cancel(reason string) { pt.mu.Lock() defer pt.mu.Unlock() for _, progress := range pt.files { if progress.State == FileStateProcessing || progress.State == FileStatePending { progress.State = FileStateFailed progress.ErrorMsg = "cancelled: " + reason progress.CompletedTime = time.Now() } } pt.isCompleted = true pt.notifyCompletionLocked() } // checkCompletionLocked checks if all files are completed and notifies if so func (pt *ProgressTracker) checkCompletionLocked() { if pt.completionNotified { return } allCompleted := true for _, fp := range pt.files { if fp.State != FileStateCompleted && fp.State != FileStateFailed { allCompleted = false break } } if allCompleted { pt.isCompleted = true pt.completionNotified = true pt.notifyCompletionLocked() } else { pt.notifyProgressLocked() } } // notifyProgressLocked sends progress notification (must be called with lock held) func (pt *ProgressTracker) notifyProgressLocked() { // Throttle notifications to avoid spam using configurable interval now := time.Now() if now.Sub(pt.lastNotify) < pt.notifyInterval { return } pt.lastNotify = now if pt.onProgress != nil { notification := pt.getProgressLocked() go pt.onProgress(notification) // Non-blocking notification } } // notifyCompletionLocked sends completion notification (must be called with lock held) func (pt *ProgressTracker) notifyCompletionLocked() { if pt.onCompletion == nil { return } elapsed := time.Since(pt.startTime) // Calculate total size processed var totalSize int64 var failedFiles int for _, fp := range pt.files { if fp.State == FileStateFailed { failedFiles++ continue } if fp.IsCompressed { // For compressed files, use dynamic average line size totalSize += fp.ProcessedLines * 
fp.AvgLineSize } else { // For uncompressed files, use actual bytes processed if available if fp.CurrentPos > 0 { totalSize += fp.CurrentPos } else { // Fallback to line-based estimation totalSize += fp.ProcessedLines * 150 } } } notification := CompletionNotification{ LogGroupPath: pt.logGroupPath, Success: failedFiles == 0, Duration: elapsed, TotalLines: pt.totalActual, TotalFiles: len(pt.files), FailedFiles: failedFiles, IndexedSize: totalSize, } if failedFiles > 0 { notification.Error = "some files failed to process" } go pt.onCompletion(notification) // Non-blocking notification } // getProgressLocked returns progress without notification (must be called with lock held) func (pt *ProgressTracker) getProgressLocked() ProgressNotification { var completedFiles, processingFiles, failedFiles int // Count files by state for _, fp := range pt.files { switch fp.State { case FileStateCompleted: completedFiles++ case FileStateProcessing: processingFiles++ case FileStateFailed: failedFiles++ } } // Calculate progress percentage using hybrid approach var percentage float64 // Calculate weighted progress combining file count, file sizes, and line estimates var totalFileWeight, processedFileWeight float64 var totalSizeWeight, processedSizeWeight float64 var totalLineWeight, processedLineWeight float64 // Collect metrics for hybrid calculation for _, fp := range pt.files { fileWeight := 1.0 // Each file contributes equally to file-based progress sizeWeight := float64(fp.FileSize) lineWeight := float64(fp.EstimatedLines) totalFileWeight += fileWeight totalSizeWeight += sizeWeight if lineWeight > 0 { totalLineWeight += lineWeight } if fp.State == FileStateCompleted { processedFileWeight += fileWeight processedSizeWeight += sizeWeight if lineWeight > 0 { processedLineWeight += float64(fp.ProcessedLines) } } else if fp.State == FileStateProcessing && fp.ProcessedLines > 0 { // For processing files, add partial progress if lineWeight > 0 { processedLineWeight += 
float64(fp.ProcessedLines) // For size-based progress, estimate based on lines processed if fp.EstimatedLines > 0 { sizeProgress := float64(fp.ProcessedLines) / float64(fp.EstimatedLines) processedSizeWeight += sizeWeight * sizeProgress } } else if fp.FileSize > 0 && fp.CurrentPos > 0 { // Use position-based progress for files without line estimates sizeProgress := float64(fp.CurrentPos) / float64(fp.FileSize) processedSizeWeight += sizeWeight * sizeProgress processedFileWeight += fileWeight * sizeProgress } } } // Calculate progress using the most reliable metric available if totalLineWeight > 0 && processedLineWeight > 0 { // Dynamic line estimation: adjust line estimates based on completed files dynamicLineWeight := totalLineWeight if completedFiles >= 2 { // Calculate average lines per completed file var totalLinesFromCompleted int64 for _, fp := range pt.files { if fp.State == FileStateCompleted { totalLinesFromCompleted += fp.ProcessedLines } } if completedFiles > 0 { avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles) // Adjust remaining file estimates based on observed average remainingFiles := len(pt.files) - completedFiles if remainingFiles > 0 { // Calculate current estimate for remaining files var remainingEstimate int64 for _, fp := range pt.files { if fp.State != FileStateCompleted { remainingEstimate += fp.EstimatedLines } } // Replace remaining estimates with dynamic estimate dynamicRemainingEstimate := int64(avgLinesPerFile * float64(remainingFiles)) dynamicLineWeight = float64(int64(totalLinesFromCompleted) + dynamicRemainingEstimate) } } } // Use line-based progress with dynamic estimation linePercentage := (processedLineWeight / float64(dynamicLineWeight)) * 100 // Use file-based progress as primary metric for reliability filePercentage := (processedFileWeight / totalFileWeight) * 100 // Weight them: 40% lines, 60% files for better reliability // Files are more predictable than line estimates for compressed files 
percentage = (linePercentage * 0.4) + (filePercentage * 0.6) // Additional safety: never exceed file-based progress by more than 10% // This prevents premature 100% when file-based progress is more reliable maxAllowedPercentage := filePercentage + 10.0 if percentage > maxAllowedPercentage { percentage = maxAllowedPercentage } } else if totalSizeWeight > 0 { // Fallback to size-based progress percentage = (processedSizeWeight / totalSizeWeight) * 100 } else if totalFileWeight > 0 { // Fallback to file-based progress only percentage = (processedFileWeight / totalFileWeight) * 100 } // Cap at 100% if percentage > 100 { percentage = 100 } elapsed := time.Since(pt.startTime) var estimatedRemain time.Duration if percentage > 0 && percentage < 100 { avgTimePerPercent := float64(elapsed.Nanoseconds()) / percentage remainingPercent := 100.0 - percentage estimatedRemain = time.Duration(int64(avgTimePerPercent * remainingPercent)) } // Use dynamic line estimation for reporting adjustedEstimate := pt.totalEstimate if completedFiles >= 2 { // Calculate average lines per completed file var totalLinesFromCompleted int64 for _, fp := range pt.files { if fp.State == FileStateCompleted { totalLinesFromCompleted += fp.ProcessedLines } } if completedFiles > 0 { avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles) // Project total lines based on completed files projectedTotal := int64(avgLinesPerFile * float64(len(pt.files))) // Use dynamic estimate with some constraints to prevent extreme changes if completedFiles >= len(pt.files)/5 { // At least 20% of files processed // More confidence, allow larger adjustments adjustedEstimate = projectedTotal } else if projectedTotal > adjustedEstimate { // Less confidence, be more conservative but still adjust upward maxIncrease := adjustedEstimate + (projectedTotal-adjustedEstimate)/2 adjustedEstimate = maxIncrease } } } return ProgressNotification{ LogGroupPath: pt.logGroupPath, Percentage: percentage, TotalFiles: 
len(pt.files), CompletedFiles: completedFiles, ProcessingFiles: processingFiles, FailedFiles: failedFiles, ProcessedLines: pt.totalActual, EstimatedLines: adjustedEstimate, ElapsedTime: elapsed, EstimatedRemain: estimatedRemain, IsCompleted: pt.isCompleted, } } // EstimateFileLines estimates the number of lines in a file based on sampling func EstimateFileLines(ctx context.Context, filePath string, fileSize int64, isCompressed bool) (int64, error) { if fileSize == 0 { return 0, nil } file, err := os.Open(filePath) if err != nil { // Return fallback estimate instead of error return fileSize / 150, nil // Fallback: ~150 bytes per line } defer file.Close() var reader io.Reader = file // Handle compressed files if isCompressed { gzReader, err := gzip.NewReader(file) if err != nil { return (fileSize * 3) / 150, nil // Fallback for compressed: 3:1 ratio } defer gzReader.Close() reader = gzReader } // Sample the first 1MB of the file content (decompressed if necessary) sampleSize := int64(1024 * 1024) buf := make([]byte, sampleSize) // Check for context cancellation select { case <-ctx.Done(): return 0, ctx.Err() default: } bytesRead, err := io.ReadFull(reader, buf) if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) { return fileSize / 150, nil // Fallback on read error } if bytesRead == 0 { return 0, nil // Empty file } // Count lines in the sample lineCount := bytes.Count(buf[:bytesRead], []byte{'\n'}) if lineCount == 0 { // Avoid division by zero, fallback to rough estimate return fileSize / 150, nil } // Calculate average line size from the sample avgLineSize := float64(bytesRead) / float64(lineCount) if avgLineSize == 0 { return fileSize / 150, nil // Fallback } // Estimate total lines var estimatedLines int64 if isCompressed { // For compressed files, use a compression ratio estimate estimatedUncompressedSize := fileSize * 5 // Conservative 5:1 compression ratio estimatedLines = int64(float64(estimatedUncompressedSize) / avgLineSize) } else { 
estimatedLines = int64(float64(fileSize) / avgLineSize) } return estimatedLines, nil } // IsCompressedFile determines if a file is compressed based on its extension func IsCompressedFile(filePath string) bool { ext := strings.ToLower(filepath.Ext(filePath)) return ext == ".gz" || ext == ".bz2" || ext == ".xz" || ext == ".lz4" } // IsRotationLogFile determines if a file is a rotation log file func IsRotationLogFile(filePath string) bool { base := filepath.Base(filePath) // Common nginx rotation patterns: // access.log, access.log.1, access.log.2.gz // access.1.log, access.2.log.gz // error.log, error.log.1, error.log.2.gz // Remove compression extensions first if IsCompressedFile(base) { base = strings.TrimSuffix(base, filepath.Ext(base)) } // Check for basic .log files if strings.HasSuffix(base, ".log") { return true } // Check for numbered rotation patterns: access.log.1, error.log.10, etc. parts := strings.Split(base, ".") if len(parts) >= 3 { // Pattern: name.log.number (e.g., access.log.1) if parts[len(parts)-2] == "log" && isNumeric(parts[len(parts)-1]) { return true } // Pattern: name.number.log (e.g., access.1.log) if parts[len(parts)-1] == "log" { for i := 1; i < len(parts)-1; i++ { if isNumeric(parts[i]) { return true } } } } return false } // isNumeric checks if a string represents a number func isNumeric(s string) bool { if len(s) == 0 { return false } for _, r := range s { if r < '0' || r > '9' { return false } } return true } // AddRotationFiles automatically detects and adds rotation log files with appropriate compression detection func (pt *ProgressTracker) AddRotationFiles(filePaths ...string) { for _, filePath := range filePaths { isCompressed := IsCompressedFile(filePath) pt.AddFile(filePath, isCompressed) } } // ProgressManager manages multiple progress trackers type ProgressManager struct { mu sync.RWMutex trackers map[string]*ProgressTracker } // NewProgressManager creates a new progress manager func NewProgressManager() *ProgressManager { 
return &ProgressManager{ trackers: make(map[string]*ProgressTracker), } } // GetTracker gets or creates a progress tracker for a log group func (pm *ProgressManager) GetTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker { pm.mu.Lock() defer pm.mu.Unlock() if tracker, exists := pm.trackers[logGroupPath]; exists { return tracker } tracker := NewProgressTracker(logGroupPath, config) pm.trackers[logGroupPath] = tracker return tracker } // RemoveTracker removes a progress tracker func (pm *ProgressManager) RemoveTracker(logGroupPath string) { pm.mu.Lock() defer pm.mu.Unlock() delete(pm.trackers, logGroupPath) } // GetAllTrackers returns all current trackers func (pm *ProgressManager) GetAllTrackers() map[string]*ProgressTracker { pm.mu.RLock() defer pm.mu.RUnlock() result := make(map[string]*ProgressTracker) for path, tracker := range pm.trackers { result[path] = tracker } return result } // Cleanup removes completed or failed trackers func (pm *ProgressManager) Cleanup() { pm.mu.Lock() defer pm.mu.Unlock() for path, tracker := range pm.trackers { if tracker.IsCompleted() { delete(pm.trackers, path) } } }