123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- package indexer
- import (
- "context"
- "fmt"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/uozi-tech/cosy/logger"
- )
// RotationScanner discovers the files that belong to rotated log groups,
// keeps one scan result per group, and maintains a priority-ordered queue of
// files that are waiting to be indexed.
type RotationScanner struct {
	config *RotationScanConfig
	// scanResults holds the latest scan result per log group, keyed by the
	// group's base path.
	scanResults map[string]*LogGroupScanResult
	// resultsMutex guards scanResults.
	resultsMutex sync.RWMutex
	// priorityQueue holds the files still to be handed out, ordered by
	// descending priority.
	priorityQueue []*RotationLogFileInfo
	// queueMutex guards priorityQueue.
	queueMutex sync.Mutex
	// scanInProgress is true while ScanLogGroups is running.
	scanInProgress bool
	// progressMutex guards scanInProgress.
	progressMutex sync.Mutex
}
// RotationScanConfig configures the rotation log scanner.
type RotationScanConfig struct {
	// Scanning parameters

	// MaxConcurrentScans is the capacity of the semaphore that limits how many
	// log groups are scanned at the same time in parallel mode.
	MaxConcurrentScans int `json:"max_concurrent_scans"`
	// ScanTimeout is not read by the scanner code in this file.
	// NOTE(review): presumably a per-scan deadline — confirm where it is applied.
	ScanTimeout time.Duration `json:"scan_timeout"`
	// MinFileSize is the smallest size, in bytes, a file must have to be kept.
	MinFileSize int64 `json:"min_file_size"`
	// MaxFileAge is the longest time since last modification a file may have
	// to be kept.
	MaxFileAge time.Duration `json:"max_file_age"`

	// Throughput optimization

	// PrioritizeBySize adds 10 priority points per full 10 MiB of file size.
	PrioritizeBySize bool `json:"prioritize_by_size"`
	// PrioritizeByAge adds priority points to files modified within the last
	// day (50) or the last week (20).
	PrioritizeByAge bool `json:"prioritize_by_age"`
	// PreferredExtensions is not read by the scanner code in this file.
	PreferredExtensions []string `json:"preferred_extensions"`
	// ExcludePatterns are filepath.Match patterns tested against a file's base
	// name; a matching file is skipped.
	ExcludePatterns []string `json:"exclude_patterns"`

	// Performance settings

	// EnableParallelScan selects the parallel scanner; otherwise log groups
	// are scanned one after another.
	EnableParallelScan bool `json:"enable_parallel_scan"`
	// ScanBatchSize is not read by the scanner code in this file.
	ScanBatchSize int `json:"scan_batch_size"`
}
// LogGroupScanResult contains the scan results for a single log group.
type LogGroupScanResult struct {
	// BasePath is the path of the group's main log file; it is also the key
	// under which the result is stored.
	BasePath string `json:"base_path"`
	// Files are the files that passed the filters, sorted by descending
	// priority and then ascending rotation index.
	Files []*RotationLogFileInfo `json:"files"`
	// TotalSize is the sum of the sizes of Files, in bytes.
	TotalSize int64 `json:"total_size"`
	// TotalFiles is len(Files).
	TotalFiles int `json:"total_files"`
	// ScanTime is the time at which the scan of this group started.
	ScanTime time.Time `json:"scan_time"`
	// ScanDuration is how long the scan of this group took.
	ScanDuration time.Duration `json:"scan_duration"`
	// EstimatedLines is the sum of the per-file line estimates.
	EstimatedLines int64 `json:"estimated_lines"`
}
// RotationLogFileInfo contains detailed information about one file of a log
// group, either the main log file or one of its rotations.
type RotationLogFileInfo struct {
	// Path is the file path as returned by the glob match.
	Path string `json:"path"`
	// Size is the file size in bytes at scan time.
	Size int64 `json:"size"`
	// ModTime is the file's modification time at scan time.
	ModTime time.Time `json:"mod_time"`
	// IsCompressed is true when Path ends in ".gz".
	IsCompressed bool `json:"is_compressed"`
	// RotationIndex is 0 for the main log file, the numeric suffix for names
	// such as access.log.3 or access.log.3.gz, and 999 for unrecognized names.
	RotationIndex int `json:"rotation_index"`
	// EstimatedLines is a size-based estimate of the number of log lines.
	EstimatedLines int64 `json:"estimated_lines"`
	// Priority is the indexing priority; higher values are indexed first.
	Priority int `json:"priority"`
	// MainLogPath is the base path of the log group this file belongs to.
	MainLogPath string `json:"main_log_path"`
}
- // NewRotationScanner creates a new rotation log scanner
- func NewRotationScanner(config *RotationScanConfig) *RotationScanner {
- if config == nil {
- config = DefaultRotationScanConfig()
- }
- return &RotationScanner{
- config: config,
- scanResults: make(map[string]*LogGroupScanResult),
- priorityQueue: make([]*RotationLogFileInfo, 0),
- }
- }
- // DefaultRotationScanConfig returns default configuration for rotation scanning
- func DefaultRotationScanConfig() *RotationScanConfig {
- return &RotationScanConfig{
- MaxConcurrentScans: 8,
- ScanTimeout: 30 * time.Second,
- MinFileSize: 1024, // 1KB minimum
- MaxFileAge: 30 * 24 * time.Hour, // 30 days
- PrioritizeBySize: true,
- PrioritizeByAge: true,
- PreferredExtensions: []string{".log", ".gz"},
- ExcludePatterns: []string{"*.tmp", "*.lock", "*.swap"},
- EnableParallelScan: true,
- ScanBatchSize: 50,
- }
- }
- // ScanLogGroups scans multiple log groups and builds a prioritized queue
- func (rs *RotationScanner) ScanLogGroups(ctx context.Context, basePaths []string) error {
- rs.progressMutex.Lock()
- if rs.scanInProgress {
- rs.progressMutex.Unlock()
- return fmt.Errorf("scan already in progress")
- }
- rs.scanInProgress = true
- rs.progressMutex.Unlock()
- defer func() {
- rs.progressMutex.Lock()
- rs.scanInProgress = false
- rs.progressMutex.Unlock()
- }()
- logger.Infof("🔍 Starting rotation log scan for %d log groups", len(basePaths))
- if rs.config.EnableParallelScan {
- return rs.scanLogGroupsParallel(ctx, basePaths)
- } else {
- return rs.scanLogGroupsSequential(ctx, basePaths)
- }
- }
- // scanLogGroupsParallel scans log groups in parallel for maximum throughput
- func (rs *RotationScanner) scanLogGroupsParallel(ctx context.Context, basePaths []string) error {
- semaphore := make(chan struct{}, rs.config.MaxConcurrentScans)
- var wg sync.WaitGroup
- errors := make(chan error, len(basePaths))
- for _, basePath := range basePaths {
- wg.Add(1)
- go func(path string) {
- defer wg.Done()
- select {
- case semaphore <- struct{}{}:
- defer func() { <-semaphore }()
- if err := rs.scanSingleLogGroup(ctx, path); err != nil {
- errors <- fmt.Errorf("failed to scan %s: %w", path, err)
- }
- case <-ctx.Done():
- errors <- ctx.Err()
- return
- }
- }(basePath)
- }
- wg.Wait()
- close(errors)
- // Collect any errors
- var scanErrors []error
- for err := range errors {
- if err != nil {
- scanErrors = append(scanErrors, err)
- }
- }
- if len(scanErrors) > 0 {
- logger.Warnf("Encountered %d errors during parallel scan: %v", len(scanErrors), scanErrors)
- }
- // Build priority queue from all scan results
- rs.buildPriorityQueue()
- logger.Infof("✅ Rotation log scan completed: %d log groups, %d total files",
- len(rs.scanResults), len(rs.priorityQueue))
- return nil
- }
- // scanLogGroupsSequential scans log groups sequentially
- func (rs *RotationScanner) scanLogGroupsSequential(ctx context.Context, basePaths []string) error {
- for _, basePath := range basePaths {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- if err := rs.scanSingleLogGroup(ctx, basePath); err != nil {
- logger.Warnf("Failed to scan log group %s: %v", basePath, err)
- // Continue with other groups
- }
- }
- }
- rs.buildPriorityQueue()
- return nil
- }
- // scanSingleLogGroup scans a single log group for rotation logs
- func (rs *RotationScanner) scanSingleLogGroup(ctx context.Context, basePath string) error {
- start := time.Now()
- // Use optimized glob pattern for rotation logs
- patterns := []string{
- basePath, // Main log file
- basePath + ".*", // access.log.1, access.log.2.gz, etc.
- basePath + "-*", // access.log-20240101, etc.
- strings.TrimSuffix(basePath, ".log") + ".*.log*", // For patterns like access.20240101.log.gz
- }
- var allFiles []*RotationLogFileInfo
- seen := make(map[string]struct{})
- for _, pattern := range patterns {
- matches, err := filepath.Glob(pattern)
- if err != nil {
- continue // Skip problematic patterns
- }
- for _, match := range matches {
- if _, exists := seen[match]; exists {
- continue
- }
- seen[match] = struct{}{}
- info, err := os.Stat(match)
- if err != nil || !info.Mode().IsRegular() {
- continue
- }
- // Apply file filters
- if info.Size() < rs.config.MinFileSize {
- continue
- }
- if time.Since(info.ModTime()) > rs.config.MaxFileAge {
- continue
- }
- if rs.shouldExcludeFile(match) {
- continue
- }
- logFile := &RotationLogFileInfo{
- Path: match,
- Size: info.Size(),
- ModTime: info.ModTime(),
- IsCompressed: strings.HasSuffix(match, ".gz"),
- RotationIndex: rs.extractRotationIndex(match, basePath),
- EstimatedLines: rs.estimateLineCount(info.Size(), strings.HasSuffix(match, ".gz")),
- MainLogPath: basePath,
- }
- logFile.Priority = rs.calculatePriority(logFile)
- allFiles = append(allFiles, logFile)
- }
- }
- // Sort files by priority and rotation index
- sort.Slice(allFiles, func(i, j int) bool {
- if allFiles[i].Priority != allFiles[j].Priority {
- return allFiles[i].Priority > allFiles[j].Priority // Higher priority first
- }
- return allFiles[i].RotationIndex < allFiles[j].RotationIndex // Newer files first
- })
- // Calculate totals
- var totalSize int64
- var estimatedLines int64
- for _, file := range allFiles {
- totalSize += file.Size
- estimatedLines += file.EstimatedLines
- }
- result := &LogGroupScanResult{
- BasePath: basePath,
- Files: allFiles,
- TotalSize: totalSize,
- TotalFiles: len(allFiles),
- ScanTime: start,
- ScanDuration: time.Since(start),
- EstimatedLines: estimatedLines,
- }
- rs.resultsMutex.Lock()
- rs.scanResults[basePath] = result
- rs.resultsMutex.Unlock()
- logger.Debugf("📊 Scanned log group %s: %d files, %s total, %d estimated lines",
- basePath, len(allFiles), formatSize(totalSize), estimatedLines)
- return nil
- }
- // buildPriorityQueue builds a global priority queue from all scan results
- func (rs *RotationScanner) buildPriorityQueue() {
- rs.queueMutex.Lock()
- defer rs.queueMutex.Unlock()
- rs.priorityQueue = rs.priorityQueue[:0] // Clear existing queue
- rs.resultsMutex.RLock()
- for _, result := range rs.scanResults {
- rs.priorityQueue = append(rs.priorityQueue, result.Files...)
- }
- rs.resultsMutex.RUnlock()
- // Sort by priority
- sort.Slice(rs.priorityQueue, func(i, j int) bool {
- if rs.priorityQueue[i].Priority != rs.priorityQueue[j].Priority {
- return rs.priorityQueue[i].Priority > rs.priorityQueue[j].Priority
- }
- // Secondary sort by size for throughput
- return rs.priorityQueue[i].Size > rs.priorityQueue[j].Size
- })
- logger.Infof("🚀 Built priority queue with %d files for optimized indexing", len(rs.priorityQueue))
- }
- // GetNextBatch returns the next batch of files to index, prioritized for throughput
- func (rs *RotationScanner) GetNextBatch(batchSize int) []*RotationLogFileInfo {
- rs.queueMutex.Lock()
- defer rs.queueMutex.Unlock()
- if len(rs.priorityQueue) == 0 {
- return []*RotationLogFileInfo{} // Return empty slice instead of nil
- }
- end := minInt(batchSize, len(rs.priorityQueue))
- batch := make([]*RotationLogFileInfo, end)
- copy(batch, rs.priorityQueue[:end])
- // Remove processed files from queue
- rs.priorityQueue = rs.priorityQueue[end:]
- return batch
- }
- // extractRotationIndex extracts rotation index from file path
- func (rs *RotationScanner) extractRotationIndex(filePath, basePath string) int {
- if filePath == basePath {
- return 0 // Main log file
- }
- // Extract numeric suffix (e.g., access.log.1 -> 1)
- suffix := strings.TrimPrefix(filePath, basePath)
- // Handle various rotation formats
- if strings.HasPrefix(suffix, ".") {
- suffix = strings.TrimPrefix(suffix, ".")
- // Always trim optional ".gz" suffix
- suffix = strings.TrimSuffix(suffix, ".gz")
- // Try to parse as integer
- var index int
- if n, err := fmt.Sscanf(suffix, "%d", &index); n == 1 && err == nil {
- return index
- }
- }
- // Default to high index for unknown formats
- return 999
- }
- // calculatePriority calculates file priority based on configuration
- func (rs *RotationScanner) calculatePriority(file *RotationLogFileInfo) int {
- priority := 100 // Base priority
- if rs.config.PrioritizeBySize {
- // Larger files get higher priority (better throughput)
- sizeMB := file.Size / (1024 * 1024)
- priority += int(sizeMB/10) * 10 // +10 points per 10MB
- }
- if rs.config.PrioritizeByAge {
- // Newer files get higher priority
- hoursOld := int(time.Since(file.ModTime).Hours())
- if hoursOld < 24 {
- priority += 50 // Recent files
- } else if hoursOld < 168 { // 1 week
- priority += 20
- }
- }
- // Main log file gets highest priority
- if file.RotationIndex == 0 {
- priority += 100
- }
- // Compressed files get slightly lower priority (decompression overhead)
- if file.IsCompressed {
- priority -= 10
- }
- return priority
- }
- // estimateLineCount estimates the number of log lines in a file
- func (rs *RotationScanner) estimateLineCount(size int64, isCompressed bool) int64 {
- avgLineSize := int64(200) // Average nginx log line size in bytes
- if isCompressed {
- // Assume 3x compression ratio for text logs
- size = size * 3
- }
- return size / avgLineSize
- }
- // shouldExcludeFile checks if file should be excluded based on patterns
- func (rs *RotationScanner) shouldExcludeFile(path string) bool {
- filename := filepath.Base(path)
- for _, pattern := range rs.config.ExcludePatterns {
- if matched, _ := filepath.Match(pattern, filename); matched {
- return true
- }
- }
- return false
- }
- // GetScanResults returns the current scan results
- func (rs *RotationScanner) GetScanResults() map[string]*LogGroupScanResult {
- rs.resultsMutex.RLock()
- defer rs.resultsMutex.RUnlock()
- results := make(map[string]*LogGroupScanResult)
- for k, v := range rs.scanResults {
- results[k] = v
- }
- return results
- }
- // GetQueueSize returns the current number of files in the priority queue
- func (rs *RotationScanner) GetQueueSize() int {
- rs.queueMutex.Lock()
- defer rs.queueMutex.Unlock()
- return len(rs.priorityQueue)
- }
- // IsScanning returns true if a scan is currently in progress
- func (rs *RotationScanner) IsScanning() bool {
- rs.progressMutex.Lock()
- defer rs.progressMutex.Unlock()
- return rs.scanInProgress
- }
// formatSize renders a byte count for display using 1024-based units. Values
// below 1024 are printed as whole bytes ("512 B"); larger values are printed
// with one decimal and the matching unit prefix ("1.5 KB", "2.0 GB").
func formatSize(bytes int64) string {
	const (
		unit     = 1024
		prefixes = "KMGTPE"
	)

	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}

	// Grow the divisor until the scaled value drops below one unit; idx then
	// selects the matching prefix.
	divisor := int64(unit)
	idx := 0
	for bytes/divisor >= unit {
		divisor *= unit
		idx++
	}

	scaled := float64(bytes) / float64(divisor)
	return fmt.Sprintf("%.1f %cB", scaled, prefixes[idx])
}
// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
|