package indexer

import (
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
	"github.com/0xJacky/Nginx-UI/model"
	"github.com/uozi-tech/cosy/logger"
)

// Legacy constants for backward compatibility - use IndexStatus enum in types.go instead

// NginxLogCache represents a cached log entry from nginx configuration
type NginxLogCache struct {
	Path       string `json:"path"`        // Path to the log file
	Type       string `json:"type"`        // Type of log: "access" or "error"
	Name       string `json:"name"`        // Name of the log file
	ConfigFile string `json:"config_file"` // Path to the configuration file that contains this log directive
}

// NginxLogWithIndex represents a log file with its index status information
type NginxLogWithIndex struct {
	Path           string `json:"path"`                       // Path to the log file
	Type           string `json:"type"`                       // Type of log: "access" or "error"
	Name           string `json:"name"`                       // Name of the log file
	ConfigFile     string `json:"config_file"`                // Path to the configuration file
	IndexStatus    string `json:"index_status"`               // Index status: indexed, indexing, not_indexed, queued, error
	LastModified   int64  `json:"last_modified,omitempty"`    // Unix timestamp of last modification time
	LastSize       int64  `json:"last_size,omitempty"`        // Last known size of the file
	LastIndexed    int64  `json:"last_indexed,omitempty"`     // Unix timestamp when the file was last indexed
	IndexStartTime int64  `json:"index_start_time,omitempty"` // Unix timestamp when the last indexing operation started
	IndexDuration  int64  `json:"index_duration,omitempty"`   // Duration of last indexing operation in milliseconds
	IsCompressed   bool   `json:"is_compressed"`              // Whether the file is compressed
	HasTimeRange   bool   `json:"has_timerange"`              // Whether time range is available
	TimeRangeStart int64  `json:"timerange_start,omitempty"`  // Unix timestamp of start of time range in the log
	TimeRangeEnd   int64  `json:"timerange_end,omitempty"`    // Unix timestamp of end of time range in the log
	DocumentCount  uint64 `json:"document_count,omitempty"`   // Number of indexed documents from this file

	// Enhanced status tracking fields
	ErrorMessage  string `json:"error_message,omitempty"`  // Error message if indexing failed
	ErrorTime     int64  `json:"error_time,omitempty"`     // Unix timestamp when error occurred
	RetryCount    int    `json:"retry_count,omitempty"`    // Number of retry attempts
	QueuePosition int    `json:"queue_position,omitempty"` // Position in indexing queue
}
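
// Illustrative sketch only: NginxLogWithIndex is the payload consumed by the
// frontend, so a fully indexed entry serializes roughly as below. The field
// values are hypothetical, and json.Marshal is shown purely for illustration:
//
//	entry := NginxLogWithIndex{
//		Path:          "/var/log/nginx/access.log",
//		Type:          "access",
//		Name:          "access.log",
//		IndexStatus:   string(IndexStatusIndexed),
//		DocumentCount: 10000,
//	}
//	data, _ := json.Marshal(entry)
//	// {"path":"/var/log/nginx/access.log","type":"access","name":"access.log","index_status":"indexed",...}
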
// LogFileManager manages nginx log file discovery and index status
type LogFileManager struct {
	logCache       map[string]*NginxLogCache
	cacheMutex     sync.RWMutex
	persistence    *PersistenceManager
	indexingStatus map[string]bool
	indexingMutex  sync.RWMutex
	indexer        *ParallelIndexer
}

// NewLogFileManager creates a new log file manager
func NewLogFileManager() *LogFileManager {
	return &LogFileManager{
		logCache:       make(map[string]*NginxLogCache),
		persistence:    NewPersistenceManager(DefaultIncrementalConfig()),
		indexingStatus: make(map[string]bool),
	}
}

// SetIndexer injects the running ParallelIndexer so we can query exact doc counts before persisting
func (lm *LogFileManager) SetIndexer(pi *ParallelIndexer) {
	lm.indexer = pi
}

// AddLogPath adds a log path to the log cache with the source config file
func (lm *LogFileManager) AddLogPath(path, logType, name, configFile string) {
	lm.cacheMutex.Lock()
	defer lm.cacheMutex.Unlock()

	lm.logCache[path] = &NginxLogCache{
		Path:       path,
		Type:       logType,
		Name:       name,
		ConfigFile: configFile,
	}
}

// RemoveLogPathsFromConfig removes all log paths associated with a specific config file
func (lm *LogFileManager) RemoveLogPathsFromConfig(configFile string) {
	lm.cacheMutex.Lock()
	defer lm.cacheMutex.Unlock()

	for path, logEntry := range lm.logCache {
		if logEntry.ConfigFile == configFile {
			delete(lm.logCache, path)
		}
	}
}

// GetAllLogPaths returns all cached log paths, optionally filtered
func (lm *LogFileManager) GetAllLogPaths(filters ...func(*NginxLogCache) bool) []*NginxLogCache {
	lm.cacheMutex.RLock()
	defer lm.cacheMutex.RUnlock()

	var logs []*NginxLogCache
	for _, logEntry := range lm.logCache {
		// Apply all filters
		include := true
		for _, filter := range filters {
			if !filter(logEntry) {
				include = false
				break
			}
		}
		if include {
			// Create a copy to avoid race conditions
			logCopy := *logEntry
			logs = append(logs, &logCopy)
		}
	}
	return logs
}

// SetIndexingStatus sets the indexing status for a specific file path
func (lm *LogFileManager) SetIndexingStatus(path string, isIndexing bool) {
	lm.indexingMutex.Lock()
	defer lm.indexingMutex.Unlock()

	if isIndexing {
		lm.indexingStatus[path] = true
	} else {
		delete(lm.indexingStatus, path)
	}
}

// GetIndexingFiles returns a list of files currently being indexed
func (lm *LogFileManager) GetIndexingFiles() []string {
	lm.indexingMutex.RLock()
	defer lm.indexingMutex.RUnlock()

	var files []string
	for path := range lm.indexingStatus {
		files = append(files, path)
	}
	return files
}
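
// Illustrative sketch only (paths hypothetical): the variadic filters are
// ANDed together, so callers can narrow results without dedicated query
// methods:
//
//	lm := NewLogFileManager()
//	lm.AddLogPath("/var/log/nginx/access.log", "access", "access.log", "/etc/nginx/nginx.conf")
//	lm.AddLogPath("/var/log/nginx/error.log", "error", "error.log", "/etc/nginx/nginx.conf")
//	errorLogs := lm.GetAllLogPaths(func(c *NginxLogCache) bool {
//		return c.Type == "error"
//	})
//	// errorLogs contains only the error.log entry (a defensive copy, not the cached pointer)
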
// getBaseLogName determines the base log file name for grouping rotated files
func getBaseLogName(filePath string) string {
	dir := filepath.Dir(filePath)
	filename := filepath.Base(filePath)

	// Remove compression extensions first
	filename = strings.TrimSuffix(filename, ".gz")
	filename = strings.TrimSuffix(filename, ".bz2")

	// Handle numbered rotation (access.log.1, access.log.2, etc.)
	if match := regexp.MustCompile(`^(.+)\.(\d+)$`).FindStringSubmatch(filename); len(match) > 1 {
		baseFilename := match[1]
		return filepath.Join(dir, baseFilename)
	}

	// Handle date rotation suffixes
	parts := strings.Split(filename, ".")
	if len(parts) >= 2 {
		lastPart := parts[len(parts)-1]
		if isDatePattern(lastPart) {
			baseFilename := strings.Join(parts[:len(parts)-1], ".")
			// If the base doesn't end with .log, add it
			if !strings.HasSuffix(baseFilename, ".log") {
				baseFilename += ".log"
			}
			return filepath.Join(dir, baseFilename)
		}
	}

	// If it already looks like a base log file, return as-is
	return filePath
}
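
// Illustrative sketch only (isDatePattern is defined elsewhere in this
// package): the intended mappings are
//
//	getBaseLogName("/var/log/nginx/access.log.1")      // -> "/var/log/nginx/access.log"
//	getBaseLogName("/var/log/nginx/access.log.2.gz")   // -> "/var/log/nginx/access.log" (.gz stripped first)
//	getBaseLogName("/var/log/nginx/access.2024-01-01") // -> "/var/log/nginx/access.log", assuming isDatePattern accepts "2024-01-01"
//	getBaseLogName("/var/log/nginx/access.log")        // -> unchanged
//
// Note that a purely numeric date suffix such as ".20240101" is caught by the
// numbered-rotation regexp before the date branch is reached.
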
// GetAllLogsWithIndexGrouped returns logs grouped by their base name (e.g., access.log includes access.log.1, access.log.2.gz etc.)
func (lm *LogFileManager) GetAllLogsWithIndexGrouped(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
	lm.cacheMutex.RLock()
	defer lm.cacheMutex.RUnlock()

	// Get all logs from both cache (config files) and persistence (indexed files)
	allLogsMap := make(map[string]*NginxLogWithIndex)

	// First, get logs from the cache (these are from nginx config)
	for _, cache := range lm.logCache {
		logWithIndex := &NginxLogWithIndex{
			Path:         cache.Path,
			Type:         cache.Type,
			Name:         cache.Name,
			ConfigFile:   cache.ConfigFile,
			IndexStatus:  string(IndexStatusNotIndexed),
			IsCompressed: false,
			HasTimeRange: false,
		}
		allLogsMap[cache.Path] = logWithIndex
	}

	// Get persistence indexes and update status
	persistenceIndexes, err := lm.persistence.GetAllLogIndexes()
	if err != nil {
		logger.Warnf("Failed to get persistence indexes: %v", err)
		persistenceIndexes = []*model.NginxLogIndex{}
	}

	// Add all indexed files from persistence (including rotated files)
	for _, idx := range persistenceIndexes {
		if _, exists := allLogsMap[idx.Path]; !exists {
			// This is a rotated file not in config cache, create entry for it
			logType := "access"
			if strings.Contains(idx.Path, "error") {
				logType = "error"
			}
			logWithIndex := &NginxLogWithIndex{
				Path:        idx.Path,
				Type:        logType,
				Name:        filepath.Base(idx.Path),
				ConfigFile:  "",
				IndexStatus: string(IndexStatusNotIndexed),
			}
			allLogsMap[idx.Path] = logWithIndex
		}

		// Update index status from persistence data
		logWithIndex := allLogsMap[idx.Path]
		logWithIndex.LastModified = idx.LastModified.Unix()
		logWithIndex.LastSize = idx.LastSize
		logWithIndex.LastIndexed = idx.LastIndexed.Unix()
		if idx.IndexStartTime != nil {
			logWithIndex.IndexStartTime = idx.IndexStartTime.Unix()
		}
		if idx.IndexDuration != nil {
			logWithIndex.IndexDuration = *idx.IndexDuration
		}
		logWithIndex.DocumentCount = idx.DocumentCount

		// Set queue position if available
		logWithIndex.QueuePosition = idx.QueuePosition

		// Set error message if available
		logWithIndex.ErrorMessage = idx.ErrorMessage
		if idx.ErrorTime != nil {
			logWithIndex.ErrorTime = idx.ErrorTime.Unix()
		}
		logWithIndex.RetryCount = idx.RetryCount

		// Use the index status from the database if it's set
		if idx.IndexStatus != "" {
			logWithIndex.IndexStatus = idx.IndexStatus
		} else {
			// Fallback to determining status if not set in DB
			lm.indexingMutex.RLock()
			isIndexing := lm.indexingStatus[idx.Path]
			lm.indexingMutex.RUnlock()

			if isIndexing {
				logWithIndex.IndexStatus = string(IndexStatusIndexing)
			} else if !idx.LastIndexed.IsZero() {
				// If file has been indexed (regardless of document count), it's indexed
				logWithIndex.IndexStatus = string(IndexStatusIndexed)
			}
		}

		// Set time range if available
		if idx.TimeRangeStart != nil && idx.TimeRangeEnd != nil && !idx.TimeRangeStart.IsZero() && !idx.TimeRangeEnd.IsZero() {
			logWithIndex.HasTimeRange = true
			logWithIndex.TimeRangeStart = idx.TimeRangeStart.Unix()
			logWithIndex.TimeRangeEnd = idx.TimeRangeEnd.Unix()
		}

		logWithIndex.IsCompressed = strings.HasSuffix(idx.Path, ".gz") || strings.HasSuffix(idx.Path, ".bz2")
	}

	// Convert to slice and apply filters
	var logs []*NginxLogWithIndex
	for _, log := range allLogsMap {
		// Apply all filters
		include := true
		for _, filter := range filters {
			if !filter(log) {
				include = false
				break
			}
		}
		if include {
			logs = append(logs, log)
		}
	}

	// Group by base log name with stable aggregation
	groupedMap := make(map[string]*NginxLogWithIndex)

	// Sort logs by path first to ensure consistent processing order
	sort.Slice(logs, func(i, j int) bool {
		return logs[i].Path < logs[j].Path
	})

	for _, log := range logs {
		baseLogName := getBaseLogName(log.Path)

		if existing, exists := groupedMap[baseLogName]; exists {
			// Check if current log is a main log path record (already aggregated)
			// or if existing record is a main log path record
			logIsMainPath := (log.Path == baseLogName)
			existingIsMainPath := (existing.Path == baseLogName)

			if logIsMainPath && !existingIsMainPath {
				// Current log is the main aggregated record, replace existing
				groupedLog := *log
				groupedLog.Path = baseLogName
				groupedLog.Name = filepath.Base(baseLogName)
				groupedMap[baseLogName] = &groupedLog
			} else if !logIsMainPath && existingIsMainPath {
				// Existing is main record, keep it, don't accumulate
				// Only update status if needed
				if log.IndexStatus == string(IndexStatusIndexing) {
					existing.IndexStatus = string(IndexStatusIndexing)
				}
			} else if !logIsMainPath && !existingIsMainPath {
				// Both are individual files, accumulate normally
				if log.LastIndexed > existing.LastIndexed {
					existing.LastModified = log.LastModified
					existing.LastIndexed = log.LastIndexed
					existing.IndexStartTime = log.IndexStartTime
					existing.IndexDuration = log.IndexDuration
				}
				existing.DocumentCount += log.DocumentCount
				existing.LastSize += log.LastSize

				// Update status with priority: indexing > queued > indexed > error > not_indexed
				if log.IndexStatus == string(IndexStatusIndexing) {
					existing.IndexStatus = string(IndexStatusIndexing)
				} else if log.IndexStatus == string(IndexStatusQueued) &&
					existing.IndexStatus != string(IndexStatusIndexing) {
					existing.IndexStatus = string(IndexStatusQueued)
					// Keep the queue position from the queued log
					if log.QueuePosition > 0 {
						existing.QueuePosition = log.QueuePosition
					}
				} else if log.IndexStatus == string(IndexStatusIndexed) &&
					existing.IndexStatus != string(IndexStatusIndexing) &&
					existing.IndexStatus != string(IndexStatusQueued) {
					existing.IndexStatus = string(IndexStatusIndexed)
				} else if log.IndexStatus == string(IndexStatusError) &&
					existing.IndexStatus != string(IndexStatusIndexing) &&
					existing.IndexStatus != string(IndexStatusQueued) &&
					existing.IndexStatus != string(IndexStatusIndexed) {
					existing.IndexStatus = string(IndexStatusError)
					existing.ErrorMessage = log.ErrorMessage
					existing.ErrorTime = log.ErrorTime
				}

				if log.HasTimeRange {
					if !existing.HasTimeRange {
						existing.HasTimeRange = true
						existing.TimeRangeStart = log.TimeRangeStart
						existing.TimeRangeEnd = log.TimeRangeEnd
					} else {
						if log.TimeRangeStart > 0 && (existing.TimeRangeStart == 0 || log.TimeRangeStart < existing.TimeRangeStart) {
							existing.TimeRangeStart = log.TimeRangeStart
						}
						if log.TimeRangeEnd > existing.TimeRangeEnd {
							existing.TimeRangeEnd = log.TimeRangeEnd
						}
					}
				}
			} else if logIsMainPath && existingIsMainPath {
				// If both are main paths, use the one with more recent LastIndexed
				if log.LastIndexed > existing.LastIndexed {
					groupedLog := *log
					groupedLog.Path = baseLogName
					groupedLog.Name = filepath.Base(baseLogName)
					groupedMap[baseLogName] = &groupedLog
				}
			}
		} else {
			// Create new entry with base log name as path for grouping
			groupedLog := *log
			groupedLog.Path = baseLogName
			groupedLog.Name = filepath.Base(baseLogName)
			// Preserve queue position and error info for the grouped log
			groupedLog.QueuePosition = log.QueuePosition
			groupedLog.ErrorMessage = log.ErrorMessage
			groupedLog.ErrorTime = log.ErrorTime
			groupedLog.RetryCount = log.RetryCount
			groupedMap[baseLogName] = &groupedLog
		}
	}

	// Convert map to slice with consistent ordering
	var result []*NginxLogWithIndex

	// Create a sorted list of keys to ensure consistent order
	var keys []string
	for key := range groupedMap {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// Build result in consistent order
	for _, key := range keys {
		result = append(result, groupedMap[key])
	}

	// --- START DIAGNOSTIC LOGGING ---
	logger.Debugf("===== FINAL GROUPED LIST =====")
	for _, fLog := range result {
		logger.Debugf("Final Group: Path=%s, DocCount=%d, Status=%s", fLog.Path, fLog.DocumentCount, fLog.IndexStatus)
	}
	logger.Debugf("===============================")
	// --- END DIAGNOSTIC LOGGING ---

	return result
}
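
// Illustrative sketch only (files and counts hypothetical): if persistence
// holds records only for rotated files access.log.1 (3,000 docs) and
// access.log.2.gz (5,000 docs), the grouped view collapses them into one
// record keyed by the base path:
//
//	groups := lm.GetAllLogsWithIndexGrouped()
//	// -> one entry with Path="/var/log/nginx/access.log",
//	//    DocumentCount=8000 (summed), LastSize summed, time range widened
//	//    to span both files, and status following the priority
//	//    indexing > queued > indexed > error > not_indexed.
//
// If a record exists for the base path itself, it is treated as the
// already-aggregated group record and rotated files are not re-accumulated
// into it.
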
// SaveIndexMetadata saves the metadata for a log group after an indexing operation.
// It creates a new record for the base log path.
func (lm *LogFileManager) SaveIndexMetadata(basePath string, documentCount uint64, startTime time.Time, duration time.Duration, minTime *time.Time, maxTime *time.Time) error {
	// We want to save the metadata against the base path (the "log group").
	// We get or create a record for this specific path.
	logIndex, err := lm.persistence.GetLogIndex(basePath)
	if err != nil {
		// If the error is anything other than "not found", it's a real problem.
		// GetLogIndex is designed to return a new object if not found, so this should be rare.
		return fmt.Errorf("could not get or create log index for '%s': %w", basePath, err)
	}

	// Get file stats to update LastModified and LastSize
	// Validate log path before accessing it
	if utils.IsValidLogPath(basePath) {
		if fileInfo, err := os.Stat(basePath); err == nil {
			logIndex.LastModified = fileInfo.ModTime()
			logIndex.LastSize = fileInfo.Size()
		}
	}

	// If indexer is available and healthy, query Bleve for exact document count
	if lm.indexer != nil && lm.indexer.IsHealthy() {
		// Decide whether this path is a main log path (group) or a specific file
		mainPath := getMainLogPathFromFile(basePath)
		if mainPath == basePath {
			if exact, err := lm.indexer.CountDocsByMainLogPath(basePath); err == nil {
				documentCount = exact
			} else {
				logger.Warnf("Falling back to provided documentCount for group %s due to count error: %v", basePath, err)
			}
		} else {
			if exact, err := lm.indexer.CountDocsByFilePath(basePath); err == nil {
				documentCount = exact
			} else {
				logger.Warnf("Falling back to provided documentCount for file %s due to count error: %v", basePath, err)
			}
		}
	}

	// Update the record with the (possibly corrected) metadata
	logIndex.DocumentCount = documentCount
	logIndex.LastIndexed = time.Now()
	logIndex.IndexStartTime = &startTime
	durationMs := duration.Milliseconds()
	logIndex.IndexDuration = &durationMs

	// Merge time ranges: preserve existing historical range and expand if necessary
	// This prevents incremental indexing from losing historical time range data
	if minTime != nil {
		if logIndex.TimeRangeStart == nil || minTime.Before(*logIndex.TimeRangeStart) {
			logIndex.TimeRangeStart = minTime
		}
	}
	if maxTime != nil {
		if logIndex.TimeRangeEnd == nil || maxTime.After(*logIndex.TimeRangeEnd) {
			logIndex.TimeRangeEnd = maxTime
		}
	}

	// Save the updated record to the database
	return lm.persistence.SaveLogIndex(logIndex)
}

// DeleteIndexMetadataByGroup deletes all database records for a given log group.
func (lm *LogFileManager) DeleteIndexMetadataByGroup(basePath string) error {
	// The basePath is the main log path for the group.
	return lm.persistence.DeleteLogIndexesByGroup(basePath)
}

// DeleteAllIndexMetadata deletes all index metadata from the database.
func (lm *LogFileManager) DeleteAllIndexMetadata() error {
	return lm.persistence.DeleteAllLogIndexes()
}
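
// Illustrative sketch only (values hypothetical): after finishing an indexing
// run for a group, a caller records the run so the UI can show duration,
// document count, and time range:
//
//	start := time.Now().Add(-30 * time.Second)
//	minT := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
//	maxT := time.Now()
//	err := lm.SaveIndexMetadata("/var/log/nginx/access.log", 8000, start, 30*time.Second, &minT, &maxT)
//
// Because of the merge logic above, passing a narrower time range than what
// is already persisted only widens, never shrinks, the stored range.
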
// GetLogByPath returns the full NginxLogWithIndex struct for a given base path.
func (lm *LogFileManager) GetLogByPath(basePath string) (*NginxLogWithIndex, error) {
	// This is not the most efficient way, but it's reliable.
	// It ensures we get the same grouped and aggregated data the UI sees.
	allLogs := lm.GetAllLogsWithIndexGrouped()
	for _, log := range allLogs {
		if log.Path == basePath {
			return log, nil
		}
	}
	return nil, fmt.Errorf("log group with base path not found: %s", basePath)
}

// GetFilePathsForGroup returns all physical file paths for a given log group base path.
func (lm *LogFileManager) GetFilePathsForGroup(basePath string) ([]string, error) {
	// Query the database for all log indexes with matching main_log_path
	logIndexes, err := lm.persistence.GetLogIndexesByGroup(basePath)
	if err != nil {
		return nil, fmt.Errorf("failed to get log indexes for group %s: %w", basePath, err)
	}

	// Extract file paths from the database records
	filePaths := make([]string, 0, len(logIndexes))
	for _, logIndex := range logIndexes {
		filePaths = append(filePaths, logIndex.Path)
	}

	return filePaths, nil
}

// GetPersistence returns the persistence manager for advanced operations
func (lm *LogFileManager) GetPersistence() *PersistenceManager {
	return lm.persistence
}

// GetAllLogsWithIndex returns all cached log paths with their index status (non-grouped)
func (lm *LogFileManager) GetAllLogsWithIndex(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
	lm.cacheMutex.RLock()
	defer lm.cacheMutex.RUnlock()

	result := make([]*NginxLogWithIndex, 0, len(lm.logCache))

	// Get persistence indexes
	persistenceIndexes, err := lm.persistence.GetAllLogIndexes()
	if err != nil {
		logger.Warnf("Failed to get persistence indexes: %v", err)
		persistenceIndexes = []*model.NginxLogIndex{}
	}

	// Create a map of persistence indexes for quick lookup
	persistenceMap := make(map[string]*model.NginxLogIndex)
	for _, idx := range persistenceIndexes {
		persistenceMap[idx.Path] = idx
	}

	// Process cached logs (from nginx config)
	for _, cache := range lm.logCache {
		logWithIndex := &NginxLogWithIndex{
			Path:         cache.Path,
			Type:         cache.Type,
			Name:         cache.Name,
			ConfigFile:   cache.ConfigFile,
			IndexStatus:  string(IndexStatusNotIndexed),
			IsCompressed: strings.HasSuffix(cache.Path, ".gz") || strings.HasSuffix(cache.Path, ".bz2"),
		}

		// Update with persistence data if available
		if idx, exists := persistenceMap[cache.Path]; exists {
			logWithIndex.LastModified = idx.LastModified.Unix()
			logWithIndex.LastSize = idx.LastSize
			logWithIndex.LastIndexed = idx.LastIndexed.Unix()
			if idx.IndexStartTime != nil {
				logWithIndex.IndexStartTime = idx.IndexStartTime.Unix()
			}
			if idx.IndexDuration != nil {
				logWithIndex.IndexDuration = *idx.IndexDuration
			}
			logWithIndex.DocumentCount = idx.DocumentCount

			// Determine status
			lm.indexingMutex.RLock()
			isIndexing := lm.indexingStatus[cache.Path]
			lm.indexingMutex.RUnlock()

			if isIndexing {
				logWithIndex.IndexStatus = string(IndexStatusIndexing)
			} else if !idx.LastIndexed.IsZero() {
				// If file has been indexed (regardless of document count), it's indexed
				logWithIndex.IndexStatus = string(IndexStatusIndexed)
			}

			// Set time range if available
			if idx.TimeRangeStart != nil && idx.TimeRangeEnd != nil && !idx.TimeRangeStart.IsZero() && !idx.TimeRangeEnd.IsZero() {
				logWithIndex.HasTimeRange = true
				logWithIndex.TimeRangeStart = idx.TimeRangeStart.Unix()
				logWithIndex.TimeRangeEnd = idx.TimeRangeEnd.Unix()
			}
		}

		// Apply filters
		include := true
		for _, filter := range filters {
			if !filter(logWithIndex) {
				include = false
				break
			}
		}
		if include {
			result = append(result, logWithIndex)
		}
	}

	return result
}
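
// Illustrative sketch only (paths hypothetical): the grouped and non-grouped
// views answer different questions. GetLogByPath resolves one aggregated
// group, while GetFilePathsForGroup expands it back into physical files:
//
//	group, err := lm.GetLogByPath("/var/log/nginx/access.log")
//	if err == nil {
//		files, _ := lm.GetFilePathsForGroup(group.Path)
//		// files might be: ["/var/log/nginx/access.log", "/var/log/nginx/access.log.1", ...]
//	}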