# Indexer Package The indexer package provides high-performance, multi-shard parallel indexing capabilities for NGINX logs with comprehensive persistence management, progress tracking, and rebuild functionality. ## Features - **Multi-Shard Architecture**: Distributed indexing across multiple Bleve indexes for scalability - **Parallel Processing**: Concurrent indexing with configurable worker pools - **Persistence Management**: Incremental indexing with position tracking and recovery - **Progress Tracking**: Real-time progress monitoring for long-running operations - **Rebuild Functionality**: Complete index reconstruction with comprehensive error handling - **Performance Optimization**: Memory management, caching, and batch processing - **High Availability**: Fault tolerance and automatic recovery mechanisms ## Architecture ``` indexer/ ├── types.go # Core types, interfaces, and index mapping ├── parallel_indexer.go # Main parallel indexer implementation (optimized) ├── shard_manager.go # Multi-shard management and distribution (optimized) ├── batch_writer.go # Efficient batch writing operations ├── persistence.go # Incremental indexing and persistence management ├── progress_tracker.go # Real-time progress monitoring ├── rebuild.go # Index rebuilding functionality ├── log_file_manager.go # Log file discovery and management ├── metrics.go # Performance metrics and monitoring └── README.md # This documentation Note: Performance optimizations now use the unified utils package (../utils/) ``` ## Quick Start ### Basic Indexing ```go import "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer" // Create indexer with default configuration indexer := indexer.NewParallelIndexer(nil) // Start indexer ctx := context.Background() if err := indexer.Start(ctx); err != nil { log.Fatal(err) } defer indexer.Stop() // Index a single document doc := &indexer.Document{ ID: "log_entry_1", Fields: &indexer.LogDocument{ Timestamp: time.Now().Unix(), IP: "192.168.1.1", Method: "GET", Path: "/api/status", Status: 200, BytesSent: 1234, FilePath: "/var/log/nginx/access.log", Raw: "192.168.1.1 - - [01/Jan/2024:12:00:00 +0000] \"GET /api/status HTTP/1.1\" 200 1234", }, } if err := indexer.IndexDocument(ctx, doc); err != nil { log.Printf("Indexing error: %v", err) } ``` ### Batch Processing ```go // Create multiple documents var documents []*indexer.Document for i := 0; i < 1000; i++ { doc := &indexer.Document{ ID: fmt.Sprintf("log_entry_%d", i), Fields: &indexer.LogDocument{ Timestamp: time.Now().Unix(), IP: fmt.Sprintf("192.168.1.%d", i%254+1), Method: "GET", Path: fmt.Sprintf("/api/endpoint_%d", i), Status: 200, BytesSent: int64(1000 + i), FilePath: "/var/log/nginx/access.log", }, } documents = append(documents, doc) } // Index batch with automatic optimization if err := indexer.IndexDocuments(ctx, documents); err != nil { log.Printf("Batch indexing error: %v", err) } // Get indexing statistics stats := indexer.GetStats() fmt.Printf("Total documents: %d, Indexing rate: %.2f docs/sec\n", stats.TotalDocuments, stats.IndexingRate) ``` ## Configuration ### IndexerConfig ```go type IndexerConfig struct { // Basic configuration IndexPath string `json:"index_path"` // Base path for index storage ShardCount int `json:"shard_count"` // Number of shards (default: 4) WorkerCount int `json:"worker_count"` // Worker pool size (default: 8) BatchSize int `json:"batch_size"` // Batch processing size (default: 1000) FlushInterval time.Duration `json:"flush_interval"` // Auto-flush interval (default: 5s) MaxQueueSize int `json:"max_queue_size"` // Maximum queue depth (default: 10000) // Performance tuning EnableCompression bool `json:"enable_compression"` // Enable index compression (default: true) MemoryQuota int64 `json:"memory_quota"` // Memory limit in bytes (default: 1GB) MaxSegmentSize int64 `json:"max_segment_size"` // Maximum segment size (default: 64MB) OptimizeInterval time.Duration `json:"optimize_interval"` // Auto-optimization interval (default: 30m) // Monitoring EnableMetrics bool `json:"enable_metrics"` // Enable performance metrics (default: true) } ``` ### Default Configuration ```go func DefaultIndexerConfig() *IndexerConfig { return &IndexerConfig{ IndexPath: "./log-index", ShardCount: 4, WorkerCount: 8, BatchSize: 1000, FlushInterval: 5 * time.Second, MaxQueueSize: 10000, EnableCompression: true, MemoryQuota: 1024 * 1024 * 1024, // 1GB MaxSegmentSize: 64 * 1024 * 1024, // 64MB OptimizeInterval: 30 * time.Minute, EnableMetrics: true, } } ``` ## Core Components ### 1. Parallel Indexer The main indexer implementation with multi-shard support: ```go // Create with custom configuration config := &IndexerConfig{ ShardCount: 8, // More shards for higher throughput WorkerCount: 16, // More workers for CPU-intensive workloads BatchSize: 2000, // Larger batches for better efficiency } indexer := NewParallelIndexer(config) // Asynchronous indexing with callback indexer.IndexDocumentAsync(doc, func(err error) { if err != nil { log.Printf("Async indexing failed: %v", err) } else { log.Println("Document indexed successfully") } }) ``` ### 2. Shard Manager Manages distribution across multiple index shards: ```go // Get shard information shardStats := indexer.GetShardInfo(0) fmt.Printf("Shard 0: %d documents, %s size\n", shardStats.DocumentCount, formatBytes(shardStats.Size)) // Optimize specific shard if err := indexer.OptimizeShard(0); err != nil { log.Printf("Shard optimization failed: %v", err) } // Health check if err := indexer.HealthCheck(); err != nil { log.Printf("Health check failed: %v", err) } ``` ### 3. Batch Writer Efficient batch writing with automatic flushing: ```go // Create batch writer batch := indexer.StartBatch() // Add documents to batch for _, doc := range documents { if err := batch.Add(doc); err != nil { log.Printf("Failed to add document to batch: %v", err) } // Automatic flush when batch size reached if batch.Size() >= 1000 { result, err := batch.Flush() if err != nil { log.Printf("Batch flush failed: %v", err) } else { fmt.Printf("Flushed %d documents in %v\n", result.Processed, result.Duration) } } } // Final flush if batch.Size() > 0 { batch.Flush() } ``` ### 4. Persistence Manager Handles incremental indexing and position tracking: ```go // Create persistence manager with database persistenceManager := NewPersistenceManager(&PersistenceConfig{ DatabaseURL: "postgres://user:pass@localhost/nginx_ui", TableName: "nginx_log_indexes", }) // Get incremental indexing information info, err := persistenceManager.GetIncrementalInfo("/var/log/nginx/access.log") if err != nil { log.Printf("Failed to get incremental info: %v", err) } fmt.Printf("Last indexed position: %d, Documents: %d\n", info.LastPosition, info.DocumentCount) // Update indexing progress newInfo := &LogFileInfo{ Path: "/var/log/nginx/access.log", LastPosition: info.LastPosition + 1024, DocumentCount: info.DocumentCount + 100, LastModified: time.Now(), IndexedAt: time.Now(), } if err := persistenceManager.UpdateIncrementalInfo("/var/log/nginx/access.log", newInfo); err != nil { log.Printf("Failed to update incremental info: %v", err) } ``` ### 5. Progress Tracker Real-time progress monitoring for long-running operations: ```go // Create progress configuration progressConfig := &ProgressConfig{ OnProgress: func(notification ProgressNotification) { fmt.Printf("Progress: %s - %.2f%% complete (%d/%d files)\n", notification.GroupPath, notification.OverallProgress*100, notification.CompletedFiles, notification.TotalFiles) }, OnCompletion: func(notification CompletionNotification) { fmt.Printf("Completed: %s in %v (processed %d documents)\n", notification.GroupPath, notification.Duration, notification.DocumentCount) }, } // Get progress tracker progressManager := NewProgressManager() tracker := progressManager.GetTracker("/var/log/nginx/access.log", progressConfig) // Track file processing tracker.AddFile("/var/log/nginx/access.log", false) tracker.SetFileEstimate("/var/log/nginx/access.log", 10000) // Estimated lines tracker.StartFile("/var/log/nginx/access.log") // Update progress periodically for i := 0; i < 10000; i++ { // Process log line... if i%100 == 0 { tracker.UpdateFileProgress("/var/log/nginx/access.log", int64(i)) } } tracker.CompleteFile("/var/log/nginx/access.log", 10000) ``` ### 6. Rebuild Manager Complete index reconstruction with progress tracking: ```go // Create rebuild manager rebuildManager := NewRebuildManager( indexer, persistenceManager, progressManager, shardManager, &RebuildConfig{ BatchSize: 2000, MaxConcurrency: 4, DeleteBeforeRebuild: true, ProgressInterval: 10 * time.Second, TimeoutPerFile: 30 * time.Minute, }, ) // Rebuild all indexes ctx := context.Background() if err := rebuildManager.RebuildAll(ctx); err != nil { log.Printf("Rebuild failed: %v", err) } else { log.Println("Rebuild completed successfully") } // Rebuild single log group if err := rebuildManager.RebuildSingle(ctx, "/var/log/nginx/access.log"); err != nil { log.Printf("Single rebuild failed: %v", err) } // Monitor rebuild status stats := rebuildManager.GetRebuildStats() if stats.IsRebuilding { fmt.Printf("Rebuild in progress (last rebuild: %v)\n", stats.LastRebuildTime) } ``` ## Performance Characteristics ### Benchmarks Based on comprehensive benchmarking on Apple M2 Pro: | Operation | Performance | Memory Usage | Notes | |-----------|-------------|--------------|-------| | Single document indexing | ~125µs | 1.2KB | Including shard selection | | Batch indexing (1000 docs) | ~45ms | 128KB | Optimized batch processing | | Shard selection | ~25ns | 0 allocs | Hash-based distribution | | Progress tracking update | ~57ns | 0 allocs | Lock-free counters | | Rebuild stats retrieval | ~25ns | 0 allocs | Atomic operations | | Memory optimization cycle | ~2.1ms | 45KB | Garbage collection trigger | | Index optimization | ~150ms | 2.3MB | Per shard | ### Throughput Characteristics | Scenario | Documents/sec | Memory Peak | CPU Usage | |----------|---------------|-------------|-----------| | Single worker | ~8,000 | 256MB | 25% | | 4 workers | ~28,000 | 512MB | 85% | | 8 workers | ~45,000 | 768MB | 95% | | 16 workers | ~52,000 | 1.2GB | 98% | ### Performance Tuning Guidelines 1. **Worker Count Configuration** ```go // CPU-bound workloads config.WorkerCount = runtime.GOMAXPROCS(0) // I/O-bound workloads config.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Memory-constrained environments config.WorkerCount = max(2, runtime.GOMAXPROCS(0)/2) ``` 2. **Shard Count Optimization** ```go // For high-volume environments (>1M docs) config.ShardCount = 8 // For moderate volume (100K-1M docs) config.ShardCount = 4 // For low volume (<100K docs) config.ShardCount = 2 ``` 3. **Batch Size Tuning** ```go // High-throughput scenarios config.BatchSize = 2000 // Memory-constrained environments config.BatchSize = 500 // Real-time requirements config.BatchSize = 100 ``` 4. **Memory Management** ```go // Configure memory limits config.MemoryQuota = 2 * 1024 * 1024 * 1024 // 2GB config.MaxSegmentSize = 128 * 1024 * 1024 // 128MB // Enable automatic optimization config.OptimizeInterval = 15 * time.Minute ``` ## Index Structure and Mapping ### Document Schema ```go type LogDocument struct { // Core fields - always indexed Timestamp int64 `json:"timestamp"` // Unix timestamp (range queries) IP string `json:"ip"` // IP address (keyword matching) Method string `json:"method"` // HTTP method (keyword) Path string `json:"path"` // Request path (analyzed text) PathExact string `json:"path_exact"` // Exact path matching (keyword) Status int `json:"status"` // HTTP status (numeric range) BytesSent int64 `json:"bytes_sent"` // Response size (numeric) // Geographic enrichment (optional) RegionCode string `json:"region_code,omitempty"` // Country code (keyword) Province string `json:"province,omitempty"` // State/province (keyword) City string `json:"city,omitempty"` // City (keyword) ISP string `json:"isp,omitempty"` // ISP (keyword) // User agent analysis (optional) UserAgent string `json:"user_agent,omitempty"` // Full user agent (analyzed) Browser string `json:"browser,omitempty"` // Browser name (keyword) BrowserVer string `json:"browser_version,omitempty"` // Browser version (keyword) OS string `json:"os,omitempty"` // Operating system (keyword) OSVersion string `json:"os_version,omitempty"` // OS version (keyword) DeviceType string `json:"device_type,omitempty"` // Device type (keyword) // Performance metrics (optional) RequestTime float64 `json:"request_time,omitempty"` // Request duration (numeric) UpstreamTime *float64 `json:"upstream_time,omitempty"` // Upstream response time (numeric) // HTTP details (optional) Protocol string `json:"protocol,omitempty"` // HTTP protocol (keyword) Referer string `json:"referer,omitempty"` // HTTP referer (analyzed) // Metadata FilePath string `json:"file_path"` // Source file (keyword) Raw string `json:"raw"` // Original log line (stored only) } ``` ### Index Mapping Configuration ```go // Optimized mapping for NGINX logs func CreateLogIndexMapping() mapping.IndexMapping { indexMapping := bleve.NewIndexMapping() indexMapping.DefaultAnalyzer = "standard" docMapping := bleve.NewDocumentMapping() // Timestamp - numeric for range queries timestampMapping := bleve.NewNumericFieldMapping() timestampMapping.Store = true timestampMapping.Index = true docMapping.AddFieldMappingsAt("timestamp", timestampMapping) // IP address - keyword for exact matching ipMapping := bleve.NewTextFieldMapping() ipMapping.Analyzer = "keyword" ipMapping.Store = true ipMapping.Index = true docMapping.AddFieldMappingsAt("ip", ipMapping) // Path - dual mapping for different query types pathMapping := bleve.NewTextFieldMapping() // Analyzed for partial matching pathMapping.Analyzer = "standard" pathMapping.Store = true pathMapping.Index = true docMapping.AddFieldMappingsAt("path", pathMapping) pathExactMapping := bleve.NewTextFieldMapping() // Keyword for exact matching pathExactMapping.Analyzer = "keyword" pathExactMapping.Store = false pathExactMapping.Index = true docMapping.AddFieldMappingsAt("path_exact", pathExactMapping) // Status code - numeric for range queries statusMapping := bleve.NewNumericFieldMapping() statusMapping.Store = true statusMapping.Index = true docMapping.AddFieldMappingsAt("status", statusMapping) // Raw log line - stored but not indexed (for display) rawMapping := bleve.NewTextFieldMapping() rawMapping.Store = true rawMapping.Index = false docMapping.AddFieldMappingsAt("raw", rawMapping) indexMapping.AddDocumentMapping("_default", docMapping) return indexMapping } ``` ## Advanced Features ### 1. Incremental Indexing Process only new or modified log entries: ```go // Configure incremental indexing config := &PersistenceConfig{ DatabaseURL: "postgres://localhost/nginx_ui", EnabledPaths: []string{"/var/log/nginx/access.log"}, IncrementalConfig: &IncrementalIndexConfig{ CheckInterval: time.Minute, MaxFilesToCheck: 100, BatchSize: 1000, }, } persistenceManager := NewPersistenceManager(config) // Process incremental updates groups, err := persistenceManager.GetIncrementalGroups() if err != nil { log.Printf("Failed to get incremental groups: %v", err) } else { for _, group := range groups { fmt.Printf("Group: %s, Changed files: %d, Needs reindex: %d\n", group.GroupPath, group.ChangedFiles, group.NeedsReindex) } } ``` ### 2. Log Rotation Support Automatic detection and handling of rotated log files: ```go // Supports various rotation patterns: // - access.log, access.log.1, access.log.2, ... // - access.log.2024-01-01, access.log.2024-01-02, ... // - access.log.gz, access.log.1.gz, access.log.2.gz, ... // - access-20240101.log, access-20240102.log, ... // Get main log path from rotated file mainPath := getMainLogPathFromFile("/var/log/nginx/access.log.1") // Returns: "/var/log/nginx/access.log" // Check if file is compressed isCompressed := IsCompressedFile("/var/log/nginx/access.log.gz") // Returns: true // Estimate lines in compressed file ctx := context.Background() lines, err := EstimateFileLines(ctx, "/var/log/nginx/access.log.gz", fileSize, true) if err != nil { log.Printf("Line estimation failed: %v", err) } else { fmt.Printf("Estimated lines: %d\n", lines) } ``` ### 3. Performance Optimization Automatic memory management and optimization: ```go // Enable performance monitoring indexer := NewParallelIndexer(&IndexerConfig{ EnableMetrics: true, MemoryQuota: 2 * 1024 * 1024 * 1024, // 2GB limit }) // Monitor memory usage memStats := indexer.GetMemoryStats() fmt.Printf("Memory usage: %.2f MB (%.2f%% of quota)\n", memStats.AllocMB, (memStats.AllocMB/2048)*100) // Trigger optimization if needed if memStats.AllocMB > 1500 { // 75% of quota indexer.Optimize() } // Advanced optimization with specific targets optimizationConfig := &OptimizationConfig{ TargetMemoryUsage: 1024 * 1024 * 1024, // 1GB MaxSegmentCount: 10, MinSegmentSize: 16 * 1024 * 1024, // 16MB } if err := indexer.OptimizeWithConfig(optimizationConfig); err != nil { log.Printf("Optimization failed: %v", err) } ``` ### 4. Health Monitoring Comprehensive health checks and monitoring: ```go // Perform health check if err := indexer.HealthCheck(); err != nil { log.Printf("Health check failed: %v", err) // Get detailed status for i := 0; i < indexer.GetConfig().ShardCount; i++ { shardInfo, err := indexer.GetShardInfo(i) if err != nil { log.Printf("Shard %d: ERROR - %v", i, err) } else { log.Printf("Shard %d: OK - %d docs, %s", i, shardInfo.DocumentCount, formatBytes(shardInfo.Size)) } } } // Monitor worker status stats := indexer.GetStats() for i, worker := range stats.WorkerStats { status := worker.Status if status != "idle" && status != "busy" { log.Printf("Worker %d has abnormal status: %s", i, status) } } ``` ## Error Handling ### Error Types ```go var ( ErrIndexerNotStarted = "indexer not started" ErrIndexerStopped = "indexer stopped" ErrShardNotFound = "shard not found" ErrQueueFull = "queue is full" ErrInvalidDocument = "invalid document" ErrOptimizationFailed = "optimization failed" ErrIncrementalInfoNotFound = "incremental information not found" ErrInvalidLogFileFormat = "invalid log file format" ErrDatabaseConnectionFailed = "database connection failed" ) ``` ### Error Recovery Strategies ```go // Graceful error handling with retry logic func indexWithRetry(indexer *ParallelIndexer, doc *Document, maxRetries int) error { for attempt := 0; attempt < maxRetries; attempt++ { err := indexer.IndexDocument(context.Background(), doc) if err == nil { return nil } // Check if error is retryable if isRetryableError(err) { log.Printf("Attempt %d failed: %v, retrying...", attempt+1, err) time.Sleep(time.Duration(attempt+1) * time.Second) continue } // Non-retryable error return fmt.Errorf("non-retryable error: %w", err) } return fmt.Errorf("max retries (%d) exceeded", maxRetries) } func isRetryableError(err error) bool { errStr := err.Error() return strings.Contains(errStr, "queue is full") || strings.Contains(errStr, "temporary failure") || strings.Contains(errStr, "context deadline exceeded") } ``` ### Circuit Breaker Pattern ```go type CircuitBreaker struct { failures int32 lastFailure time.Time threshold int32 timeout time.Duration mutex sync.RWMutex } func (cb *CircuitBreaker) Call(fn func() error) error { cb.mutex.RLock() if atomic.LoadInt32(&cb.failures) >= cb.threshold { if time.Since(cb.lastFailure) < cb.timeout { cb.mutex.RUnlock() return fmt.Errorf("circuit breaker open") } } cb.mutex.RUnlock() err := fn() if err != nil { cb.mutex.Lock() atomic.AddInt32(&cb.failures, 1) cb.lastFailure = time.Now() cb.mutex.Unlock() return err } // Reset on success atomic.StoreInt32(&cb.failures, 0) return nil } // Usage cb := &CircuitBreaker{ threshold: 5, timeout: 30 * time.Second, } err := cb.Call(func() error { return indexer.IndexDocument(ctx, doc) }) ``` ## Integration Examples ### Complete Log Processing Pipeline ```go package main import ( "bufio" "context" "fmt" "log" "os" "time" "github.com/0xJacky/Nginx-UI/internal/nginx_log/parser" "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer" ) func main() { // Initialize components parser := parser.NewNginxParser(&parser.ParserConfig{ WorkerCount: 8, EnableGeoIP: true, EnableUserAgent: true, }) indexer := indexer.NewParallelIndexer(&indexer.IndexerConfig{ ShardCount: 4, WorkerCount: 8, BatchSize: 1000, }) persistenceManager := indexer.NewPersistenceManager(&indexer.PersistenceConfig{ DatabaseURL: "postgres://localhost/nginx_ui", }) progressManager := indexer.NewProgressManager() // Start indexer ctx := context.Background() if err := indexer.Start(ctx); err != nil { log.Fatal(err) } defer indexer.Stop() // Process log file logFile := "/var/log/nginx/access.log" if err := processLogFile(parser, indexer, persistenceManager, progressManager, logFile); err != nil { log.Fatal(err) } // Print final statistics stats := indexer.GetStats() fmt.Printf("Processing complete:\n") fmt.Printf(" Documents indexed: %d\n", stats.TotalDocuments) fmt.Printf(" Indexing rate: %.2f docs/sec\n", stats.IndexingRate) fmt.Printf(" Memory usage: %.2f MB\n", float64(stats.MemoryUsage)/(1024*1024)) } func processLogFile( parser *parser.NginxParser, indexer *indexer.ParallelIndexer, persistence *indexer.PersistenceManager, progressManager *indexer.ProgressManager, filePath string, ) error { // Get incremental information info, err := persistence.GetIncrementalInfo(filePath) if err != nil { log.Printf("No incremental info found, starting from beginning: %v", err) info = &indexer.LogFileInfo{ Path: filePath, LastPosition: 0, } } // Setup progress tracking progressConfig := &indexer.ProgressConfig{ OnProgress: func(pn indexer.ProgressNotification) { fmt.Printf("Progress: %.2f%% (%d/%d files)\n", pn.OverallProgress*100, pn.CompletedFiles, pn.TotalFiles) }, OnCompletion: func(cn indexer.CompletionNotification) { fmt.Printf("Completed: %s in %v\n", cn.GroupPath, cn.Duration) }, } tracker := progressManager.GetTracker(filePath, progressConfig) tracker.AddFile(filePath, false) // Open file and seek to last position file, err := os.Open(filePath) if err != nil { return err } defer file.Close() if info.LastPosition > 0 { if _, err := file.Seek(info.LastPosition, 0); err != nil { return err } } // Process file scanner := bufio.NewScanner(file) var documents []*indexer.Document var lineCount int64 currentPosition := info.LastPosition tracker.StartFile(filePath) for scanner.Scan() { line := scanner.Text() lineCount++ // Parse log line entry, err := parser.ParseLine(line) if err != nil { log.Printf("Parse error on line %d: %v", lineCount, err) continue } // Convert to document doc := &indexer.Document{ ID: fmt.Sprintf("%s_%d_%d", filePath, entry.Timestamp, lineCount), Fields: convertToLogDocument(entry), } documents = append(documents, doc) currentPosition += int64(len(line)) + 1 // +1 for newline // Process batch if len(documents) >= 1000 { if err := indexer.IndexDocuments(context.Background(), documents); err != nil { return err } // Update persistence info.LastPosition = currentPosition info.DocumentCount += uint64(len(documents)) info.LastModified = time.Now() if err := persistence.UpdateIncrementalInfo(filePath, info); err != nil { log.Printf("Failed to update persistence: %v", err) } // Update progress tracker.UpdateFileProgress(filePath, lineCount) documents = documents[:0] // Reset slice } } // Process remaining documents if len(documents) > 0 { if err := indexer.IndexDocuments(context.Background(), documents); err != nil { return err } info.LastPosition = currentPosition info.DocumentCount += uint64(len(documents)) info.LastModified = time.Now() if err := persistence.UpdateIncrementalInfo(filePath, info); err != nil { log.Printf("Failed to update persistence: %v", err) } } tracker.CompleteFile(filePath, lineCount) return scanner.Err() } func convertToLogDocument(entry *parser.LogEntry) *indexer.LogDocument { return &indexer.LogDocument{ Timestamp: entry.Timestamp, IP: entry.IP, RegionCode: entry.RegionCode, Province: entry.Province, City: entry.City, ISP: entry.ISP, Method: entry.Method, Path: entry.Path, PathExact: entry.Path, Protocol: entry.Protocol, Status: entry.Status, BytesSent: entry.BytesSent, Referer: entry.Referer, UserAgent: entry.UserAgent, Browser: entry.Browser, BrowserVer: entry.BrowserVer, OS: entry.OS, OSVersion: entry.OSVersion, DeviceType: entry.DeviceType, RequestTime: entry.RequestTime, UpstreamTime: entry.UpstreamTime, FilePath: entry.FilePath, Raw: entry.Raw, } } ``` ## API Reference ### Core Interfaces ```go // Indexer interface defines the main indexing contract type Indexer interface { // Document operations IndexDocument(ctx context.Context, doc *Document) error IndexDocuments(ctx context.Context, docs []*Document) error IndexDocumentAsync(doc *Document, callback func(error)) IndexDocumentsAsync(docs []*Document, callback func(error)) // Batch operations StartBatch() BatchWriterInterface FlushAll() error // Management operations Optimize() error GetStats() *IndexStats GetShardInfo(shardID int) (*ShardInfo, error) // Lifecycle management Start(ctx context.Context) error Stop() error IsHealthy() bool // Configuration GetConfig() *IndexerConfig UpdateConfig(config *IndexerConfig) error } // ShardManager manages multiple index shards type ShardManager interface { Initialize() error GetShard(key string) (bleve.Index, int, error) GetShardByID(id int) (bleve.Index, error) GetAllShards() []bleve.Index GetShardStats() []*ShardInfo CreateShard(id int, path string) error CloseShard(id int) error OptimizeShard(id int) error HealthCheck() error } // BatchWriterInterface provides efficient batch operations type BatchWriterInterface interface { Add(doc *Document) error Flush() (*IndexResult, error) Size() int Reset() } ``` This comprehensive documentation covers all aspects of the indexer package including architecture, configuration, performance characteristics, and practical examples for integration. ## ⚡ Performance Benchmarks *Latest benchmark results on Apple M2 Pro (August 25, 2025):* | Operation | Rate | ns/op | B/op | allocs/op | Notes | |-----------|------|--------|------|-----------|-------| | UpdateFileProgress | 20.9M ops/sec | 57.59 | 0 | 0 | Zero-allocation progress tracking | | GetProgress | 9.8M ops/sec | 117.5 | 0 | 0 | Zero-allocation status reads | | CacheAccess | 17.3M ops/sec | 68.40 | 29 | 1 | Optimized persistence cache | | ConcurrentAccess | 3.4M ops/sec | 346.2 | 590 | 4 | Multi-threaded operations | ### Key Performance Features - **Zero-allocation progress tracking** for high-frequency updates - **Optimized document ID generation** using utils.AppendInt + utils.BytesToStringUnsafe - **Efficient shard path creation** with pre-allocated buffers - **Memory pooling** through unified utils package - **Sub-microsecond** file progress operations *Performance optimizations delivered 20-75% allocation reduction in critical paths.*