package nginx_log

import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"github.com/0xJacky/Nginx-UI/model"
	"github.com/uozi-tech/cosy/logger"
)
// indexFileFromPosition indexes a file starting from a specific byte position.
func (li *LogIndexer) indexFileFromPosition(filePath string, startPosition int64, logIndex *model.NginxLogIndex) error {
	indexStartTime := time.Now()
	// Record the start time of the indexing operation
	logIndex.SetIndexStartTime(indexStartTime)

	file, err := li.safeOpenFile(filePath)
	if err != nil {
		return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
	}
	defer file.Close()

	// Get file info
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Seek to the start position
	if startPosition > 0 {
		_, err = file.Seek(startPosition, io.SeekStart)
		if err != nil {
			return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
		}
	}
	var reader io.Reader = file

	// Handle compressed files. Compressed streams are not seekable, so
	// position-based incremental indexing is not supported for them.
	if strings.HasSuffix(filePath, ".gz") {
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			return fmt.Errorf("failed to create gzip reader: %w", err)
		}
		defer gzReader.Close()
		reader = gzReader
	} else if strings.HasSuffix(filePath, ".bz2") {
		// The streaming stage already treats .bz2 as compressed, so decode it
		// here as well instead of scanning raw compressed bytes.
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		reader = bzip2.NewReader(file)
	}
	scanner := bufio.NewScanner(reader)
	const maxCapacity = 1024 * 1024 // 1 MiB buffer to accommodate very long log lines
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)

	// Use streaming processing to avoid loading all lines into memory
	return li.indexFileFromPositionStreaming(filePath, startPosition, logIndex, fileInfo, scanner, indexStartTime)
}
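
// The position arithmetic used by the streaming functions adds len(line)+1
// per scanned line, which assumes LF ("\n") endings: bufio.Scanner also
// strips "\r" from CRLF lines and returns a final unterminated line without
// any terminator, so the tracked offset can drift on such files. Below is a
// minimal stdlib-only sketch of exact offset tracking with bufio.Reader;
// exampleLineOffsets is a hypothetical helper for illustration only and is
// not used by the indexer.
func exampleLineOffsets(r io.Reader) ([]int64, error) {
	br := bufio.NewReader(r)
	var pos int64
	var offsets []int64
	for {
		// ReadString keeps the trailing '\n', so len(line) is the exact
		// number of bytes consumed for this line.
		line, err := br.ReadString('\n')
		pos += int64(len(line))
		if len(line) > 0 {
			offsets = append(offsets, pos) // offset just past this line
		}
		if err == io.EOF {
			return offsets, nil
		}
		if err != nil {
			return offsets, err
		}
	}
}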
// indexFileFromPositionWithMainLogPath indexes a file starting from a specific
// byte position, attributing entries to the specified main log path.
func (li *LogIndexer) indexFileFromPositionWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, progressTracker *ProgressTracker) error {
	indexStartTime := time.Now()
	// Record the start time of the indexing operation
	logIndex.SetIndexStartTime(indexStartTime)

	file, err := li.safeOpenFile(filePath)
	if err != nil {
		return fmt.Errorf("failed to safely open file %s: %w", filePath, err)
	}
	defer file.Close()

	// Get file info
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Seek to the start position
	if startPosition > 0 {
		_, err = file.Seek(startPosition, io.SeekStart)
		if err != nil {
			return fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
		}
	}
	var reader io.Reader = file

	// Handle compressed files. Compressed streams are not seekable, so
	// position-based incremental indexing is not supported for them.
	if strings.HasSuffix(filePath, ".gz") {
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			return fmt.Errorf("failed to create gzip reader: %w", err)
		}
		defer gzReader.Close()
		reader = gzReader
	} else if strings.HasSuffix(filePath, ".bz2") {
		// The streaming stage already treats .bz2 as compressed, so decode it
		// here as well instead of scanning raw compressed bytes.
		if startPosition > 0 {
			return fmt.Errorf("incremental indexing not supported for compressed files")
		}
		reader = bzip2.NewReader(file)
	}
	scanner := bufio.NewScanner(reader)
	const maxCapacity = 1024 * 1024 // 1 MiB buffer to accommodate very long log lines
	buf := make([]byte, maxCapacity)
	scanner.Buffer(buf, maxCapacity)

	// Use streaming processing with the specified main log path
	return li.indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath, startPosition, logIndex, fileInfo, scanner, indexStartTime, progressTracker)
}
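
// Compression is detected above by file extension. An alternative, shown here
// as a hedged stdlib-only sketch, is sniffing the gzip magic bytes
// (0x1f, 0x8b) so renamed or extensionless rotated files are still decoded.
// exampleLooksGzipped is a hypothetical helper for illustration only and is
// not used by the indexer.
func exampleLooksGzipped(f *os.File) (bool, error) {
	var magic [2]byte
	if _, err := io.ReadFull(f, magic[:]); err != nil {
		return false, err
	}
	// Rewind so the caller can read the file from the beginning.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return false, err
	}
	return magic[0] == 0x1f && magic[1] == 0x8b, nil
}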
// indexFileFromPositionStreamingWithMainLogPath processes file content using a
// streaming approach, attributing entries to the specified main log path.
func (li *LogIndexer) indexFileFromPositionStreamingWithMainLogPath(filePath, mainLogPath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time, progressTracker *ProgressTracker) error {
	// Record index start time
	logIndex.SetIndexStartTime(startTime)

	var currentPosition int64 = startPosition
	lineCount := 0
	entryCount := 0
	batch := li.index.NewBatch()
	var newTimeStart, newTimeEnd int64

	logger.Infof("Starting index for file %s -> %s (size: %d bytes)", filePath, mainLogPath, fileInfo.Size())

	// Set the file size in the progress tracker
	if progressTracker != nil {
		progressTracker.SetFileSize(filePath, fileInfo.Size())
	}

	// Determine compression status
	isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")

	// Line buffer for batch processing
	const batchLines = 10000 // Process 10,000 lines at a time for better throughput
	var lineBuffer []string
	// If starting from the middle of the file, the seek may have landed inside
	// a line; skip the first (potentially partial) line.
	if startPosition > 0 {
		if scanner.Scan() {
			line := scanner.Text()
			currentPosition += int64(len(line)) + 1 // +1 for the stripped newline
		}
	}

	// Process lines in batches
	for scanner.Scan() {
		raw := scanner.Text()
		// Advance the byte position for every scanned line, including blank
		// ones, so the saved offset stays in sync with the file.
		currentPosition += int64(len(raw)) + 1 // +1 for the stripped newline
		line := strings.TrimSpace(raw)
		if line != "" {
			lineBuffer = append(lineBuffer, line)
			lineCount++

			// Process the batch when the buffer is full
			if len(lineBuffer) >= batchLines {
				if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
					return err
				}
				// Clear the buffer, keeping its capacity
				lineBuffer = lineBuffer[:0]
			}

			// Update the progress tracker periodically
			if lineCount%5000 == 0 && progressTracker != nil {
				progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
			}
		}
	}
	// Process remaining lines in the buffer
	if len(lineBuffer) > 0 {
		if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
			return err
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to scan file %s: %w", filePath, err)
	}

	// Execute the final batch
	if batch.Size() > 0 {
		if err := li.index.Batch(batch); err != nil {
			return fmt.Errorf("failed to execute final batch: %w", err)
		}
		logger.Infof("Final batch executed: %d entries indexed for %s", batch.Size(), filePath)
	}
	// For compressed files the byte offsets refer to the decompressed stream,
	// so mark the file as fully indexed.
	if isCompressed {
		currentPosition = fileInfo.Size()
	}

	// Update persistence with the final status
	var newTimeStartPtr, newTimeEndPtr *time.Time
	if newTimeStart != 0 {
		t := time.Unix(newTimeStart, 0)
		newTimeStartPtr = &t
	}
	if newTimeEnd != 0 {
		t := time.Unix(newTimeEnd, 0)
		newTimeEndPtr = &t
	}
	logIndex.UpdateProgress(fileInfo.ModTime(), fileInfo.Size(), currentPosition, uint64(entryCount), newTimeStartPtr, newTimeEndPtr)
	logIndex.SetIndexDuration(startTime)

	// Save the updated log index
	if err := li.persistence.SaveLogIndex(logIndex); err != nil {
		logger.Warnf("Failed to save log index for %s: %v", filePath, err)
	}

	// Final position update, then mark the file complete in the progress tracker
	if progressTracker != nil {
		progressTracker.UpdateFilePosition(filePath, currentPosition, int64(lineCount))
		progressTracker.CompleteFile(filePath, int64(lineCount))
	}

	duration := time.Since(startTime)
	logger.Infof("Completed indexing of %s: %d lines processed, %d entries indexed in %v", filePath, lineCount, entryCount, duration)

	return nil
}
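
// Both streaming functions share the same buffering pattern: accumulate up to
// batchLines lines, flush them through a single call, then truncate the slice
// with lineBuffer[:0] so its backing array is reused. A generic stdlib-only
// sketch of that pattern follows; exampleBatchLines and its process callback
// are hypothetical stand-ins for illustration, not part of the indexer.
func exampleBatchLines(r io.Reader, batchSize int, process func([]string) error) error {
	sc := bufio.NewScanner(r)
	buf := make([]string, 0, batchSize)
	flush := func() error {
		if len(buf) == 0 {
			return nil
		}
		err := process(buf)
		buf = buf[:0] // reuse the backing array for the next batch
		return err
	}
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue // skip blank lines, as the indexer does
		}
		buf = append(buf, line)
		if len(buf) >= batchSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	// Flush whatever remains after the final partial batch.
	if err := flush(); err != nil {
		return err
	}
	return sc.Err()
}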
// indexFileFromPositionStreaming processes file content using a streaming approach.
func (li *LogIndexer) indexFileFromPositionStreaming(filePath string, startPosition int64, logIndex *model.NginxLogIndex, fileInfo os.FileInfo, scanner *bufio.Scanner, startTime time.Time) error {
	// Record index start time
	logIndex.SetIndexStartTime(startTime)

	var currentPosition int64 = startPosition
	lineCount := 0
	entryCount := 0
	batch := li.index.NewBatch()
	var newTimeStart, newTimeEnd int64

	// Get the main log path first (used for statistics grouping)
	mainLogPath := li.getMainLogPath(filePath)
	// Note: stats calculation removed - using Bleve aggregations instead

	// For compressed files, byte positions refer to the decompressed stream,
	// so position-based progress cannot use the on-disk size directly; fall
	// back to an estimate instead.
	isCompressed := strings.HasSuffix(filePath, ".gz") || strings.HasSuffix(filePath, ".bz2")
	var totalFileSize int64
	if isCompressed {
		// Estimate the uncompressed size using a 3:1 compression ratio, a
		// reasonable assumption for most log files.
		estimatedUncompressedSize := fileInfo.Size() * 3
		if estimatedUncompressedSize < 1 {
			estimatedUncompressedSize = 1
		}
		totalFileSize = estimatedUncompressedSize
		logger.Infof("Starting index for compressed file %s: compressed size %d, estimated uncompressed size %d", filePath, fileInfo.Size(), estimatedUncompressedSize)
	} else {
		// For uncompressed files, use the actual file size
		totalFileSize = fileInfo.Size()
		if totalFileSize < 1 {
			totalFileSize = 1
		}
		logger.Infof("Starting index from position %d of %d bytes for %s", startPosition, totalFileSize, filePath)
	}
	logger.Debugf("Starting indexing: filePath=%s, mainLogPath=%s", filePath, mainLogPath)

	// Line buffer for batch processing
	const batchLines = 10000 // Process 10,000 lines at a time for better throughput
	var lineBuffer []string
	// If starting from the middle of the file, the seek may have landed inside
	// a line; skip the first (potentially partial) line.
	if startPosition > 0 {
		if scanner.Scan() {
			line := scanner.Text()
			currentPosition += int64(len(line)) + 1 // +1 for the stripped newline
		}
	}

	// Process lines in batches
	for scanner.Scan() {
		raw := scanner.Text()
		// Advance the byte position for every scanned line, including blank
		// ones, so the saved offset stays in sync with the file.
		currentPosition += int64(len(raw)) + 1 // +1 for the stripped newline
		line := strings.TrimSpace(raw)
		if line != "" {
			lineBuffer = append(lineBuffer, line)
			lineCount++

			// Process the batch when the buffer is full
			if len(lineBuffer) >= batchLines {
				if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
					return err
				}
				// Clear the buffer, keeping its capacity
				lineBuffer = lineBuffer[:0]
			}

			// Progress reporting for large files
			if lineCount%10000 == 0 {
				// Estimate progress from the byte position. For compressed
				// files the denominator is the estimated uncompressed size,
				// so the ratio can overshoot; clamp it at 100%.
				currentFileProgress := float64(currentPosition) / float64(totalFileSize) * 100
				if currentFileProgress > 100 {
					currentFileProgress = 100
				}
				logger.Debugf("Processed %d lines, indexed %d entries from %s (~%.1f%%, position: %d/%d bytes)",
					lineCount, entryCount, filePath, currentFileProgress, currentPosition, totalFileSize)
			}
		}
	}
	// Process remaining lines in the buffer
	if len(lineBuffer) > 0 {
		if err := li.processBatchStreaming(lineBuffer, filePath, mainLogPath, startPosition, &batch, &entryCount, &newTimeStart, &newTimeEnd); err != nil {
			return err
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to scan file %s: %w", filePath, err)
	}

	// Execute the final batch
	if batch.Size() > 0 {
		if err := li.index.Batch(batch); err != nil {
			return fmt.Errorf("failed to execute final batch: %w", err)
		}
	}

	logger.Debugf("Processed %d lines, indexed %d entries from %s (position: %d->%d)", lineCount, entryCount, filePath, startPosition, currentPosition)
	// Note: log group aggregator removed - using Bleve aggregations instead

	if entryCount == 0 {
		logger.Debugf("No new entries to index in %s", filePath)
		// Still update the position and modification time with the total index size
		totalSize := li.calculateRelatedLogFilesSize(filePath)
		// Record index completion time and duration
		logIndex.SetIndexDuration(startTime)
		logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, fileInfo.Size(), logIndex.DocumentCount, logIndex.TimeRangeStart, logIndex.TimeRangeEnd)
		return li.persistence.SaveLogIndex(logIndex)
	}
	// Merge the new time range into the existing one: keep the earliest start
	// and the latest end across incremental runs.
	timeRangeStart := logIndex.TimeRangeStart
	timeRangeEnd := logIndex.TimeRangeEnd
	if newTimeStart != 0 && (timeRangeStart == nil || time.Unix(newTimeStart, 0).Before(*timeRangeStart)) {
		t := time.Unix(newTimeStart, 0)
		timeRangeStart = &t
	}
	if newTimeEnd != 0 && (timeRangeEnd == nil || time.Unix(newTimeEnd, 0).After(*timeRangeEnd)) {
		t := time.Unix(newTimeEnd, 0)
		timeRangeEnd = &t
	}
	// Calculate the total index size of related log files for this log group
	totalSize := li.calculateRelatedLogFilesSize(filePath)

	// Record index completion time and duration
	logIndex.SetIndexDuration(startTime)

	// Update the persistence record with the total index size
	logIndex.UpdateProgress(fileInfo.ModTime(), totalSize, currentPosition, logIndex.DocumentCount+uint64(entryCount), timeRangeStart, timeRangeEnd)
	if err := li.persistence.SaveLogIndex(logIndex); err != nil {
		logger.Warnf("Failed to save log index: %v", err)
	}

	// Update the in-memory file info for compatibility
	li.mu.Lock()
	if pathInfo, exists := li.logPaths[filePath]; exists {
		pathInfo.LastModified = logIndex.LastModified.Unix()
		pathInfo.LastSize = logIndex.LastSize
		pathInfo.LastIndexed = logIndex.LastIndexed.Unix()
		if timeRangeStart != nil && timeRangeEnd != nil {
			pathInfo.TimeRange = &TimeRange{Start: timeRangeStart.Unix(), End: timeRangeEnd.Unix()}
		}
	}
	li.mu.Unlock()

	// Invalidate the statistics cache since the data has changed
	li.invalidateStatsCache()

	// Clear the indexing status for this file
	SetIndexingStatus(filePath, false)
	statusManager := GetIndexingStatusManager()
	statusManager.UpdateIndexingStatus()

	indexDuration := time.Since(startTime)
	logger.Infof("Indexed %d new entries from %s in %v (position: %d->%d, index_size: %d bytes)",
		entryCount, filePath, indexDuration, startPosition, currentPosition, totalSize)

	if isCompressed {
		logger.Infof("Indexing completed for compressed file %s: processed %d lines (estimated %d bytes total)", filePath, lineCount, totalFileSize)
	} else {
		logger.Infof("Indexing completed for %s: processed %d lines, position %d/%d bytes", filePath, lineCount, currentPosition, totalFileSize)
	}

	// Note: the index-complete notification is sent with the log group ready
	// notification, which is handled centrally after all files complete.
	return nil
}
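
// The 3:1 compression ratio used above is only a heuristic, so any progress
// derived from it is approximate and must be clamped. A stdlib-only sketch of
// the same estimate follows; exampleCompressedProgress is a hypothetical
// helper for illustration only and is not used by the indexer.
func exampleCompressedProgress(decompressedPos, onDiskSize int64) float64 {
	const assumedRatio = 3 // assumed uncompressed:compressed ratio for log files
	estimatedTotal := onDiskSize * assumedRatio
	if estimatedTotal < 1 {
		estimatedTotal = 1 // avoid division by zero for empty files
	}
	progress := float64(decompressedPos) / float64(estimatedTotal) * 100
	if progress > 100 {
		progress = 100 // the real ratio may exceed the assumption; clamp
	}
	return progress
}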