| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778 | package indexerimport (	"bytes"	"compress/gzip"	"context"	"errors"	"io"	"os"	"path/filepath"	"strings"	"sync"	"time"	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils")// ProgressTracker manages progress tracking for indexing operationstype ProgressTracker struct {	mu                 sync.RWMutex	logGroupPath       string	startTime          time.Time	files              map[string]*FileProgress	totalEstimate      int64 // Total estimated lines across all files	totalActual        int64 // Total actual lines processed	isCompleted        bool	completionNotified bool // Flag to prevent duplicate completion notifications	lastNotify         time.Time	notifyInterval     time.Duration // Configurable notification interval	// Callback functions for notifications	onProgress   func(ProgressNotification)	onCompletion func(CompletionNotification)}// FileProgress tracks progress for individual filestype FileProgress struct {	FilePath       string    `json:"file_path"`	State          FileState `json:"state"`	EstimatedLines int64     `json:"estimated_lines"` // Estimated total lines in this file	ProcessedLines int64     `json:"processed_lines"` // Actually processed lines	FileSize       int64     `json:"file_size"`       // Total file size in bytes	CurrentPos     int64     `json:"current_pos"`     // Current reading position in bytes	AvgLineSize    int64     `json:"avg_line_size"`   // Dynamic average line size in bytes	SampleCount    int64     `json:"sample_count"`    // Number of lines sampled for average calculation	IsCompressed   bool      `json:"is_compressed"`	StartTime      time.Time `json:"start_time"`	CompletedTime  time.Time `json:"completed_time"`	ErrorMsg       string    `json:"error_msg,omitempty"` // Error message if processing failed}// FileState represents the current state of file processingtype FileState intconst (	FileStatePending FileState = iota	FileStateProcessing	FileStateCompleted	FileStateFailed)func (fs FileState) String() string {	switch fs {	case FileStatePending:		return "pending"	case FileStateProcessing:		return "processing"	case FileStateCompleted:		return "completed"	case FileStateFailed:		return "failed"	default:		return "unknown"	}}// ProgressNotification contains progress update informationtype ProgressNotification struct {	LogGroupPath    string        `json:"log_group_path"`	Percentage      float64       `json:"percentage"`	TotalFiles      int           `json:"total_files"`	CompletedFiles  int           `json:"completed_files"`	ProcessingFiles int           `json:"processing_files"`	FailedFiles     int           `json:"failed_files"`	ProcessedLines  int64         `json:"processed_lines"`	EstimatedLines  int64         `json:"estimated_lines"`	ElapsedTime     time.Duration `json:"elapsed_time"`	EstimatedRemain time.Duration `json:"estimated_remain"`	IsCompleted     bool          `json:"is_completed"`}// CompletionNotification contains completion informationtype CompletionNotification struct {	LogGroupPath string        `json:"log_group_path"`	Success      bool          `json:"success"`	Duration     time.Duration `json:"duration"`	TotalLines   int64         `json:"total_lines"`	TotalFiles   int           `json:"total_files"`	FailedFiles  int           `json:"failed_files"`	IndexedSize  int64         `json:"indexed_size"`	Error        string        `json:"error,omitempty"`}// ProgressConfig contains configuration for progress trackingtype ProgressConfig struct {	NotifyInterval time.Duration // Minimum time between progress notifications	OnProgress     func(ProgressNotification)	OnCompletion   func(CompletionNotification)}// NewProgressTracker creates a new progress tracker for indexing operationsfunc NewProgressTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {	pt := &ProgressTracker{		logGroupPath:       logGroupPath,		startTime:          time.Now(),		files:              make(map[string]*FileProgress),		completionNotified: false,	}	if config != nil {		if config.NotifyInterval == 0 {			config.NotifyInterval = 1 * time.Second // Default notify interval		}		pt.notifyInterval = config.NotifyInterval		pt.onProgress = config.OnProgress		pt.onCompletion = config.OnCompletion	} else {		pt.notifyInterval = 1 * time.Second // Default when no config provided	}	return pt}// AddFile adds a file to the progress trackerfunc (pt *ProgressTracker) AddFile(filePath string, isCompressed bool) {	pt.mu.Lock()	defer pt.mu.Unlock()	pt.files[filePath] = &FileProgress{		FilePath:     filePath,		State:        FileStatePending,		IsCompressed: isCompressed,		AvgLineSize:  120, // Initial estimate: 120 bytes per line		SampleCount:  0,	}}// SetFileEstimate sets the estimated line count for a filefunc (pt *ProgressTracker) SetFileEstimate(filePath string, estimatedLines int64) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		oldEstimate := progress.EstimatedLines		progress.EstimatedLines = estimatedLines		// Update total estimate		pt.totalEstimate = pt.totalEstimate - oldEstimate + estimatedLines	}}// SetFileSize sets the file size for a filefunc (pt *ProgressTracker) SetFileSize(filePath string, fileSize int64) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		progress.FileSize = fileSize	}}// UpdateFileProgress updates the processed line count and position for a filefunc (pt *ProgressTracker) UpdateFileProgress(filePath string, processedLines int64, currentPos ...int64) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		// Update total actual processed, ensuring not to double-count on completion		if progress.State != FileStateCompleted {			pt.totalActual = pt.totalActual - progress.ProcessedLines + processedLines		}		progress.ProcessedLines = processedLines		// Update position if provided		if len(currentPos) > 0 && !progress.IsCompressed {			progress.CurrentPos = currentPos[0]		}		// Update average line size for compressed files		if progress.IsCompressed && processedLines > 0 && currentPos != nil && len(currentPos) > 0 {			progress.SampleCount++			if progress.SampleCount > 0 {				progress.AvgLineSize = currentPos[0] / processedLines			}		}		// Notify progress if enough time has passed		pt.notifyProgressLocked()	}}// StartFile marks a file as started processingfunc (pt *ProgressTracker) StartFile(filePath string) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		progress.State = FileStateProcessing		progress.StartTime = time.Now()		pt.notifyProgressLocked()	}}// CompleteFile marks a file as completed successfullyfunc (pt *ProgressTracker) CompleteFile(filePath string, finalProcessedLines int64) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		// Prevent marking as completed multiple times		if progress.State == FileStateCompleted || progress.State == FileStateFailed {			return		}		// Ensure final processed lines are correctly accounted for in total		pt.totalActual = pt.totalActual - progress.ProcessedLines + finalProcessedLines		progress.ProcessedLines = finalProcessedLines		progress.State = FileStateCompleted		progress.CompletedTime = time.Now()		pt.checkCompletionLocked()	}}// FailFile marks a file as failed processingfunc (pt *ProgressTracker) FailFile(filePath string, errorMsg string) {	pt.mu.Lock()	defer pt.mu.Unlock()	if progress, exists := pt.files[filePath]; exists {		// Prevent marking as failed multiple times		if progress.State == FileStateCompleted || progress.State == FileStateFailed {			return		}		progress.State = FileStateFailed		progress.ErrorMsg = errorMsg		progress.CompletedTime = time.Now()		pt.checkCompletionLocked()	}}// GetProgress returns the current progress percentage and statsfunc (pt *ProgressTracker) GetProgress() ProgressNotification {	pt.mu.RLock()	defer pt.mu.RUnlock()	return pt.getProgressLocked()}// GetFileProgress returns progress for a specific filefunc (pt *ProgressTracker) GetFileProgress(filePath string) (*FileProgress, bool) {	pt.mu.RLock()	defer pt.mu.RUnlock()	progress, exists := pt.files[filePath]	if !exists {		return nil, false	}	// Return a copy to avoid race conditions	copy := *progress	return ©, true}// GetAllFiles returns all file progress informationfunc (pt *ProgressTracker) GetAllFiles() map[string]*FileProgress {	pt.mu.RLock()	defer pt.mu.RUnlock()	result := make(map[string]*FileProgress)	for path, progress := range pt.files {		copy := *progress		result[path] = ©	}	return result}// IsCompleted returns whether all files have been processedfunc (pt *ProgressTracker) IsCompleted() bool {	pt.mu.RLock()	defer pt.mu.RUnlock()	return pt.isCompleted}// Cancel marks the tracker as cancelled and stops processingfunc (pt *ProgressTracker) Cancel(reason string) {	pt.mu.Lock()	defer pt.mu.Unlock()	for _, progress := range pt.files {		if progress.State == FileStateProcessing || progress.State == FileStatePending {			progress.State = FileStateFailed			progress.ErrorMsg = "cancelled: " + reason			progress.CompletedTime = time.Now()		}	}	pt.isCompleted = true	pt.notifyCompletionLocked()}// checkCompletionLocked checks if all files are completed and notifies if sofunc (pt *ProgressTracker) checkCompletionLocked() {	if pt.completionNotified {		return	}	allCompleted := true	for _, fp := range pt.files {		if fp.State != FileStateCompleted && fp.State != FileStateFailed {			allCompleted = false			break		}	}	if allCompleted {		pt.isCompleted = true		pt.completionNotified = true		pt.notifyCompletionLocked()	} else {		pt.notifyProgressLocked()	}}// notifyProgressLocked sends progress notification (must be called with lock held)func (pt *ProgressTracker) notifyProgressLocked() {	// Throttle notifications to avoid spam using configurable interval	now := time.Now()	if now.Sub(pt.lastNotify) < pt.notifyInterval {		return	}	pt.lastNotify = now	if pt.onProgress != nil {		notification := pt.getProgressLocked()		go pt.onProgress(notification) // Non-blocking notification	}}// notifyCompletionLocked sends completion notification (must be called with lock held)func (pt *ProgressTracker) notifyCompletionLocked() {	if pt.onCompletion == nil {		return	}	elapsed := time.Since(pt.startTime)	// Calculate total size processed	var totalSize int64	var failedFiles int	for _, fp := range pt.files {		if fp.State == FileStateFailed {			failedFiles++			continue		}		if fp.IsCompressed {			// For compressed files, use dynamic average line size			totalSize += fp.ProcessedLines * fp.AvgLineSize		} else {			// For uncompressed files, use actual bytes processed if available			if fp.CurrentPos > 0 {				totalSize += fp.CurrentPos			} else {				// Fallback to line-based estimation				totalSize += fp.ProcessedLines * 150			}		}	}	notification := CompletionNotification{		LogGroupPath: pt.logGroupPath,		Success:      failedFiles == 0,		Duration:     elapsed,		TotalLines:   pt.totalActual,		TotalFiles:   len(pt.files),		FailedFiles:  failedFiles,		IndexedSize:  totalSize,	}	if failedFiles > 0 {		notification.Error = "some files failed to process"	}	go pt.onCompletion(notification) // Non-blocking notification}// getProgressLocked returns progress without notification (must be called with lock held)func (pt *ProgressTracker) getProgressLocked() ProgressNotification {	var completedFiles, processingFiles, failedFiles int	// Count files by state	for _, fp := range pt.files {		switch fp.State {		case FileStateCompleted:			completedFiles++		case FileStateProcessing:			processingFiles++		case FileStateFailed:			failedFiles++		}	}	// Calculate progress percentage using hybrid approach	var percentage float64		// Calculate weighted progress combining file count, file sizes, and line estimates	var totalFileWeight, processedFileWeight float64	var totalSizeWeight, processedSizeWeight float64	var totalLineWeight, processedLineWeight float64		// Collect metrics for hybrid calculation	for _, fp := range pt.files {		fileWeight := 1.0 // Each file contributes equally to file-based progress		sizeWeight := float64(fp.FileSize)		lineWeight := float64(fp.EstimatedLines)				totalFileWeight += fileWeight		totalSizeWeight += sizeWeight		if lineWeight > 0 {			totalLineWeight += lineWeight		}				if fp.State == FileStateCompleted {			processedFileWeight += fileWeight			processedSizeWeight += sizeWeight			if lineWeight > 0 {				processedLineWeight += float64(fp.ProcessedLines)			}		} else if fp.State == FileStateProcessing && fp.ProcessedLines > 0 {			// For processing files, add partial progress			if lineWeight > 0 {				processedLineWeight += float64(fp.ProcessedLines)				// For size-based progress, estimate based on lines processed				if fp.EstimatedLines > 0 {					sizeProgress := float64(fp.ProcessedLines) / float64(fp.EstimatedLines)					processedSizeWeight += sizeWeight * sizeProgress				}			} else if fp.FileSize > 0 && fp.CurrentPos > 0 {				// Use position-based progress for files without line estimates				sizeProgress := float64(fp.CurrentPos) / float64(fp.FileSize)				processedSizeWeight += sizeWeight * sizeProgress				processedFileWeight += fileWeight * sizeProgress			}		}	}		// Calculate progress using the most reliable metric available	if totalLineWeight > 0 && processedLineWeight > 0 {		// Dynamic line estimation: adjust line estimates based on completed files		dynamicLineWeight := totalLineWeight		if completedFiles >= 2 {			// Calculate average lines per completed file			var totalLinesFromCompleted int64			for _, fp := range pt.files {				if fp.State == FileStateCompleted {					totalLinesFromCompleted += fp.ProcessedLines				}			}						if completedFiles > 0 {				avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)				// Adjust remaining file estimates based on observed average				remainingFiles := len(pt.files) - completedFiles				if remainingFiles > 0 {					// Calculate current estimate for remaining files					var remainingEstimate int64					for _, fp := range pt.files {						if fp.State != FileStateCompleted {							remainingEstimate += fp.EstimatedLines						}					}										// Replace remaining estimates with dynamic estimate					dynamicRemainingEstimate := int64(avgLinesPerFile * float64(remainingFiles))					dynamicLineWeight = float64(int64(totalLinesFromCompleted) + dynamicRemainingEstimate)				}			}		}				// Use line-based progress with dynamic estimation		linePercentage := (processedLineWeight / float64(dynamicLineWeight)) * 100				// Use file-based progress as primary metric for reliability  		filePercentage := (processedFileWeight / totalFileWeight) * 100				// Weight them: 40% lines, 60% files for better reliability		// Files are more predictable than line estimates for compressed files		percentage = (linePercentage * 0.4) + (filePercentage * 0.6)				// Additional safety: never exceed file-based progress by more than 10%		// This prevents premature 100% when file-based progress is more reliable		maxAllowedPercentage := filePercentage + 10.0		if percentage > maxAllowedPercentage {			percentage = maxAllowedPercentage		}	} else if totalSizeWeight > 0 {		// Fallback to size-based progress		percentage = (processedSizeWeight / totalSizeWeight) * 100	} else if totalFileWeight > 0 {		// Fallback to file-based progress only		percentage = (processedFileWeight / totalFileWeight) * 100	}	// Cap at 100%	if percentage > 100 {		percentage = 100	}	elapsed := time.Since(pt.startTime)	var estimatedRemain time.Duration	if percentage > 0 && percentage < 100 {		avgTimePerPercent := float64(elapsed.Nanoseconds()) / percentage		remainingPercent := 100.0 - percentage		estimatedRemain = time.Duration(int64(avgTimePerPercent * remainingPercent))	}	// Use dynamic line estimation for reporting	adjustedEstimate := pt.totalEstimate	if completedFiles >= 2 {		// Calculate average lines per completed file		var totalLinesFromCompleted int64		for _, fp := range pt.files {			if fp.State == FileStateCompleted {				totalLinesFromCompleted += fp.ProcessedLines			}		}				if completedFiles > 0 {			avgLinesPerFile := float64(totalLinesFromCompleted) / float64(completedFiles)			// Project total lines based on completed files			projectedTotal := int64(avgLinesPerFile * float64(len(pt.files)))						// Use dynamic estimate with some constraints to prevent extreme changes			if completedFiles >= len(pt.files)/5 { // At least 20% of files processed				// More confidence, allow larger adjustments				adjustedEstimate = projectedTotal			} else if projectedTotal > adjustedEstimate {				// Less confidence, be more conservative but still adjust upward				maxIncrease := adjustedEstimate + (projectedTotal-adjustedEstimate)/2				adjustedEstimate = maxIncrease			}		}	}	return ProgressNotification{		LogGroupPath:    pt.logGroupPath,		Percentage:      percentage,		TotalFiles:      len(pt.files),		CompletedFiles:  completedFiles,		ProcessingFiles: processingFiles,		FailedFiles:     failedFiles,		ProcessedLines:  pt.totalActual,		EstimatedLines:  adjustedEstimate,		ElapsedTime:     elapsed,		EstimatedRemain: estimatedRemain,		IsCompleted:     pt.isCompleted,	}}// EstimateFileLines estimates the number of lines in a file based on samplingfunc EstimateFileLines(ctx context.Context, filePath string, fileSize int64, isCompressed bool) (int64, error) {	if fileSize == 0 {		return 0, nil	}	// Validate log path before accessing it	if !utils.IsValidLogPath(filePath) {		// Return fallback estimate for invalid paths		return fileSize / 150, nil // Fallback: ~150 bytes per line	}	file, err := os.Open(filePath)	if err != nil {		// Return fallback estimate instead of error		return fileSize / 150, nil // Fallback: ~150 bytes per line	}	defer file.Close()	var reader io.Reader = file	// Handle compressed files	if isCompressed {		gzReader, err := gzip.NewReader(file)		if err != nil {			return (fileSize * 3) / 150, nil // Fallback for compressed: 3:1 ratio		}		defer gzReader.Close()		reader = gzReader	}	// Sample the first 1MB of the file content (decompressed if necessary)	sampleSize := int64(1024 * 1024)	buf := make([]byte, sampleSize)	// Check for context cancellation	select {	case <-ctx.Done():		return 0, ctx.Err()	default:	}	bytesRead, err := io.ReadFull(reader, buf)	if err != nil && err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {		return fileSize / 150, nil // Fallback on read error	}	if bytesRead == 0 {		return 0, nil // Empty file	}	// Count lines in the sample	lineCount := bytes.Count(buf[:bytesRead], []byte{'\n'})	if lineCount == 0 {		// Avoid division by zero, fallback to rough estimate		return fileSize / 150, nil	}	// Calculate average line size from the sample	avgLineSize := float64(bytesRead) / float64(lineCount)	if avgLineSize == 0 {		return fileSize / 150, nil // Fallback	}	// Estimate total lines	var estimatedLines int64	if isCompressed {		// For compressed files, use a compression ratio estimate		estimatedUncompressedSize := fileSize * 5 // Conservative 5:1 compression ratio		estimatedLines = int64(float64(estimatedUncompressedSize) / avgLineSize)	} else {		estimatedLines = int64(float64(fileSize) / avgLineSize)	}	return estimatedLines, nil}// IsCompressedFile determines if a file is compressed based on its extensionfunc IsCompressedFile(filePath string) bool {	ext := strings.ToLower(filepath.Ext(filePath))	return ext == ".gz" || ext == ".bz2" || ext == ".xz" || ext == ".lz4"}// IsRotationLogFile determines if a file is a rotation log filefunc IsRotationLogFile(filePath string) bool {	base := filepath.Base(filePath)	// Common nginx rotation patterns:	// access.log, access.log.1, access.log.2.gz	// access.1.log, access.2.log.gz	// error.log, error.log.1, error.log.2.gz	// Remove compression extensions first	if IsCompressedFile(base) {		base = strings.TrimSuffix(base, filepath.Ext(base))	}	// Check for basic .log files	if strings.HasSuffix(base, ".log") {		return true	}	// Check for numbered rotation patterns: access.log.1, error.log.10, etc.	parts := strings.Split(base, ".")	if len(parts) >= 3 {		// Pattern: name.log.number (e.g., access.log.1)		if parts[len(parts)-2] == "log" && isNumeric(parts[len(parts)-1]) {			return true		}		// Pattern: name.number.log (e.g., access.1.log)		if parts[len(parts)-1] == "log" {			for i := 1; i < len(parts)-1; i++ {				if isNumeric(parts[i]) {					return true				}			}		}	}	return false}// isNumeric checks if a string represents a numberfunc isNumeric(s string) bool {	if len(s) == 0 {		return false	}	for _, r := range s {		if r < '0' || r > '9' {			return false		}	}	return true}// AddRotationFiles automatically detects and adds rotation log files with appropriate compression detectionfunc (pt *ProgressTracker) AddRotationFiles(filePaths ...string) {	for _, filePath := range filePaths {		isCompressed := IsCompressedFile(filePath)		pt.AddFile(filePath, isCompressed)	}}// ProgressManager manages multiple progress trackerstype ProgressManager struct {	mu       sync.RWMutex	trackers map[string]*ProgressTracker}// NewProgressManager creates a new progress managerfunc NewProgressManager() *ProgressManager {	return &ProgressManager{		trackers: make(map[string]*ProgressTracker),	}}// GetTracker gets or creates a progress tracker for a log groupfunc (pm *ProgressManager) GetTracker(logGroupPath string, config *ProgressConfig) *ProgressTracker {	pm.mu.Lock()	defer pm.mu.Unlock()	if tracker, exists := pm.trackers[logGroupPath]; exists {		return tracker	}	tracker := NewProgressTracker(logGroupPath, config)	pm.trackers[logGroupPath] = tracker	return tracker}// RemoveTracker removes a progress trackerfunc (pm *ProgressManager) RemoveTracker(logGroupPath string) {	pm.mu.Lock()	defer pm.mu.Unlock()	delete(pm.trackers, logGroupPath)}// GetAllTrackers returns all current trackersfunc (pm *ProgressManager) GetAllTrackers() map[string]*ProgressTracker {	pm.mu.RLock()	defer pm.mu.RUnlock()	result := make(map[string]*ProgressTracker)	for path, tracker := range pm.trackers {		result[path] = tracker	}	return result}// Cleanup removes completed or failed trackersfunc (pm *ProgressManager) Cleanup() {	pm.mu.Lock()	defer pm.mu.Unlock()	for path, tracker := range pm.trackers {		if tracker.IsCompleted() {			delete(pm.trackers, path)		}	}}
 |