The indexer package provides high-performance, multi-shard parallel indexing for NGINX logs, with persistence management, progress tracking, and index rebuild support.
indexer/
├── types.go # Core types, interfaces, and index mapping
├── parallel_indexer.go # Main parallel indexer implementation (optimized)
├── grouped_shard_manager.go # Multi-shard management and distribution (optimized)
├── batch_writer.go # Efficient batch writing operations
├── persistence.go # Incremental indexing and persistence management
├── progress_tracker.go # Real-time progress monitoring
├── rebuild.go # Index rebuilding functionality
├── log_file_manager.go # Log file discovery and management
├── metrics.go # Performance metrics and monitoring
└── README.md # This documentation
Note: Performance optimizations now use the unified utils package (../utils/)
import "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
// Create indexer with default configuration
idx := indexer.NewParallelIndexer(nil)
// Start indexer
ctx := context.Background()
if err := idx.Start(ctx); err != nil {
log.Fatal(err)
}
defer idx.Stop()
// Index a single document
doc := &indexer.Document{
ID: "log_entry_1",
Fields: &indexer.LogDocument{
Timestamp: time.Now().Unix(),
IP: "192.168.1.1",
Method: "GET",
Path: "/api/status",
Status: 200,
BytesSent: 1234,
FilePath: "/var/log/nginx/access.log",
Raw: "192.168.1.1 - - [01/Jan/2024:12:00:00 +0000] \"GET /api/status HTTP/1.1\" 200 1234",
},
}
if err := idx.IndexDocument(ctx, doc); err != nil {
log.Printf("Indexing error: %v", err)
}
// Create multiple documents
var documents []*indexer.Document
for i := 0; i < 1000; i++ {
doc := &indexer.Document{
ID: fmt.Sprintf("log_entry_%d", i),
Fields: &indexer.LogDocument{
Timestamp: time.Now().Unix(),
IP: fmt.Sprintf("192.168.1.%d", i%254+1),
Method: "GET",
Path: fmt.Sprintf("/api/endpoint_%d", i),
Status: 200,
BytesSent: int64(1000 + i),
FilePath: "/var/log/nginx/access.log",
},
}
documents = append(documents, doc)
}
// Index batch with automatic optimization
if err := idx.IndexDocuments(ctx, documents); err != nil {
log.Printf("Batch indexing error: %v", err)
}
// Get indexing statistics
stats := idx.GetStats()
fmt.Printf("Total documents: %d, Indexing rate: %.2f docs/sec\n",
stats.TotalDocuments, stats.IndexingRate)
type IndexerConfig struct {
// Basic configuration
IndexPath string `json:"index_path"` // Base path for index storage
ShardCount int `json:"shard_count"` // Number of shards (default: 4)
WorkerCount int `json:"worker_count"` // Worker pool size (default: 8)
BatchSize int `json:"batch_size"` // Batch processing size (default: 1000)
FlushInterval time.Duration `json:"flush_interval"` // Auto-flush interval (default: 5s)
MaxQueueSize int `json:"max_queue_size"` // Maximum queue depth (default: 10000)
// Performance tuning
EnableCompression bool `json:"enable_compression"` // Enable index compression (default: true)
MemoryQuota int64 `json:"memory_quota"` // Memory limit in bytes (default: 1GB)
MaxSegmentSize int64 `json:"max_segment_size"` // Maximum segment size (default: 64MB)
OptimizeInterval time.Duration `json:"optimize_interval"` // Auto-optimization interval (default: 30m)
// Monitoring
EnableMetrics bool `json:"enable_metrics"` // Enable performance metrics (default: true)
}
func DefaultIndexerConfig() *IndexerConfig {
return &IndexerConfig{
IndexPath: "./log-index",
ShardCount: 4,
WorkerCount: 8,
BatchSize: 1000,
FlushInterval: 5 * time.Second,
MaxQueueSize: 10000,
EnableCompression: true,
MemoryQuota: 1024 * 1024 * 1024, // 1GB
MaxSegmentSize: 64 * 1024 * 1024, // 64MB
OptimizeInterval: 30 * time.Minute,
EnableMetrics: true,
}
}
The main indexer implementation with multi-shard support:
// Create with custom configuration
config := &IndexerConfig{
ShardCount: 8, // More shards for higher throughput
WorkerCount: 16, // More workers for CPU-intensive workloads
BatchSize: 2000, // Larger batches for better efficiency
}
indexer := NewParallelIndexer(config)
// Asynchronous indexing with callback
indexer.IndexDocumentAsync(doc, func(err error) {
if err != nil {
log.Printf("Async indexing failed: %v", err)
} else {
log.Println("Document indexed successfully")
}
})
Manages distribution across multiple index shards:
// Get shard information
shardStats, err := indexer.GetShardInfo(0)
if err != nil {
	log.Printf("Failed to get shard info: %v", err)
} else {
	fmt.Printf("Shard 0: %d documents, %s size\n",
		shardStats.DocumentCount, formatBytes(shardStats.Size))
}
// Optimize specific shard
if err := indexer.OptimizeShard(0); err != nil {
log.Printf("Shard optimization failed: %v", err)
}
// Health check
if err := indexer.HealthCheck(); err != nil {
log.Printf("Health check failed: %v", err)
}
Efficient batch writing with automatic flushing:
// Create batch writer
batch := indexer.StartBatch()
// Add documents to batch
for _, doc := range documents {
if err := batch.Add(doc); err != nil {
log.Printf("Failed to add document to batch: %v", err)
}
// Automatic flush when batch size reached
if batch.Size() >= 1000 {
result, err := batch.Flush()
if err != nil {
log.Printf("Batch flush failed: %v", err)
} else {
fmt.Printf("Flushed %d documents in %v\n",
result.Processed, result.Duration)
}
}
}
// Final flush
if batch.Size() > 0 {
	if _, err := batch.Flush(); err != nil {
		log.Printf("Final flush failed: %v", err)
	}
}
Handles incremental indexing and position tracking:
// Create persistence manager with database
persistenceManager := NewPersistenceManager(&PersistenceConfig{
DatabaseURL: "postgres://user:pass@localhost/nginx_ui",
TableName: "nginx_log_indexes",
})
// Get incremental indexing information
info, err := persistenceManager.GetIncrementalInfo("/var/log/nginx/access.log")
if err != nil {
log.Printf("Failed to get incremental info: %v", err)
}
fmt.Printf("Last indexed position: %d, Documents: %d\n",
info.LastPosition, info.DocumentCount)
// Update indexing progress
newInfo := &LogFileInfo{
Path: "/var/log/nginx/access.log",
LastPosition: info.LastPosition + 1024,
DocumentCount: info.DocumentCount + 100,
LastModified: time.Now(),
IndexedAt: time.Now(),
}
if err := persistenceManager.UpdateIncrementalInfo("/var/log/nginx/access.log", newInfo); err != nil {
log.Printf("Failed to update incremental info: %v", err)
}
Real-time progress monitoring for long-running operations:
// Create progress configuration
progressConfig := &ProgressConfig{
OnProgress: func(notification ProgressNotification) {
fmt.Printf("Progress: %s - %.2f%% complete (%d/%d files)\n",
notification.GroupPath, notification.OverallProgress*100,
notification.CompletedFiles, notification.TotalFiles)
},
OnCompletion: func(notification CompletionNotification) {
fmt.Printf("Completed: %s in %v (processed %d documents)\n",
notification.GroupPath, notification.Duration, notification.DocumentCount)
},
}
// Get progress tracker
progressManager := NewProgressManager()
tracker := progressManager.GetTracker("/var/log/nginx/access.log", progressConfig)
// Track file processing
tracker.AddFile("/var/log/nginx/access.log", false)
tracker.SetFileEstimate("/var/log/nginx/access.log", 10000) // Estimated lines
tracker.StartFile("/var/log/nginx/access.log")
// Update progress periodically
for i := 0; i < 10000; i++ {
// Process log line...
if i%100 == 0 {
tracker.UpdateFileProgress("/var/log/nginx/access.log", int64(i))
}
}
tracker.CompleteFile("/var/log/nginx/access.log", 10000)
Complete index reconstruction with progress tracking:
// Create rebuild manager
rebuildManager := NewRebuildManager(
indexer,
persistenceManager,
progressManager,
shardManager,
&RebuildConfig{
BatchSize: 2000,
MaxConcurrency: 4,
DeleteBeforeRebuild: true,
ProgressInterval: 10 * time.Second,
TimeoutPerFile: 30 * time.Minute,
},
)
// Rebuild all indexes
ctx := context.Background()
if err := rebuildManager.RebuildAll(ctx); err != nil {
log.Printf("Rebuild failed: %v", err)
} else {
log.Println("Rebuild completed successfully")
}
// Rebuild single log group
if err := rebuildManager.RebuildSingle(ctx, "/var/log/nginx/access.log"); err != nil {
log.Printf("Single rebuild failed: %v", err)
}
// Monitor rebuild status
stats := rebuildManager.GetRebuildStats()
if stats.IsRebuilding {
fmt.Printf("Rebuild in progress (last rebuild: %v)\n", stats.LastRebuildTime)
}
Based on comprehensive benchmarking on Apple M2 Pro:
Operation | Performance | Memory Usage | Notes |
---|---|---|---|
Single document indexing | ~125µs | 1.2KB | Including shard selection |
Batch indexing (1000 docs) | ~45ms | 128KB | Optimized batch processing |
Shard selection | ~25ns | 0 allocs | Hash-based distribution |
Progress tracking update | ~57ns | 0 allocs | Lock-free counters |
Rebuild stats retrieval | ~25ns | 0 allocs | Atomic operations |
Memory optimization cycle | ~2.1ms | 45KB | Garbage collection trigger |
Index optimization | ~150ms | 2.3MB | Per shard |
Scenario | Documents/sec | Memory Peak | CPU Usage |
---|---|---|---|
Single worker | ~8,000 | 256MB | 25% |
4 workers | ~28,000 | 512MB | 85% |
8 workers | ~45,000 | 768MB | 95% |
16 workers | ~52,000 | 1.2GB | 98% |
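The ~25ns shard-selection figure reflects simple hash-based distribution: a document key is hashed and reduced modulo the shard count, so the same key always lands on the same shard. A minimal sketch of the idea (the selectShard helper is illustrative, not the package API):
import "hash/fnv"
// selectShard maps a document key deterministically onto one of
// shardCount shards using an FNV-1a hash, keeping entries for the
// same key co-located in a single shard.
func selectShard(key string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash Write never returns an error
	return int(h.Sum32() % uint32(shardCount))
}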
Worker Count Configuration
// CPU-bound workloads
config.WorkerCount = runtime.GOMAXPROCS(0)
// I/O-bound workloads
config.WorkerCount = runtime.GOMAXPROCS(0) * 2
// Memory-constrained environments
config.WorkerCount = max(2, runtime.GOMAXPROCS(0)/2)
Shard Count Optimization
// For high-volume environments (>1M docs)
config.ShardCount = 8
// For moderate volume (100K-1M docs)
config.ShardCount = 4
// For low volume (<100K docs)
config.ShardCount = 2
Batch Size Tuning
// High-throughput scenarios
config.BatchSize = 2000
// Memory-constrained environments
config.BatchSize = 500
// Real-time requirements
config.BatchSize = 100
Memory Management
// Configure memory limits
config.MemoryQuota = 2 * 1024 * 1024 * 1024 // 2GB
config.MaxSegmentSize = 128 * 1024 * 1024 // 128MB
// Enable automatic optimization
config.OptimizeInterval = 15 * time.Minute
type LogDocument struct {
// Core fields - always indexed
Timestamp int64 `json:"timestamp"` // Unix timestamp (range queries)
IP string `json:"ip"` // IP address (keyword matching)
Method string `json:"method"` // HTTP method (keyword)
Path string `json:"path"` // Request path (analyzed text)
PathExact string `json:"path_exact"` // Exact path matching (keyword)
Status int `json:"status"` // HTTP status (numeric range)
BytesSent int64 `json:"bytes_sent"` // Response size (numeric)
// Geographic enrichment (optional)
RegionCode string `json:"region_code,omitempty"` // Country code (keyword)
Province string `json:"province,omitempty"` // State/province (keyword)
City string `json:"city,omitempty"` // City (keyword)
ISP string `json:"isp,omitempty"` // ISP (keyword)
// User agent analysis (optional)
UserAgent string `json:"user_agent,omitempty"` // Full user agent (analyzed)
Browser string `json:"browser,omitempty"` // Browser name (keyword)
BrowserVer string `json:"browser_version,omitempty"` // Browser version (keyword)
OS string `json:"os,omitempty"` // Operating system (keyword)
OSVersion string `json:"os_version,omitempty"` // OS version (keyword)
DeviceType string `json:"device_type,omitempty"` // Device type (keyword)
// Performance metrics (optional)
RequestTime float64 `json:"request_time,omitempty"` // Request duration (numeric)
UpstreamTime *float64 `json:"upstream_time,omitempty"` // Upstream response time (numeric)
// HTTP details (optional)
Protocol string `json:"protocol,omitempty"` // HTTP protocol (keyword)
Referer string `json:"referer,omitempty"` // HTTP referer (analyzed)
// Metadata
FilePath string `json:"file_path"` // Source file (keyword)
Raw string `json:"raw"` // Original log line (stored only)
}
// Optimized mapping for NGINX logs
func CreateLogIndexMapping() mapping.IndexMapping {
indexMapping := bleve.NewIndexMapping()
indexMapping.DefaultAnalyzer = "standard"
docMapping := bleve.NewDocumentMapping()
// Timestamp - numeric for range queries
timestampMapping := bleve.NewNumericFieldMapping()
timestampMapping.Store = true
timestampMapping.Index = true
docMapping.AddFieldMappingsAt("timestamp", timestampMapping)
// IP address - keyword for exact matching
ipMapping := bleve.NewTextFieldMapping()
ipMapping.Analyzer = "keyword"
ipMapping.Store = true
ipMapping.Index = true
docMapping.AddFieldMappingsAt("ip", ipMapping)
// Path - dual mapping for different query types
pathMapping := bleve.NewTextFieldMapping() // Analyzed for partial matching
pathMapping.Analyzer = "standard"
pathMapping.Store = true
pathMapping.Index = true
docMapping.AddFieldMappingsAt("path", pathMapping)
pathExactMapping := bleve.NewTextFieldMapping() // Keyword for exact matching
pathExactMapping.Analyzer = "keyword"
pathExactMapping.Store = false
pathExactMapping.Index = true
docMapping.AddFieldMappingsAt("path_exact", pathExactMapping)
// Status code - numeric for range queries
statusMapping := bleve.NewNumericFieldMapping()
statusMapping.Store = true
statusMapping.Index = true
docMapping.AddFieldMappingsAt("status", statusMapping)
// Raw log line - stored but not indexed (for display)
rawMapping := bleve.NewTextFieldMapping()
rawMapping.Store = true
rawMapping.Index = false
docMapping.AddFieldMappingsAt("raw", rawMapping)
indexMapping.AddDocumentMapping("_default", docMapping)
return indexMapping
}
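The mapping takes effect when a shard index is created. A usage sketch, assuming a hypothetical shard path (the package derives real shard paths from IndexerConfig.IndexPath):
// Create one shard with the NGINX log mapping, or open it if it
// already exists on disk. The path below is illustrative.
shardPath := "./log-index/shard_0"
idx, err := bleve.New(shardPath, CreateLogIndexMapping())
if err == bleve.ErrorIndexPathExists {
	idx, err = bleve.Open(shardPath)
}
if err != nil {
	log.Fatal(err)
}
defer idx.Close()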
Process only new or modified log entries:
// Configure incremental indexing
config := &PersistenceConfig{
DatabaseURL: "postgres://localhost/nginx_ui",
EnabledPaths: []string{"/var/log/nginx/access.log"},
IncrementalConfig: &IncrementalIndexConfig{
CheckInterval: time.Minute,
MaxFilesToCheck: 100,
BatchSize: 1000,
},
}
persistenceManager := NewPersistenceManager(config)
// Process incremental updates
groups, err := persistenceManager.GetIncrementalGroups()
if err != nil {
log.Printf("Failed to get incremental groups: %v", err)
} else {
for _, group := range groups {
fmt.Printf("Group: %s, Changed files: %d, Needs reindex: %d\n",
group.GroupPath, group.ChangedFiles, group.NeedsReindex)
}
}
Automatic detection and handling of rotated log files:
// Supports various rotation patterns:
// - access.log, access.log.1, access.log.2, ...
// - access.log.2024-01-01, access.log.2024-01-02, ...
// - access.log.gz, access.log.1.gz, access.log.2.gz, ...
// - access-20240101.log, access-20240102.log, ...
// Get main log path from rotated file
mainPath := getMainLogPathFromFile("/var/log/nginx/access.log.1")
// Returns: "/var/log/nginx/access.log"
// Check if file is compressed
isCompressed := IsCompressedFile("/var/log/nginx/access.log.gz")
// Returns: true
// Estimate lines in a compressed file
ctx := context.Background()
fi, _ := os.Stat("/var/log/nginx/access.log.gz") // error handling elided
lines, err := EstimateFileLines(ctx, "/var/log/nginx/access.log.gz", fi.Size(), true)
if err != nil {
log.Printf("Line estimation failed: %v", err)
} else {
fmt.Printf("Estimated lines: %d\n", lines)
}
Automatic memory management and optimization:
// Enable performance monitoring
indexer := NewParallelIndexer(&IndexerConfig{
EnableMetrics: true,
MemoryQuota: 2 * 1024 * 1024 * 1024, // 2GB limit
})
// Monitor memory usage
memStats := indexer.GetMemoryStats()
fmt.Printf("Memory usage: %.2f MB (%.2f%% of quota)\n",
memStats.AllocMB, (memStats.AllocMB/2048)*100)
// Trigger optimization if needed
if memStats.AllocMB > 1500 { // 75% of quota
indexer.Optimize()
}
// Advanced optimization with specific targets
optimizationConfig := &OptimizationConfig{
TargetMemoryUsage: 1024 * 1024 * 1024, // 1GB
MaxSegmentCount: 10,
MinSegmentSize: 16 * 1024 * 1024, // 16MB
}
if err := indexer.OptimizeWithConfig(optimizationConfig); err != nil {
log.Printf("Optimization failed: %v", err)
}
Comprehensive health checks and monitoring:
// Perform health check
if err := indexer.HealthCheck(); err != nil {
log.Printf("Health check failed: %v", err)
// Get detailed status
for i := 0; i < indexer.GetConfig().ShardCount; i++ {
shardInfo, err := indexer.GetShardInfo(i)
if err != nil {
log.Printf("Shard %d: ERROR - %v", i, err)
} else {
log.Printf("Shard %d: OK - %d docs, %s",
i, shardInfo.DocumentCount, formatBytes(shardInfo.Size))
}
}
}
// Monitor worker status
stats := indexer.GetStats()
for i, worker := range stats.WorkerStats {
status := worker.Status
if status != "idle" && status != "busy" {
log.Printf("Worker %d has abnormal status: %s", i, status)
}
}
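For continuous monitoring, the same checks can run on a timer alongside the indexer (a minimal sketch; the 30-second interval is an arbitrary choice):
// Run periodic health checks in the background until ctx is cancelled.
go func() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := indexer.HealthCheck(); err != nil {
				log.Printf("Periodic health check failed: %v", err)
			}
		}
	}
}()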
var (
	ErrIndexerNotStarted        = errors.New("indexer not started")
	ErrIndexerStopped           = errors.New("indexer stopped")
	ErrShardNotFound            = errors.New("shard not found")
	ErrQueueFull                = errors.New("queue is full")
	ErrInvalidDocument          = errors.New("invalid document")
	ErrOptimizationFailed       = errors.New("optimization failed")
	ErrIncrementalInfoNotFound  = errors.New("incremental information not found")
	ErrInvalidLogFileFormat     = errors.New("invalid log file format")
	ErrDatabaseConnectionFailed = errors.New("database connection failed")
)
// Graceful error handling with retry logic
func indexWithRetry(indexer *ParallelIndexer, doc *Document, maxRetries int) error {
for attempt := 0; attempt < maxRetries; attempt++ {
err := indexer.IndexDocument(context.Background(), doc)
if err == nil {
return nil
}
// Check if error is retryable
if isRetryableError(err) {
log.Printf("Attempt %d failed: %v, retrying...", attempt+1, err)
time.Sleep(time.Duration(attempt+1) * time.Second)
continue
}
// Non-retryable error
return fmt.Errorf("non-retryable error: %w", err)
}
return fmt.Errorf("max retries (%d) exceeded", maxRetries)
}
func isRetryableError(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, "queue is full") ||
strings.Contains(errStr, "temporary failure") ||
strings.Contains(errStr, "context deadline exceeded")
}
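When failures persist despite retries, a circuit breaker can shed load until the indexer recovers: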
type CircuitBreaker struct {
failures int32
lastFailure time.Time
threshold int32
timeout time.Duration
mutex sync.RWMutex
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mutex.RLock()
if atomic.LoadInt32(&cb.failures) >= cb.threshold {
if time.Since(cb.lastFailure) < cb.timeout {
cb.mutex.RUnlock()
return fmt.Errorf("circuit breaker open")
}
}
cb.mutex.RUnlock()
err := fn()
if err != nil {
cb.mutex.Lock()
atomic.AddInt32(&cb.failures, 1)
cb.lastFailure = time.Now()
cb.mutex.Unlock()
return err
}
// Reset on success
atomic.StoreInt32(&cb.failures, 0)
return nil
}
// Usage
cb := &CircuitBreaker{
threshold: 5,
timeout: 30 * time.Second,
}
err := cb.Call(func() error {
return indexer.IndexDocument(ctx, doc)
})
package main
import (
"bufio"
"context"
"fmt"
"log"
"os"
"time"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/parser"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
)
func main() {
// Initialize components
p := parser.NewNginxParser(&parser.ParserConfig{
WorkerCount: 8,
EnableGeoIP: true,
EnableUserAgent: true,
})
idx := indexer.NewParallelIndexer(&indexer.IndexerConfig{
ShardCount: 4,
WorkerCount: 8,
BatchSize: 1000,
})
persistenceManager := indexer.NewPersistenceManager(&indexer.PersistenceConfig{
DatabaseURL: "postgres://localhost/nginx_ui",
})
progressManager := indexer.NewProgressManager()
// Start indexer
ctx := context.Background()
if err := idx.Start(ctx); err != nil {
log.Fatal(err)
}
defer idx.Stop()
// Process log file
logFile := "/var/log/nginx/access.log"
if err := processLogFile(p, idx, persistenceManager, progressManager, logFile); err != nil {
log.Fatal(err)
}
// Print final statistics
stats := idx.GetStats()
fmt.Printf("Processing complete:\n")
fmt.Printf(" Documents indexed: %d\n", stats.TotalDocuments)
fmt.Printf(" Indexing rate: %.2f docs/sec\n", stats.IndexingRate)
fmt.Printf(" Memory usage: %.2f MB\n", float64(stats.MemoryUsage)/(1024*1024))
}
func processLogFile(
p *parser.NginxParser,
idx *indexer.ParallelIndexer,
persistence *indexer.PersistenceManager,
progressManager *indexer.ProgressManager,
filePath string,
) error {
// Get incremental information
info, err := persistence.GetIncrementalInfo(filePath)
if err != nil {
log.Printf("No incremental info found, starting from beginning: %v", err)
info = &indexer.LogFileInfo{
Path: filePath,
LastPosition: 0,
}
}
// Setup progress tracking
progressConfig := &indexer.ProgressConfig{
OnProgress: func(pn indexer.ProgressNotification) {
fmt.Printf("Progress: %.2f%% (%d/%d files)\n",
pn.OverallProgress*100, pn.CompletedFiles, pn.TotalFiles)
},
OnCompletion: func(cn indexer.CompletionNotification) {
fmt.Printf("Completed: %s in %v\n", cn.GroupPath, cn.Duration)
},
}
tracker := progressManager.GetTracker(filePath, progressConfig)
tracker.AddFile(filePath, false)
// Open file and seek to last position
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
if info.LastPosition > 0 {
if _, err := file.Seek(info.LastPosition, 0); err != nil {
return err
}
}
// Process file
scanner := bufio.NewScanner(file)
var documents []*indexer.Document
var lineCount int64
currentPosition := info.LastPosition
tracker.StartFile(filePath)
for scanner.Scan() {
line := scanner.Text()
lineCount++
// Parse log line
entry, err := p.ParseLine(line)
if err != nil {
log.Printf("Parse error on line %d: %v", lineCount, err)
continue
}
// Convert to document
doc := &indexer.Document{
ID: fmt.Sprintf("%s_%d_%d", filePath, entry.Timestamp, lineCount),
Fields: convertToLogDocument(entry),
}
documents = append(documents, doc)
currentPosition += int64(len(line)) + 1 // +1 for newline
// Process batch
if len(documents) >= 1000 {
if err := idx.IndexDocuments(context.Background(), documents); err != nil {
return err
}
// Update persistence
info.LastPosition = currentPosition
info.DocumentCount += uint64(len(documents))
info.LastModified = time.Now()
if err := persistence.UpdateIncrementalInfo(filePath, info); err != nil {
log.Printf("Failed to update persistence: %v", err)
}
// Update progress
tracker.UpdateFileProgress(filePath, lineCount)
documents = documents[:0] // Reset slice
}
}
// Process remaining documents
if len(documents) > 0 {
if err := idx.IndexDocuments(context.Background(), documents); err != nil {
return err
}
info.LastPosition = currentPosition
info.DocumentCount += uint64(len(documents))
info.LastModified = time.Now()
if err := persistence.UpdateIncrementalInfo(filePath, info); err != nil {
log.Printf("Failed to update persistence: %v", err)
}
}
tracker.CompleteFile(filePath, lineCount)
return scanner.Err()
}
func convertToLogDocument(entry *parser.LogEntry) *indexer.LogDocument {
return &indexer.LogDocument{
Timestamp: entry.Timestamp,
IP: entry.IP,
RegionCode: entry.RegionCode,
Province: entry.Province,
City: entry.City,
ISP: entry.ISP,
Method: entry.Method,
Path: entry.Path,
PathExact: entry.Path,
Protocol: entry.Protocol,
Status: entry.Status,
BytesSent: entry.BytesSent,
Referer: entry.Referer,
UserAgent: entry.UserAgent,
Browser: entry.Browser,
BrowserVer: entry.BrowserVer,
OS: entry.OS,
OSVersion: entry.OSVersion,
DeviceType: entry.DeviceType,
RequestTime: entry.RequestTime,
UpstreamTime: entry.UpstreamTime,
FilePath: entry.FilePath,
Raw: entry.Raw,
}
}
// Indexer interface defines the main indexing contract
type Indexer interface {
// Document operations
IndexDocument(ctx context.Context, doc *Document) error
IndexDocuments(ctx context.Context, docs []*Document) error
IndexDocumentAsync(doc *Document, callback func(error))
IndexDocumentsAsync(docs []*Document, callback func(error))
// Batch operations
StartBatch() BatchWriterInterface
FlushAll() error
// Management operations
Optimize() error
GetStats() *IndexStats
GetShardInfo(shardID int) (*ShardInfo, error)
// Lifecycle management
Start(ctx context.Context) error
Stop() error
IsHealthy() bool
// Configuration
GetConfig() *IndexerConfig
UpdateConfig(config *IndexerConfig) error
}
// ShardManager manages multiple index shards
type ShardManager interface {
Initialize() error
GetShard(key string) (bleve.Index, int, error)
GetShardByID(id int) (bleve.Index, error)
GetAllShards() []bleve.Index
GetShardStats() []*ShardInfo
CreateShard(id int, path string) error
CloseShard(id int) error
OptimizeShard(id int) error
HealthCheck() error
}
// BatchWriterInterface provides efficient batch operations
type BatchWriterInterface interface {
Add(doc *Document) error
Flush() (*IndexResult, error)
Size() int
Reset()
}
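A compile-time assertion is a cheap way to keep concrete types aligned with these contracts (a sketch; the GroupedShardManager name is assumed from the package's file layout):
// Fail the build if the concrete types stop satisfying the
// documented interfaces.
var (
	_ Indexer      = (*ParallelIndexer)(nil)
	_ ShardManager = (*GroupedShardManager)(nil)
)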
This documentation covers the indexer package's architecture, configuration, performance characteristics, and practical integration examples.
Latest benchmark results on Apple M2 Pro (August 25, 2025):
Operation | Rate | ns/op | B/op | allocs/op | Notes |
---|---|---|---|---|---|
UpdateFileProgress | 20.9M ops/sec | 57.59 | 0 | 0 | Zero-allocation progress tracking |
GetProgress | 9.8M ops/sec | 117.5 | 0 | 0 | Zero-allocation status reads |
CacheAccess | 17.3M ops/sec | 68.40 | 29 | 1 | Optimized persistence cache |
ConcurrentAccess | 3.4M ops/sec | 346.2 | 590 | 4 | Multi-threaded operations |
Performance optimizations delivered 20-75% allocation reduction in critical paths.
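The ns/op, B/op, and allocs/op columns match the output of Go's standard benchmark harness (go test -bench . -benchmem). A sketch of how the UpdateFileProgress number is typically reproduced (the tracker setup and nil config are assumptions):
func BenchmarkUpdateFileProgress(b *testing.B) {
	// Illustrative setup: obtain a tracker for one log group.
	tracker := NewProgressManager().GetTracker("/var/log/nginx/access.log", nil)
	tracker.AddFile("/var/log/nginx/access.log", false)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		tracker.UpdateFileProgress("/var/log/nginx/access.log", int64(i))
	}
}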