Indexer Package

The indexer package provides high-performance, multi-shard parallel indexing capabilities for NGINX logs with comprehensive persistence management, progress tracking, and rebuild functionality.

Features

  • Multi-Shard Architecture: Distributed indexing across multiple Bleve indexes for scalability
  • Parallel Processing: Concurrent indexing with configurable worker pools
  • Persistence Management: Incremental indexing with position tracking and recovery
  • Progress Tracking: Real-time progress monitoring for long-running operations
  • Rebuild Functionality: Complete index reconstruction with comprehensive error handling
  • Performance Optimization: Memory management, caching, and batch processing
  • High Availability: Fault tolerance and automatic recovery mechanisms

Architecture

indexer/
├── types.go                    # Core types, interfaces, and index mapping
├── parallel_indexer.go         # Main parallel indexer implementation (optimized)
├── grouped_shard_manager.go   # Multi-shard management and distribution (optimized)
├── batch_writer.go            # Efficient batch writing operations
├── persistence.go             # Incremental indexing and persistence management
├── progress_tracker.go        # Real-time progress monitoring
├── rebuild.go                 # Index rebuilding functionality
├── log_file_manager.go        # Log file discovery and management
├── metrics.go                 # Performance metrics and monitoring
└── README.md                  # This documentation

Note: Performance optimizations now use the unified utils package (../utils/)

Quick Start

Basic Indexing

import (
    "context"
    "log"
    "time"

    "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
)

// Create indexer with default configuration.
// The variable is named idx so that it does not shadow the indexer package.
idx := indexer.NewParallelIndexer(nil)

// Start indexer
ctx := context.Background()
if err := idx.Start(ctx); err != nil {
    log.Fatal(err)
}
defer idx.Stop()

// Index a single document
doc := &indexer.Document{
    ID: "log_entry_1",
    Fields: &indexer.LogDocument{
        Timestamp: time.Now().Unix(),
        IP:        "192.168.1.1",
        Method:    "GET",
        Path:      "/api/status",
        Status:    200,
        BytesSent: 1234,
        FilePath:  "/var/log/nginx/access.log",
        Raw:       "192.168.1.1 - - [01/Jan/2024:12:00:00 +0000] \"GET /api/status HTTP/1.1\" 200 1234",
    },
}

if err := idx.IndexDocument(ctx, doc); err != nil {
    log.Printf("Indexing error: %v", err)
}

Batch Processing

// Create multiple documents
var documents []*indexer.Document
for i := 0; i < 1000; i++ {
    doc := &indexer.Document{
        ID: fmt.Sprintf("log_entry_%d", i),
        Fields: &indexer.LogDocument{
            Timestamp: time.Now().Unix(),
            IP:        fmt.Sprintf("192.168.1.%d", i%254+1),
            Method:    "GET",
            Path:      fmt.Sprintf("/api/endpoint_%d", i),
            Status:    200,
            BytesSent: int64(1000 + i),
            FilePath:  "/var/log/nginx/access.log",
        },
    }
    documents = append(documents, doc)
}

// Index batch with automatic optimization.
// idx is the started *indexer.ParallelIndexer; the name avoids shadowing the indexer package.
if err := idx.IndexDocuments(ctx, documents); err != nil {
    log.Printf("Batch indexing error: %v", err)
}

// Get indexing statistics
stats := idx.GetStats()
fmt.Printf("Total documents: %d, Indexing rate: %.2f docs/sec\n",
    stats.TotalDocuments, stats.IndexingRate)

Configuration

IndexerConfig

type IndexerConfig struct {
    // Basic configuration
    IndexPath         string        `json:"index_path"`         // Base path for index storage
    ShardCount        int           `json:"shard_count"`        // Number of shards (default: 4)
    WorkerCount       int           `json:"worker_count"`       // Worker pool size (default: 8)
    BatchSize         int           `json:"batch_size"`         // Batch processing size (default: 1000)
    FlushInterval     time.Duration `json:"flush_interval"`     // Auto-flush interval (default: 5s)
    MaxQueueSize      int           `json:"max_queue_size"`     // Maximum queue depth (default: 10000)
    
    // Performance tuning
    EnableCompression bool          `json:"enable_compression"` // Enable index compression (default: true)
    MemoryQuota       int64         `json:"memory_quota"`       // Memory limit in bytes (default: 1GB)
    MaxSegmentSize    int64         `json:"max_segment_size"`   // Maximum segment size (default: 64MB)
    OptimizeInterval  time.Duration `json:"optimize_interval"`  // Auto-optimization interval (default: 30m)
    
    // Monitoring
    EnableMetrics     bool          `json:"enable_metrics"`     // Enable performance metrics (default: true)
}

Default Configuration

func DefaultIndexerConfig() *IndexerConfig {
    return &IndexerConfig{
        IndexPath:         "./log-index",
        ShardCount:        4,
        WorkerCount:       8,
        BatchSize:         1000,
        FlushInterval:     5 * time.Second,
        MaxQueueSize:      10000,
        EnableCompression: true,
        MemoryQuota:       1024 * 1024 * 1024, // 1GB
        MaxSegmentSize:    64 * 1024 * 1024,   // 64MB
        OptimizeInterval:  30 * time.Minute,
        EnableMetrics:     true,
    }
}
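
Because the struct carries JSON tags, one way to load settings is to start from the defaults and let a JSON document overwrite only the keys it names. The sketch below is an illustration under assumptions: `sketchConfig` is a trimmed copy of a few IndexerConfig fields and `loadConfig` is a hypothetical helper. Neither is part of the package, and the package may load its configuration differently.

```go
package main

import "encoding/json"

// sketchConfig is a trimmed, illustrative copy of IndexerConfig (not the real type).
type sketchConfig struct {
	IndexPath   string `json:"index_path"`
	ShardCount  int    `json:"shard_count"`
	WorkerCount int    `json:"worker_count"`
	BatchSize   int    `json:"batch_size"`
}

// defaultSketchConfig mirrors the documented defaults for the fields above.
func defaultSketchConfig() *sketchConfig {
	return &sketchConfig{
		IndexPath:   "./log-index",
		ShardCount:  4,
		WorkerCount: 8,
		BatchSize:   1000,
	}
}

// loadConfig (hypothetical helper) decodes raw JSON on top of the defaults,
// so keys missing from the JSON keep their default values.
func loadConfig(raw []byte) (*sketchConfig, error) {
	cfg := defaultSketchConfig()
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
```

Calling `loadConfig` with `{"shard_count": 8}` changes only the shard count and leaves the remaining fields at their defaults.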

Core Components

1. Parallel Indexer

The main indexer implementation with multi-shard support:

// Create with custom configuration
config := &IndexerConfig{
    ShardCount:  8,     // More shards for higher throughput
    WorkerCount: 16,    // More workers for CPU-intensive workloads
    BatchSize:   2000,  // Larger batches for better efficiency
}

indexer := NewParallelIndexer(config)

// Asynchronous indexing with callback
indexer.IndexDocumentAsync(doc, func(err error) {
    if err != nil {
        log.Printf("Async indexing failed: %v", err)
    } else {
        log.Println("Document indexed successfully")
    }
})

2. Shard Manager

Manages distribution across multiple index shards:

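// Get shard information (GetShardInfo returns a value and an error)
shardStats, err := indexer.GetShardInfo(0)
if err != nil {
    log.Printf("Failed to get shard info: %v", err)
} else {
    fmt.Printf("Shard 0: %d documents, %s size\n",
        shardStats.DocumentCount, formatBytes(shardStats.Size))
}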

// Optimize specific shard
if err := indexer.OptimizeShard(0); err != nil {
    log.Printf("Shard optimization failed: %v", err)
}

// Health check
if err := indexer.HealthCheck(); err != nil {
    log.Printf("Health check failed: %v", err)
}

3. Batch Writer

Efficient batch writing with automatic flushing:

// Create batch writer
batch := indexer.StartBatch()

// Add documents to batch
for _, doc := range documents {
    if err := batch.Add(doc); err != nil {
        log.Printf("Failed to add document to batch: %v", err)
    }
    
    // Flush once the batch reaches the target size
    if batch.Size() >= 1000 {
        result, err := batch.Flush()
        if err != nil {
            log.Printf("Batch flush failed: %v", err)
        } else {
            fmt.Printf("Flushed %d documents in %v\n", 
                result.Processed, result.Duration)
        }
    }
}

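// Final flush for any documents left in the batch
if batch.Size() > 0 {
    if _, err := batch.Flush(); err != nil {
        log.Printf("Final batch flush failed: %v", err)
    }
}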

4. Persistence Manager

Handles incremental indexing and position tracking:

// Create persistence manager with database
persistenceManager := NewPersistenceManager(&PersistenceConfig{
    DatabaseURL: "postgres://user:pass@localhost/nginx_ui",
    TableName:   "nginx_log_indexes",
})

// Get incremental indexing information
info, err := persistenceManager.GetIncrementalInfo("/var/log/nginx/access.log")
if err != nil {
    log.Printf("Failed to get incremental info: %v", err)
    return // info must not be used when the lookup fails
}

fmt.Printf("Last indexed position: %d, Documents: %d\n",
    info.LastPosition, info.DocumentCount)

// Update indexing progress
newInfo := &LogFileInfo{
    Path:           "/var/log/nginx/access.log",
    LastPosition:   info.LastPosition + 1024,
    DocumentCount:  info.DocumentCount + 100,
    LastModified:   time.Now(),
    IndexedAt:      time.Now(),
}

if err := persistenceManager.UpdateIncrementalInfo("/var/log/nginx/access.log", newInfo); err != nil {
    log.Printf("Failed to update incremental info: %v", err)
}

5. Progress Tracker

Real-time progress monitoring for long-running operations:

// Create progress configuration
progressConfig := &ProgressConfig{
    OnProgress: func(notification ProgressNotification) {
        fmt.Printf("Progress: %s - %.2f%% complete (%d/%d files)\n",
            notification.GroupPath, notification.OverallProgress*100,
            notification.CompletedFiles, notification.TotalFiles)
    },
    OnCompletion: func(notification CompletionNotification) {
        fmt.Printf("Completed: %s in %v (processed %d documents)\n",
            notification.GroupPath, notification.Duration, notification.DocumentCount)
    },
}

// Get progress tracker
progressManager := NewProgressManager()
tracker := progressManager.GetTracker("/var/log/nginx/access.log", progressConfig)

// Track file processing
tracker.AddFile("/var/log/nginx/access.log", false)
tracker.SetFileEstimate("/var/log/nginx/access.log", 10000) // Estimated lines

tracker.StartFile("/var/log/nginx/access.log")

// Update progress periodically
for i := 0; i < 10000; i++ {
    // Process log line...
    
    if i%100 == 0 {
        tracker.UpdateFileProgress("/var/log/nginx/access.log", int64(i))
    }
}

tracker.CompleteFile("/var/log/nginx/access.log", 10000)

6. Rebuild Manager

Complete index reconstruction with progress tracking:

// Create rebuild manager
rebuildManager := NewRebuildManager(
    indexer,
    persistenceManager,
    progressManager,
    shardManager,
    &RebuildConfig{
        BatchSize:          2000,
        MaxConcurrency:     4,
        DeleteBeforeRebuild: true,
        ProgressInterval:   10 * time.Second,
        TimeoutPerFile:     30 * time.Minute,
    },
)

// Rebuild all indexes
ctx := context.Background()
if err := rebuildManager.RebuildAll(ctx); err != nil {
    log.Printf("Rebuild failed: %v", err)
} else {
    log.Println("Rebuild completed successfully")
}

// Rebuild single log group
if err := rebuildManager.RebuildSingle(ctx, "/var/log/nginx/access.log"); err != nil {
    log.Printf("Single rebuild failed: %v", err)
}

// Monitor rebuild status
stats := rebuildManager.GetRebuildStats()
if stats.IsRebuilding {
    fmt.Printf("Rebuild in progress (last rebuild: %v)\n", stats.LastRebuildTime)
}

Performance Characteristics

Benchmarks

Based on comprehensive benchmarking on Apple M2 Pro:

Operation                    Performance   Memory Usage   Notes
---------------------------  ------------  -------------  --------------------------
Single document indexing     ~125µs        1.2KB          Including shard selection
Batch indexing (1000 docs)   ~45ms         128KB          Optimized batch processing
Shard selection              ~25ns         0 allocs       Hash-based distribution
Progress tracking update     ~57ns         0 allocs       Lock-free counters
Rebuild stats retrieval      ~25ns         0 allocs       Atomic operations
Memory optimization cycle    ~2.1ms        45KB           Garbage collection trigger
Index optimization           ~150ms        2.3MB          Per shard
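
The table attributes the cost of shard selection to hash-based distribution. A minimal sketch of that idea follows, assuming a 32-bit FNV-1a hash of the document ID reduced modulo the shard count. The function name `shardFor` is hypothetical and the package may use a different hash; the loop is written inline so that it performs no heap allocation.

```go
package main

// shardFor (hypothetical) maps a document ID to a shard index in [0, shardCount).
// FNV-1a is an assumption here; the package's real hash function may differ.
func shardFor(docID string, shardCount int) int {
	if shardCount <= 0 {
		return 0
	}
	const (
		offset32 = 2166136261 // FNV-1a 32-bit offset basis
		prime32  = 16777619   // FNV-1a 32-bit prime
	)
	h := uint32(offset32)
	for i := 0; i < len(docID); i++ {
		h ^= uint32(docID[i])
		h *= prime32
	}
	return int(h % uint32(shardCount))
}
```

The same ID always lands on the same shard for a given shard count, which is what lets a later lookup or delete find the document again. Changing the shard count remaps most IDs.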

Throughput Characteristics

Scenario        Documents/sec   Memory Peak   CPU Usage
--------------  --------------  ------------  ---------
Single worker   ~8,000          256MB         25%
4 workers       ~28,000         512MB         85%
8 workers       ~45,000         768MB         95%
16 workers      ~52,000         1.2GB         98%
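
Dividing each row by ideal linear scaling from the single-worker figure makes the diminishing returns explicit: about 88% efficiency at 4 workers, 70% at 8, and 41% at 16. The helper below is only an illustration of that arithmetic; its name is not part of the package.

```go
package main

// scalingEfficiency returns measured throughput divided by ideal linear
// scaling (workers times the single-worker throughput).
func scalingEfficiency(singleWorkerRate, measuredRate float64, workers int) float64 {
	return measuredRate / (singleWorkerRate * float64(workers))
}
```

With the figures above, `scalingEfficiency(8000, 52000, 16)` is 0.40625, so doubling from 8 to 16 workers adds roughly 16% throughput while memory peak grows from 768MB to 1.2GB.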

Performance Tuning Guidelines

  1. Worker Count Configuration

    // CPU-bound workloads
    config.WorkerCount = runtime.GOMAXPROCS(0)
    
    // I/O-bound workloads  
    config.WorkerCount = runtime.GOMAXPROCS(0) * 2
    
    // Memory-constrained environments
    config.WorkerCount = max(2, runtime.GOMAXPROCS(0)/2)
    
  2. Shard Count Optimization

    // For high-volume environments (>1M docs)
    config.ShardCount = 8
    
    // For moderate volume (100K-1M docs)
    config.ShardCount = 4
    
    // For low volume (<100K docs)
    config.ShardCount = 2
    
  3. Batch Size Tuning

    // High-throughput scenarios
    config.BatchSize = 2000
    
    // Memory-constrained environments
    config.BatchSize = 500
    
    // Real-time requirements
    config.BatchSize = 100
    
  4. Memory Management

    // Configure memory limits
    config.MemoryQuota = 2 * 1024 * 1024 * 1024 // 2GB
    config.MaxSegmentSize = 128 * 1024 * 1024   // 128MB
    
    // Enable automatic optimization
    config.OptimizeInterval = 15 * time.Minute
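
The guidelines above can be folded into one helper that picks values from a few inputs. This is a sketch under assumptions: the `tuning` type and `suggestTuning` function are hypothetical, the thresholds are copied from the guidelines, and the high-throughput batch size is used unless the environment is memory-constrained.

```go
package main

// tuning holds the three knobs discussed above (illustrative type, not part of the package).
type tuning struct {
	WorkerCount int
	ShardCount  int
	BatchSize   int
}

// suggestTuning (hypothetical) applies the guideline thresholds: worker count
// from CPU count and workload type, shard count from expected document volume,
// batch size from memory constraints.
func suggestTuning(cpus int, expectedDocs int64, ioBound, memoryConstrained bool) tuning {
	t := tuning{WorkerCount: cpus, ShardCount: 4, BatchSize: 2000}

	switch {
	case memoryConstrained:
		t.WorkerCount = cpus / 2
		if t.WorkerCount < 2 {
			t.WorkerCount = 2
		}
		t.BatchSize = 500
	case ioBound:
		t.WorkerCount = cpus * 2
	}

	switch {
	case expectedDocs > 1_000_000:
		t.ShardCount = 8
	case expectedDocs < 100_000:
		t.ShardCount = 2
	}
	return t
}
```

For example, `suggestTuning(runtime.GOMAXPROCS(0), 2_000_000, false, false)` keeps one worker per CPU and selects 8 shards. The result is a starting point to benchmark, not a guarantee.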
    

Index Structure and Mapping

Document Schema

type LogDocument struct {
    // Core fields - always indexed
    Timestamp    int64    `json:"timestamp"`    // Unix timestamp (range queries)
    IP           string   `json:"ip"`           // IP address (keyword matching)
    Method       string   `json:"method"`       // HTTP method (keyword)
    Path         string   `json:"path"`         // Request path (analyzed text)
    PathExact    string   `json:"path_exact"`   // Exact path matching (keyword)
    Status       int      `json:"status"`       // HTTP status (numeric range)
    BytesSent    int64    `json:"bytes_sent"`   // Response size (numeric)
    
    // Geographic enrichment (optional)
    RegionCode   string   `json:"region_code,omitempty"`   // Country code (keyword)
    Province     string   `json:"province,omitempty"`      // State/province (keyword)
    City         string   `json:"city,omitempty"`          // City (keyword)
    ISP          string   `json:"isp,omitempty"`           // ISP (keyword)
    
    // User agent analysis (optional)
    UserAgent    string   `json:"user_agent,omitempty"`    // Full user agent (analyzed)
    Browser      string   `json:"browser,omitempty"`       // Browser name (keyword)
    BrowserVer   string   `json:"browser_version,omitempty"` // Browser version (keyword)
    OS           string   `json:"os,omitempty"`            // Operating system (keyword)
    OSVersion    string   `json:"os_version,omitempty"`    // OS version (keyword)
    DeviceType   string   `json:"device_type,omitempty"`   // Device type (keyword)
    
    // Performance metrics (optional)
    RequestTime  float64  `json:"request_time,omitempty"`  // Request duration (numeric)
    UpstreamTime *float64 `json:"upstream_time,omitempty"` // Upstream response time (numeric)
    
    // HTTP details (optional)
    Protocol     string   `json:"protocol,omitempty"`      // HTTP protocol (keyword)
    Referer      string   `json:"referer,omitempty"`       // HTTP referer (analyzed)
    
    // Metadata
    FilePath     string   `json:"file_path"`               // Source file (keyword)
    Raw          string   `json:"raw"`                     // Original log line (stored only)
}

Index Mapping Configuration

// Optimized mapping for NGINX logs
func CreateLogIndexMapping() mapping.IndexMapping {
    indexMapping := bleve.NewIndexMapping()
    indexMapping.DefaultAnalyzer = "standard"
    
    docMapping := bleve.NewDocumentMapping()
    
    // Timestamp - numeric for range queries
    timestampMapping := bleve.NewNumericFieldMapping()
    timestampMapping.Store = true
    timestampMapping.Index = true
    docMapping.AddFieldMappingsAt("timestamp", timestampMapping)
    
    // IP address - keyword for exact matching
    ipMapping := bleve.NewTextFieldMapping()
    ipMapping.Analyzer = "keyword"
    ipMapping.Store = true
    ipMapping.Index = true
    docMapping.AddFieldMappingsAt("ip", ipMapping)
    
    // Path - dual mapping for different query types
    pathMapping := bleve.NewTextFieldMapping()        // Analyzed for partial matching
    pathMapping.Analyzer = "standard"
    pathMapping.Store = true
    pathMapping.Index = true
    docMapping.AddFieldMappingsAt("path", pathMapping)
    
    pathExactMapping := bleve.NewTextFieldMapping()   // Keyword for exact matching
    pathExactMapping.Analyzer = "keyword"
    pathExactMapping.Store = false
    pathExactMapping.Index = true
    docMapping.AddFieldMappingsAt("path_exact", pathExactMapping)
    
    // Status code - numeric for range queries
    statusMapping := bleve.NewNumericFieldMapping()
    statusMapping.Store = true
    statusMapping.Index = true
    docMapping.AddFieldMappingsAt("status", statusMapping)
    
    // Raw log line - stored but not indexed (for display)
    rawMapping := bleve.NewTextFieldMapping()
    rawMapping.Store = true
    rawMapping.Index = false
    docMapping.AddFieldMappingsAt("raw", rawMapping)
    
    indexMapping.AddDocumentMapping("_default", docMapping)
    return indexMapping
}

Advanced Features

1. Incremental Indexing

Process only new or modified log entries:

// Configure incremental indexing
config := &PersistenceConfig{
    DatabaseURL:        "postgres://localhost/nginx_ui",
    EnabledPaths:       []string{"/var/log/nginx/access.log"},
    IncrementalConfig: &IncrementalIndexConfig{
        CheckInterval:    time.Minute,
        MaxFilesToCheck: 100,
        BatchSize:       1000,
    },
}

persistenceManager := NewPersistenceManager(config)

// Process incremental updates
groups, err := persistenceManager.GetIncrementalGroups()
if err != nil {
    log.Printf("Failed to get incremental groups: %v", err)
} else {
    for _, group := range groups {
        fmt.Printf("Group: %s, Changed files: %d, Needs reindex: %d\n",
            group.GroupPath, group.ChangedFiles, group.NeedsReindex)
    }
}

2. Log Rotation Support

Automatic detection and handling of rotated log files:

// Supports various rotation patterns:
// - access.log, access.log.1, access.log.2, ...
// - access.log.2024-01-01, access.log.2024-01-02, ...
// - access.log.gz, access.log.1.gz, access.log.2.gz, ...
// - access-20240101.log, access-20240102.log, ...

// Get main log path from rotated file
mainPath := getMainLogPathFromFile("/var/log/nginx/access.log.1")
// Returns: "/var/log/nginx/access.log"

// Check if file is compressed
isCompressed := IsCompressedFile("/var/log/nginx/access.log.gz")
// Returns: true

// Estimate lines in compressed file
ctx := context.Background()
lines, err := EstimateFileLines(ctx, "/var/log/nginx/access.log.gz", fileSize, true)
if err != nil {
    log.Printf("Line estimation failed: %v", err)
} else {
    fmt.Printf("Estimated lines: %d\n", lines)
}
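
To make the rotation patterns concrete, the sketch below shows one way a rotated file name could be mapped back to its main log path. It is not the package's `getMainLogPathFromFile`; the function name `mainLogPath` and both regular expressions are assumptions that cover only the four patterns listed above.

```go
package main

import (
	"path/filepath"
	"regexp"
	"strings"
)

var (
	// Matches a trailing ".1" style index or ".2024-01-01" style date.
	rotationSuffix = regexp.MustCompile(`\.(\d+|\d{4}-\d{2}-\d{2})$`)
	// Matches a "-20240101" date placed before the ".log" extension.
	datedName = regexp.MustCompile(`-\d{8}(\.log)$`)
)

// mainLogPath (hypothetical) strips compression and rotation markers from
// the file name and returns the path of the active log file.
func mainLogPath(path string) string {
	dir, name := filepath.Split(path)
	name = strings.TrimSuffix(name, ".gz")
	name = rotationSuffix.ReplaceAllString(name, "")
	name = datedName.ReplaceAllString(name, "$1")
	return dir + name
}
```

Grouping files by this derived path is what allows access.log, access.log.1 and access.log.2.gz to be treated as a single log group.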

3. Performance Optimization

Automatic memory management and optimization:

// Enable performance monitoring
indexer := NewParallelIndexer(&IndexerConfig{
    EnableMetrics: true,
    MemoryQuota:   2 * 1024 * 1024 * 1024, // 2GB limit
})

// Monitor memory usage
memStats := indexer.GetMemoryStats()
fmt.Printf("Memory usage: %.2f MB (%.2f%% of quota)\n",
    memStats.AllocMB, (memStats.AllocMB/2048)*100)

// Trigger optimization if needed
if memStats.AllocMB > 1536 { // 75% of the 2048 MB quota
    indexer.Optimize()
}

// Advanced optimization with specific targets
optimizationConfig := &OptimizationConfig{
    TargetMemoryUsage: 1024 * 1024 * 1024, // 1GB
    MaxSegmentCount:   10,
    MinSegmentSize:    16 * 1024 * 1024,   // 16MB
}

if err := indexer.OptimizeWithConfig(optimizationConfig); err != nil {
    log.Printf("Optimization failed: %v", err)
}

4. Health Monitoring

Comprehensive health checks and monitoring:

// Perform health check
if err := indexer.HealthCheck(); err != nil {
    log.Printf("Health check failed: %v", err)
    
    // Get detailed status
    for i := 0; i < indexer.GetConfig().ShardCount; i++ {
        shardInfo, err := indexer.GetShardInfo(i)
        if err != nil {
            log.Printf("Shard %d: ERROR - %v", i, err)
        } else {
            log.Printf("Shard %d: OK - %d docs, %s",
                i, shardInfo.DocumentCount, formatBytes(shardInfo.Size))
        }
    }
}

// Monitor worker status
stats := indexer.GetStats()
for i, worker := range stats.WorkerStats {
    status := worker.Status
    if status != "idle" && status != "busy" {
        log.Printf("Worker %d has abnormal status: %s", i, status)
    }
}

Error Handling

Error Types

var (
    ErrIndexerNotStarted     = "indexer not started"
    ErrIndexerStopped        = "indexer stopped"
    ErrShardNotFound         = "shard not found"
    ErrQueueFull             = "queue is full"
    ErrInvalidDocument       = "invalid document"
    ErrOptimizationFailed    = "optimization failed"
    ErrIncrementalInfoNotFound = "incremental information not found"
    ErrInvalidLogFileFormat    = "invalid log file format"
    ErrDatabaseConnectionFailed = "database connection failed"
)
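
As listed, these values are plain strings rather than `error` values, so callers can only match them by message text. If error values are preferred, a sentinel can be built from the same text and wrapped with context. The sketch below assumes hypothetical names (`errQueueFull`, `enqueue`, `isQueueFull`) and does not describe how the package itself constructs its errors.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is an illustrative sentinel built from the documented message text.
var errQueueFull = errors.New("queue is full")

// enqueue (hypothetical) wraps the sentinel with context when the queue has no room.
func enqueue(queueLen, maxQueueSize int) error {
	if queueLen >= maxQueueSize {
		return fmt.Errorf("enqueue document: %w", errQueueFull)
	}
	return nil
}

// isQueueFull reports whether err wraps the sentinel, however deeply.
func isQueueFull(err error) bool {
	return errors.Is(err, errQueueFull)
}
```

Matching with `errors.Is` keeps working when the message is reworded or further wrapped, which substring checks do not.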

Error Recovery Strategies

// Graceful error handling with retry logic
func indexWithRetry(indexer *ParallelIndexer, doc *Document, maxRetries int) error {
    if maxRetries < 1 {
        maxRetries = 1
    }

    var lastErr error
    for attempt := 0; attempt < maxRetries; attempt++ {
        lastErr = indexer.IndexDocument(context.Background(), doc)
        if lastErr == nil {
            return nil
        }

        // Non-retryable errors are returned immediately
        if !isRetryableError(lastErr) {
            return fmt.Errorf("non-retryable error: %w", lastErr)
        }

        // Linear backoff; no sleep after the final attempt
        if attempt < maxRetries-1 {
            log.Printf("Attempt %d failed: %v, retrying...", attempt+1, lastErr)
            time.Sleep(time.Duration(attempt+1) * time.Second)
        }
    }

    // Keep the last error in the chain so callers can inspect it
    return fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, lastErr)
}

func isRetryableError(err error) bool {
    // Deadline errors are matched by value; the rest are matched by message text
    if errors.Is(err, context.DeadlineExceeded) {
        return true
    }
    errStr := err.Error()
    return strings.Contains(errStr, "queue is full") ||
           strings.Contains(errStr, "temporary failure")
}

Circuit Breaker Pattern

type CircuitBreaker struct {
    failures    int32
    lastFailure time.Time
    threshold   int32
    timeout     time.Duration
    mutex       sync.RWMutex
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mutex.RLock()
    if atomic.LoadInt32(&cb.failures) >= cb.threshold {
        if time.Since(cb.lastFailure) < cb.timeout {
            cb.mutex.RUnlock()
            return fmt.Errorf("circuit breaker open")
        }
    }
    cb.mutex.RUnlock()
    
    err := fn()
    if err != nil {
        cb.mutex.Lock()
        atomic.AddInt32(&cb.failures, 1)
        cb.lastFailure = time.Now()
        cb.mutex.Unlock()
        return err
    }
    
    // Reset on success
    atomic.StoreInt32(&cb.failures, 0)
    return nil
}

// Usage
cb := &CircuitBreaker{
    threshold: 5,
    timeout:   30 * time.Second,
}

err := cb.Call(func() error {
    return indexer.IndexDocument(ctx, doc)
})

Integration Examples

Complete Log Processing Pipeline

package main

import (
    "bufio"
    "context"
    "fmt"
    "log"
    "os"
    "time"
    
    "github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
    "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
)

func main() {
    // Initialize components. The variables are named p and idx so that they
    // do not shadow the imported parser and indexer packages.
    p := parser.NewNginxParser(&parser.ParserConfig{
        WorkerCount:     8,
        EnableGeoIP:     true,
        EnableUserAgent: true,
    })

    idx := indexer.NewParallelIndexer(&indexer.IndexerConfig{
        ShardCount:  4,
        WorkerCount: 8,
        BatchSize:   1000,
    })

    persistenceManager := indexer.NewPersistenceManager(&indexer.PersistenceConfig{
        DatabaseURL: "postgres://localhost/nginx_ui",
    })

    progressManager := indexer.NewProgressManager()

    // Start indexer
    ctx := context.Background()
    if err := idx.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer idx.Stop()

    // Process log file
    logFile := "/var/log/nginx/access.log"
    if err := processLogFile(p, idx, persistenceManager, progressManager, logFile); err != nil {
        log.Fatal(err)
    }

    // Print final statistics
    stats := idx.GetStats()
    fmt.Printf("Processing complete:\n")
    fmt.Printf("  Documents indexed: %d\n", stats.TotalDocuments)
    fmt.Printf("  Indexing rate: %.2f docs/sec\n", stats.IndexingRate)
    fmt.Printf("  Memory usage: %.2f MB\n", float64(stats.MemoryUsage)/(1024*1024))
}

func processLogFile(
    p *parser.NginxParser,
    idx *indexer.ParallelIndexer,
    persistence *indexer.PersistenceManager,
    progressManager *indexer.ProgressManager,
    filePath string,
) error {
    // Get incremental information
    info, err := persistence.GetIncrementalInfo(filePath)
    if err != nil {
        log.Printf("No incremental info found, starting from beginning: %v", err)
        info = &indexer.LogFileInfo{
            Path:         filePath,
            LastPosition: 0,
        }
    }

    // Setup progress tracking
    progressConfig := &indexer.ProgressConfig{
        OnProgress: func(pn indexer.ProgressNotification) {
            fmt.Printf("Progress: %.2f%% (%d/%d files)\n",
                pn.OverallProgress*100, pn.CompletedFiles, pn.TotalFiles)
        },
        OnCompletion: func(cn indexer.CompletionNotification) {
            fmt.Printf("Completed: %s in %v\n", cn.GroupPath, cn.Duration)
        },
    }

    tracker := progressManager.GetTracker(filePath, progressConfig)
    tracker.AddFile(filePath, false)

    // Open file and seek to last position
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    if info.LastPosition > 0 {
        // whence 0 means the offset is relative to the start of the file
        if _, err := file.Seek(info.LastPosition, 0); err != nil {
            return err
        }
    }

    scanner := bufio.NewScanner(file)
    var documents []*indexer.Document
    var lineCount int64
    currentPosition := info.LastPosition

    // flush indexes the pending batch and records how far the file has been read
    flush := func() error {
        if len(documents) == 0 {
            return nil
        }
        if err := idx.IndexDocuments(context.Background(), documents); err != nil {
            return err
        }

        // Update persistence
        info.LastPosition = currentPosition
        info.DocumentCount += uint64(len(documents))
        info.LastModified = time.Now()
        if err := persistence.UpdateIncrementalInfo(filePath, info); err != nil {
            log.Printf("Failed to update persistence: %v", err)
        }

        // Update progress
        tracker.UpdateFileProgress(filePath, lineCount)

        documents = documents[:0] // Reset slice, keep capacity
        return nil
    }

    tracker.StartFile(filePath)

    for scanner.Scan() {
        line := scanner.Text()
        lineCount++

        // Advance the position for every line read, including lines that fail
        // to parse, so the saved offset stays aligned with the file.
        // Assumes each line ends with a single "\n".
        lineStart := currentPosition
        currentPosition += int64(len(line)) + 1

        // Parse log line
        entry, err := p.ParseLine(line)
        if err != nil {
            log.Printf("Parse error on line %d: %v", lineCount, err)
            continue
        }

        // Convert to document; the byte offset keeps IDs unique across resumed runs
        doc := &indexer.Document{
            ID:     fmt.Sprintf("%s_%d_%d", filePath, entry.Timestamp, lineStart),
            Fields: convertToLogDocument(entry),
        }
        documents = append(documents, doc)

        // Process batch
        if len(documents) >= 1000 {
            if err := flush(); err != nil {
                return err
            }
        }
    }

    // Report read errors before marking the file as complete
    if err := scanner.Err(); err != nil {
        return err
    }

    // Process remaining documents
    if err := flush(); err != nil {
        return err
    }

    tracker.CompleteFile(filePath, lineCount)
    return nil
}

func convertToLogDocument(entry *parser.LogEntry) *indexer.LogDocument {
    return &indexer.LogDocument{
        Timestamp:    entry.Timestamp,
        IP:           entry.IP,
        RegionCode:   entry.RegionCode,
        Province:     entry.Province,
        City:         entry.City,
        ISP:          entry.ISP,
        Method:       entry.Method,
        Path:         entry.Path,
        PathExact:    entry.Path,
        Protocol:     entry.Protocol,
        Status:       entry.Status,
        BytesSent:    entry.BytesSent,
        Referer:      entry.Referer,
        UserAgent:    entry.UserAgent,
        Browser:      entry.Browser,
        BrowserVer:   entry.BrowserVer,
        OS:           entry.OS,
        OSVersion:    entry.OSVersion,
        DeviceType:   entry.DeviceType,
        RequestTime:  entry.RequestTime,
        UpstreamTime: entry.UpstreamTime,
        FilePath:     entry.FilePath,
        Raw:          entry.Raw,
    }
}

API Reference

Core Interfaces

// Indexer interface defines the main indexing contract
type Indexer interface {
    // Document operations
    IndexDocument(ctx context.Context, doc *Document) error
    IndexDocuments(ctx context.Context, docs []*Document) error
    IndexDocumentAsync(doc *Document, callback func(error))
    IndexDocumentsAsync(docs []*Document, callback func(error))
    
    // Batch operations
    StartBatch() BatchWriterInterface
    FlushAll() error
    
    // Management operations
    Optimize() error
    GetStats() *IndexStats
    GetShardInfo(shardID int) (*ShardInfo, error)
    
    // Lifecycle management
    Start(ctx context.Context) error
    Stop() error
    IsHealthy() bool
    
    // Configuration
    GetConfig() *IndexerConfig
    UpdateConfig(config *IndexerConfig) error
}

// ShardManager manages multiple index shards
type ShardManager interface {
    Initialize() error
    GetShard(key string) (bleve.Index, int, error)
    GetShardByID(id int) (bleve.Index, error)
    GetAllShards() []bleve.Index
    GetShardStats() []*ShardInfo
    CreateShard(id int, path string) error
    CloseShard(id int) error
    OptimizeShard(id int) error
    HealthCheck() error
}

// BatchWriterInterface provides efficient batch operations
type BatchWriterInterface interface {
    Add(doc *Document) error
    Flush() (*IndexResult, error)
    Size() int
    Reset()
}

This comprehensive documentation covers all aspects of the indexer package including architecture, configuration, performance characteristics, and practical examples for integration.

⚡ Performance Benchmarks

Latest benchmark results on Apple M2 Pro (August 25, 2025):

Operation            Rate            ns/op   B/op   allocs/op   Notes
-------------------  --------------  ------  -----  ----------  ---------------------------------
UpdateFileProgress   20.9M ops/sec   57.59   0      0           Zero-allocation progress tracking
GetProgress          9.8M ops/sec    117.5   0      0           Zero-allocation status reads
CacheAccess          17.3M ops/sec   68.40   29     1           Optimized persistence cache
ConcurrentAccess     3.4M ops/sec    346.2   590    4           Multi-threaded operations
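
The zero-allocation rows are consistent with progress state held in fixed-size atomic integers. The following is a minimal sketch of that technique, not the package's ProgressTracker: the `fileProgress` type and its methods are hypothetical.

```go
package main

import "sync/atomic"

// fileProgress (illustrative) stores processed and estimated line counts in
// plain int64 fields that are only accessed through sync/atomic, so updates
// take no lock and allocate nothing.
type fileProgress struct {
	processed int64
	estimated int64
}

// setEstimate records the expected number of lines in the file.
func (p *fileProgress) setEstimate(lines int64) { atomic.StoreInt64(&p.estimated, lines) }

// update records the number of lines processed so far.
func (p *fileProgress) update(lines int64) { atomic.StoreInt64(&p.processed, lines) }

// percent returns completion in [0, 100], or 0 when no estimate is set.
func (p *fileProgress) percent() float64 {
	est := atomic.LoadInt64(&p.estimated)
	if est <= 0 {
		return 0
	}
	pct := float64(atomic.LoadInt64(&p.processed)) / float64(est) * 100
	if pct > 100 {
		return 100 // estimates can undershoot the real line count
	}
	return pct
}
```

Because the estimate can be lower than the real line count, the percentage is clamped instead of being allowed to exceed 100.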

Key Performance Features

  • Zero-allocation progress tracking for high-frequency updates
  • Optimized document ID generation using utils.AppendInt + utils.BytesToStringUnsafe
  • Efficient shard path creation with pre-allocated buffers
  • Memory pooling through unified utils package
  • Sub-microsecond file progress operations

Performance optimizations delivered 20-75% allocation reduction in critical paths.
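
The document ID optimization mentioned above replaces formatted printing with direct appends into a reusable byte buffer. The sketch below shows the idea with the standard library's strconv.AppendInt in place of the utils helpers. The function name `appendDocID` is hypothetical, and the ID layout is borrowed from the pipeline example ("path_timestamp_line").

```go
package main

import "strconv"

// appendDocID (hypothetical) appends "<path>_<timestamp>_<line>" to buf and
// returns the extended slice, so one buffer can be reused across documents.
func appendDocID(buf []byte, path string, timestamp, line int64) []byte {
	buf = append(buf, path...)
	buf = append(buf, '_')
	buf = strconv.AppendInt(buf, timestamp, 10)
	buf = append(buf, '_')
	buf = strconv.AppendInt(buf, line, 10)
	return buf
}
```

Converting the result with `string(buf)` still copies the bytes once. An unsafe byte-to-string conversion avoids that copy, at the price that the buffer must not be modified while the string is in use.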