Преглед на файлове

feat(nginx_log): inject indexer for precise document counting and enhance metadata persistence

0xJacky преди 3 седмици
родител
ревизия
4ec1a50ef2

+ 26 - 1
internal/nginx_log/indexer/log_file_manager.go

@@ -55,6 +55,7 @@ type LogFileManager struct {
 	persistence    *PersistenceManager
 	indexingStatus map[string]bool
 	indexingMutex  sync.RWMutex
+	indexer        *ParallelIndexer
 }
 
 // NewLogFileManager creates a new log file manager
@@ -66,6 +67,11 @@ func NewLogFileManager() *LogFileManager {
 	}
 }
 
+// SetIndexer injects the running ParallelIndexer so SaveIndexMetadata can query
+// Bleve for exact document counts before persisting.
+//
+// NOTE(review): the write to lm.indexer is not guarded by a mutex while
+// SaveIndexMetadata reads the field concurrently — this assumes injection
+// happens once during startup before any indexing work begins; confirm with
+// the call site in modern_services.go.
+func (lm *LogFileManager) SetIndexer(pi *ParallelIndexer) {
+	lm.indexer = pi
+}
+
 // AddLogPath adds a log path to the log cache with the source config file
 func (lm *LogFileManager) AddLogPath(path, logType, name, configFile string) {
 	lm.cacheMutex.Lock()
@@ -436,7 +442,26 @@ func (lm *LogFileManager) SaveIndexMetadata(basePath string, documentCount uint6
 		logIndex.LastSize = fileInfo.Size()
 	}
 
-	// Update the record with the new metadata
+	// If indexer is available and healthy, query Bleve for exact document count
+	if lm.indexer != nil && lm.indexer.IsHealthy() {
+		// Decide whether this path is a main log path (group) or a specific file
+		mainPath := getMainLogPathFromFile(basePath)
+		if mainPath == basePath {
+			if exact, err := lm.indexer.CountDocsByMainLogPath(basePath); err == nil {
+				documentCount = exact
+			} else {
+				logger.Warnf("Falling back to provided documentCount for group %s due to count error: %v", basePath, err)
+			}
+		} else {
+			if exact, err := lm.indexer.CountDocsByFilePath(basePath); err == nil {
+				documentCount = exact
+			} else {
+				logger.Warnf("Falling back to provided documentCount for file %s due to count error: %v", basePath, err)
+			}
+		}
+	}
+
+	// Update the record with the (possibly corrected) metadata
 	logIndex.DocumentCount = documentCount
 	logIndex.LastIndexed = time.Now()
 	logIndex.IndexStartTime = &startTime

+ 74 - 0
internal/nginx_log/indexer/parallel_indexer.go

@@ -1318,3 +1318,77 @@ func sumDocCounts(docsCountMap map[string]uint64) uint64 {
 	}
 	return total
 }
+
+// countDocsByTerm returns the exact number of documents whose indexed field
+// matches the given term, summed across all shards. Shard search errors are
+// collected rather than aborting the scan; the partial total is still returned
+// alongside the error so callers can decide whether to use it.
+func (pi *ParallelIndexer) countDocsByTerm(field, term string) (uint64, error) {
+	if !pi.IsHealthy() {
+		return 0, fmt.Errorf("indexer not healthy")
+	}
+
+	// TermQuery performs an exact, non-analyzed match — assumes the field is
+	// indexed as a keyword (not tokenized), otherwise full paths won't match;
+	// confirm against the index mapping.
+	q := bleve.NewTermQuery(term)
+	q.SetField(field)
+
+	var total uint64
+	var errs []error
+	for i, shard := range pi.shardManager.GetAllShards() {
+		if shard == nil {
+			continue
+		}
+		req := bleve.NewSearchRequest(q)
+		req.Size = 0 // counts only; skip hit retrieval
+
+		res, err := shard.Search(req)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("shard %d search failed: %w", i, err))
+			continue
+		}
+		total += uint64(res.Total)
+	}
+
+	if len(errs) > 0 {
+		return total, fmt.Errorf("%d shard errors (partial count=%d), e.g. %v", len(errs), total, errs[0])
+	}
+	return total, nil
+}
+
+// CountDocsByMainLogPath returns the exact number of documents indexed for a given log group (main log path)
+// by querying all shards and summing results.
+func (pi *ParallelIndexer) CountDocsByMainLogPath(basePath string) (uint64, error) {
+	return pi.countDocsByTerm("main_log_path", basePath)
+}
+
+// CountDocsByFilePath returns the exact number of documents indexed for a specific physical log file path
+// by querying all shards and summing results.
+func (pi *ParallelIndexer) CountDocsByFilePath(filePath string) (uint64, error) {
+	return pi.countDocsByTerm("file_path", filePath)
+}

+ 51 - 43
internal/nginx_log/indexer/rebuild.go

@@ -12,7 +12,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-	
+
 	"github.com/uozi-tech/cosy/logger"
 )
 
@@ -269,9 +269,17 @@ func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath stri
 		// Mark as completed
 		tracker.CompleteFile(file.Path, file.ProcessedLines)
 
-		// Update persistence
+		// Update persistence with exact doc count from Bleve
 		if rm.persistence != nil {
-			if err := rm.persistence.MarkFileAsIndexed(file.Path, file.DocumentCount, file.LastPosition); err != nil {
+			exactCount := file.DocumentCount
+			if rm.indexer != nil && rm.indexer.IsHealthy() {
+				if c, err := rm.indexer.CountDocsByFilePath(file.Path); err == nil {
+					exactCount = c
+				} else {
+					logger.Warnf("Falling back to computed count for %s due to count error: %v", file.Path, err)
+				}
+			}
+			if err := rm.persistence.MarkFileAsIndexed(file.Path, exactCount, file.LastPosition); err != nil {
 				// Log but don't fail
 				// logger.Warnf("Failed to update persistence for %s: %v", file.Path, err)
 			}
@@ -288,7 +296,7 @@ func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
 	if err != nil {
 		return true, fmt.Sprintf("cannot stat file (will process): %v", err)
 	}
-	
+
 	// For compressed files (.gz), check if we've already processed them and they haven't changed
 	if file.IsCompressed {
 		// Check if we have persistence information for this file
@@ -297,16 +305,16 @@ func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
 				// Check if file hasn't changed since last indexing
 				currentModTime := fileInfo.ModTime().Unix()
 				currentSize := fileInfo.Size()
-				
-				if info.LastModified == currentModTime && 
-				   info.LastSize == currentSize && 
-				   info.LastPosition == currentSize {
+
+				if info.LastModified == currentModTime &&
+					info.LastSize == currentSize &&
+					info.LastPosition == currentSize {
 					return false, "compressed file already fully indexed and unchanged"
 				}
 			}
 		}
 	}
-	
+
 	// For active log files (non-compressed), always process but may resume from checkpoint
 	if !file.IsCompressed {
 		// Check if file has grown or changed
@@ -314,30 +322,30 @@ func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
 			if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
 				currentModTime := fileInfo.ModTime().Unix()
 				currentSize := fileInfo.Size()
-				
+
 				// File hasn't changed at all
-				if info.LastModified == currentModTime && 
-				   info.LastSize == currentSize && 
-				   info.LastPosition == currentSize {
+				if info.LastModified == currentModTime &&
+					info.LastSize == currentSize &&
+					info.LastPosition == currentSize {
 					return false, "active file unchanged since last indexing"
 				}
-				
+
 				// File has shrunk (possible log rotation)
 				if currentSize < info.LastSize {
 					return true, "active file appears to have been rotated (size decreased)"
 				}
-				
+
 				// File has grown or been modified
 				if currentSize > info.LastSize || currentModTime > info.LastModified {
 					return true, "active file has new content"
 				}
 			}
 		}
-		
+
 		// No persistence info available, process the file
 		return true, "no previous indexing record found for active file"
 	}
-	
+
 	// Default: process compressed files if no persistence info
 	return true, "no previous indexing record found for compressed file"
 }
@@ -346,7 +354,7 @@ func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
 type LogGroupFile struct {
 	Path           string
 	Size           int64
-	ModTime        int64  // Unix timestamp of file modification time
+	ModTime        int64 // Unix timestamp of file modification time
 	IsCompressed   bool
 	EstimatedLines int64
 	ProcessedLines int64
@@ -411,7 +419,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 	// Get checkpoint information from persistence layer
 	var startPosition int64 = 0
 	var resuming bool = false
-	
+
 	if rm.persistence != nil {
 		if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
 			// Get current file modification time
@@ -419,35 +427,35 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 			if err != nil {
 				return fmt.Errorf("failed to stat file %s: %w", file.Path, err)
 			}
-			
+
 			currentModTime := fileInfo.ModTime().Unix()
 			currentSize := fileInfo.Size()
-			
+
 			// Check if file hasn't changed since last indexing
-			if info.LastIndexed > 0 && 
-			   info.LastModified == currentModTime && 
-			   info.LastSize == currentSize && 
-			   info.LastPosition == currentSize {
+			if info.LastIndexed > 0 &&
+				info.LastModified == currentModTime &&
+				info.LastSize == currentSize &&
+				info.LastPosition == currentSize {
 				// File hasn't changed and was fully indexed
-				logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)", 
+				logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)",
 					file.Path, time.Unix(info.LastIndexed, 0))
 				file.ProcessedLines = 0 // No new lines processed
 				file.DocumentCount = 0  // No new documents added
 				file.LastPosition = currentSize
 				return nil
 			}
-			
+
 			// Check if we should resume from a previous position
 			if info.LastPosition > 0 && info.LastPosition < currentSize {
 				// File has grown since last indexing
 				startPosition = info.LastPosition
 				resuming = true
-				logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)", 
+				logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)",
 					startPosition, file.Path, info.LastSize, currentSize)
 			} else if currentSize < info.LastSize {
 				// File has been truncated or rotated, start from beginning
 				startPosition = 0
-				logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start", 
+				logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start",
 					file.Path, info.LastSize, currentSize)
 			} else if info.LastPosition >= currentSize && currentSize > 0 {
 				// File size hasn't changed and we've already processed it completely
@@ -477,7 +485,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 	var currentPosition int64 = startPosition
 	var documentCount uint64 = 0
 	checkpointInterval := int64(1000) // Save checkpoint every 1000 lines
-	
+
 	scanner := bufio.NewScanner(reader)
 	for scanner.Scan() {
 		// Check context for cancellation
@@ -486,22 +494,22 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 			return ctx.Err()
 		default:
 		}
-		
+
 		line := scanner.Text()
 		currentPosition += int64(len(line)) + 1 // +1 for newline
-		
+
 		// Process the log line (parse and add to batch)
 		// This would typically involve:
 		// 1. Parse log entry using parser
 		// 2. Create search document
 		// 3. Add to batch
-		
+
 		processedLines++
 		documentCount++
-		
+
 		// Update progress
 		tracker.UpdateFileProgress(file.Path, processedLines)
-		
+
 		// Periodic checkpoint saving
 		if processedLines%checkpointInterval == 0 {
 			if rm.persistence != nil {
@@ -513,7 +521,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 				} else {
 					modTime = time.Now().Unix()
 				}
-				
+
 				info := &LogFileInfo{
 					Path:         file.Path,
 					LastPosition: currentPosition,
@@ -527,7 +535,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 			}
 		}
 	}
-	
+
 	if err := scanner.Err(); err != nil {
 		return fmt.Errorf("error reading file %s: %w", file.Path, err)
 	}
@@ -536,7 +544,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 	file.ProcessedLines = processedLines
 	file.DocumentCount = documentCount
 	file.LastPosition = currentPosition
-	
+
 	// Save final checkpoint
 	if rm.persistence != nil {
 		// Get current file info for accurate metadata
@@ -547,7 +555,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 		} else {
 			modTime = time.Now().Unix()
 		}
-		
+
 		info := &LogFileInfo{
 			Path:         file.Path,
 			LastPosition: currentPosition,
@@ -561,7 +569,7 @@ func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tra
 	}
 
 	if resuming {
-		logger.Infof("Completed resumed indexing for %s: %d lines, %d documents", 
+		logger.Infof("Completed resumed indexing for %s: %d lines, %d documents",
 			file.Path, processedLines, documentCount)
 	}
 
@@ -578,7 +586,7 @@ func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition in
 
 	// Check if file is compressed
 	isGzipped := strings.HasSuffix(filePath, ".gz")
-	
+
 	if isGzipped {
 		// For gzip files, we need to read from the beginning and skip to position
 		// This is because gzip doesn't support random seeking
@@ -587,7 +595,7 @@ func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition in
 			file.Close()
 			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
 		}
-		
+
 		if startPosition > 0 {
 			// Skip to the start position by reading and discarding bytes
 			_, err := io.CopyN(io.Discard, gzReader, startPosition)
@@ -597,7 +605,7 @@ func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition in
 				return nil, fmt.Errorf("failed to seek to position %d in gzip file: %w", startPosition, err)
 			}
 		}
-		
+
 		// Return a wrapped reader that closes both gzReader and file
 		return &gzipReaderCloser{gzReader: gzReader, file: file}, nil
 	} else {

+ 2 - 0
internal/nginx_log/modern_services.go

@@ -100,6 +100,8 @@ func initializeWithDefaults(ctx context.Context) error {
 
 	// Initialize log file manager
 	globalLogFileManager = indexer.NewLogFileManager()
+	// Inject indexer for precise doc counting before persisting
+	globalLogFileManager.SetIndexer(globalIndexer)
 
 	servicesInitialized = true