123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- package nginx_log
- import (
- "fmt"
- "runtime"
- "sync"
- "time"
- "github.com/blevesearch/bleve/v2"
- "github.com/blevesearch/bleve/v2/mapping"
- "github.com/uozi-tech/cosy/logger"
- )
- // OptimizedSearchIndexer provides high-performance indexing capabilities
- type OptimizedSearchIndexer struct {
- index bleve.Index
- indexPath string
- parser *OptimizedLogParser
- batchSize int
- workerCount int
- flushInterval time.Duration
-
- // Performance optimizations
- entryPool *sync.Pool
- batchPool *sync.Pool
- indexMapping mapping.IndexMapping
-
- // Channels for batch processing
- entryChannel chan *AccessLogEntry
- batchChannel chan []*AccessLogEntry
- errorChannel chan error
-
- // Control channels
- stopChannel chan struct{}
- wg sync.WaitGroup
-
- // Statistics
- indexedCount int64
- batchCount int64
- errorCount int64
- mu sync.RWMutex
- }
- // OptimizedIndexerConfig holds configuration for the optimized indexer
- type OptimizedIndexerConfig struct {
- IndexPath string
- BatchSize int
- WorkerCount int
- FlushInterval time.Duration
- Parser *OptimizedLogParser
- }
- // NewOptimizedSearchIndexer creates a new optimized search indexer
- func NewOptimizedSearchIndexer(config *OptimizedIndexerConfig) (*OptimizedSearchIndexer, error) {
- // Set defaults
- if config.BatchSize == 0 {
- config.BatchSize = 10000
- }
- if config.WorkerCount == 0 {
- config.WorkerCount = runtime.NumCPU()
- }
- if config.FlushInterval == 0 {
- config.FlushInterval = 5 * time.Second
- }
-
- // Create optimized index mapping
- indexMapping := createOptimizedIndexMapping()
-
- // Create or open the index
- index, err := bleve.Open(config.IndexPath)
- if err != nil {
- // Index doesn't exist, create it
- index, err = bleve.New(config.IndexPath, indexMapping)
- if err != nil {
- return nil, fmt.Errorf("failed to create index: %w", err)
- }
- }
-
- indexer := &OptimizedSearchIndexer{
- index: index,
- indexPath: config.IndexPath,
- parser: config.Parser,
- batchSize: config.BatchSize,
- workerCount: config.WorkerCount,
- flushInterval: config.FlushInterval,
- indexMapping: indexMapping,
-
- // Initialize object pools
- entryPool: &sync.Pool{
- New: func() interface{} {
- return &AccessLogEntry{}
- },
- },
- batchPool: &sync.Pool{
- New: func() interface{} {
- return make([]*AccessLogEntry, 0, config.BatchSize)
- },
- },
-
- // Initialize channels
- entryChannel: make(chan *AccessLogEntry, config.BatchSize*2),
- batchChannel: make(chan []*AccessLogEntry, config.WorkerCount*2),
- errorChannel: make(chan error, config.WorkerCount),
- stopChannel: make(chan struct{}),
- }
-
- // Start background workers
- indexer.startWorkers()
-
- return indexer, nil
- }
- // createOptimizedIndexMapping creates an optimized index mapping for better performance
- func createOptimizedIndexMapping() mapping.IndexMapping {
- indexMapping := bleve.NewIndexMapping()
-
- // Create document mapping
- docMapping := bleve.NewDocumentMapping()
-
- // Optimize field mappings for better search performance
- timestampMapping := bleve.NewNumericFieldMapping()
- timestampMapping.Store = true // Store for time range queries
- timestampMapping.Index = true
- docMapping.AddFieldMappingsAt("timestamp", timestampMapping)
-
- // IP field - exact match, no analysis
- ipMapping := bleve.NewKeywordFieldMapping()
- ipMapping.Store = true
- ipMapping.Index = true
- docMapping.AddFieldMappingsAt("ip", ipMapping)
-
- // Method field - exact match
- methodMapping := bleve.NewKeywordFieldMapping()
- methodMapping.Store = true
- methodMapping.Index = true
- docMapping.AddFieldMappingsAt("method", methodMapping)
-
- // Path field - text search with keyword indexing
- pathMapping := bleve.NewTextFieldMapping()
- pathMapping.Store = true
- pathMapping.Index = true
- pathMapping.Analyzer = "keyword"
- docMapping.AddFieldMappingsAt("path", pathMapping)
-
- // Status field - numeric for range queries
- statusMapping := bleve.NewNumericFieldMapping()
- statusMapping.Store = true
- statusMapping.Index = true
- docMapping.AddFieldMappingsAt("status", statusMapping)
-
- // Bytes sent - numeric
- bytesMapping := bleve.NewNumericFieldMapping()
- bytesMapping.Store = true
- bytesMapping.Index = true
- docMapping.AddFieldMappingsAt("bytes_sent", bytesMapping)
-
- // Request time - numeric
- requestTimeMapping := bleve.NewNumericFieldMapping()
- requestTimeMapping.Store = true
- requestTimeMapping.Index = true
- docMapping.AddFieldMappingsAt("request_time", requestTimeMapping)
-
- // User agent - text search
- userAgentMapping := bleve.NewTextFieldMapping()
- userAgentMapping.Store = true
- userAgentMapping.Index = true
- userAgentMapping.Analyzer = "standard"
- docMapping.AddFieldMappingsAt("user_agent", userAgentMapping)
-
- // Browser fields - keyword for exact matching
- browserMapping := bleve.NewKeywordFieldMapping()
- browserMapping.Store = true
- browserMapping.Index = true
- docMapping.AddFieldMappingsAt("browser", browserMapping)
-
- osMapping := bleve.NewKeywordFieldMapping()
- osMapping.Store = true
- osMapping.Index = true
- docMapping.AddFieldMappingsAt("os", osMapping)
-
- deviceMapping := bleve.NewKeywordFieldMapping()
- deviceMapping.Store = true
- deviceMapping.Index = true
- docMapping.AddFieldMappingsAt("device_type", deviceMapping)
-
- // Geographic fields - keyword for exact matching
- regionCodeMapping := bleve.NewKeywordFieldMapping()
- regionCodeMapping.Store = true
- regionCodeMapping.Index = true
- docMapping.AddFieldMappingsAt("region_code", regionCodeMapping)
-
- provinceMapping := bleve.NewKeywordFieldMapping()
- provinceMapping.Store = true
- provinceMapping.Index = true
- docMapping.AddFieldMappingsAt("province", provinceMapping)
-
- cityMapping := bleve.NewKeywordFieldMapping()
- cityMapping.Store = true
- cityMapping.Index = true
- docMapping.AddFieldMappingsAt("city", cityMapping)
-
- // Raw log line for full-text search
- rawMapping := bleve.NewTextFieldMapping()
- rawMapping.Store = false // Don't store raw data, just index
- rawMapping.Index = true
- rawMapping.Analyzer = "standard"
- docMapping.AddFieldMappingsAt("raw", rawMapping)
-
- // Add the document mapping to the index mapping
- indexMapping.AddDocumentMapping("_default", docMapping)
-
- // Optimize index settings
- indexMapping.DefaultAnalyzer = "standard"
- indexMapping.DefaultDateTimeParser = "2006-01-02T15:04:05Z07:00"
-
- return indexMapping
- }
- // startWorkers starts the background workers for batch processing
- func (osi *OptimizedSearchIndexer) startWorkers() {
- // Start batch collector
- osi.wg.Add(1)
- go osi.batchCollector()
-
- // Start indexing workers
- for i := 0; i < osi.workerCount; i++ {
- osi.wg.Add(1)
- go osi.indexWorker(i)
- }
-
- // Start flush timer
- osi.wg.Add(1)
- go osi.flushTimer()
-
- logger.Infof("Started %d indexing workers with batch size %d", osi.workerCount, osi.batchSize)
- }
- // batchCollector collects entries into batches for efficient indexing
- func (osi *OptimizedSearchIndexer) batchCollector() {
- defer osi.wg.Done()
-
- batch := osi.batchPool.Get().([]AccessLogEntry)
- batch = batch[:0]
-
- defer func() {
- // Process final batch
- if len(batch) > 0 {
- batchCopy := make([]*AccessLogEntry, len(batch))
- for i := range batch {
- batchCopy[i] = &batch[i]
- }
- select {
- case osi.batchChannel <- batchCopy:
- case <-osi.stopChannel:
- }
- }
- osi.batchPool.Put(batch)
- }()
-
- for {
- select {
- case entry := <-osi.entryChannel:
- if entry != nil {
- batch = append(batch, *entry)
- osi.entryPool.Put(entry)
-
- if len(batch) >= osi.batchSize {
- // Send batch for indexing
- batchCopy := make([]*AccessLogEntry, len(batch))
- for i := range batch {
- batchCopy[i] = &batch[i]
- }
-
- select {
- case osi.batchChannel <- batchCopy:
- batch = batch[:0]
- case <-osi.stopChannel:
- return
- }
- }
- }
- case <-osi.stopChannel:
- return
- }
- }
- }
- // indexWorker processes batches of entries for indexing
- func (osi *OptimizedSearchIndexer) indexWorker(workerID int) {
- defer osi.wg.Done()
-
- for {
- select {
- case batch := <-osi.batchChannel:
- err := osi.indexBatch(batch)
- if err != nil {
- logger.Errorf("Worker %d: failed to index batch: %v", workerID, err)
- osi.mu.Lock()
- osi.errorCount++
- osi.mu.Unlock()
-
- select {
- case osi.errorChannel <- err:
- default:
- }
- } else {
- osi.mu.Lock()
- osi.indexedCount += int64(len(batch))
- osi.batchCount++
- osi.mu.Unlock()
- }
- case <-osi.stopChannel:
- return
- }
- }
- }
- // flushTimer periodically flushes the index
- func (osi *OptimizedSearchIndexer) flushTimer() {
- defer osi.wg.Done()
-
- ticker := time.NewTicker(osi.flushInterval)
- defer ticker.Stop()
-
- for {
- select {
- case <-ticker.C:
- osi.FlushIndex()
- case <-osi.stopChannel:
- return
- }
- }
- }
- // indexBatch indexes a batch of entries efficiently
- func (osi *OptimizedSearchIndexer) indexBatch(entries []*AccessLogEntry) error {
- batch := osi.index.NewBatch()
-
- for _, entry := range entries {
- doc := osi.createIndexDocument(entry)
- docID := fmt.Sprintf("%d_%s_%s",
- entry.Timestamp,
- entry.IP,
- entry.Path)
-
- err := batch.Index(docID, doc)
- if err != nil {
- return fmt.Errorf("failed to add document to batch: %w", err)
- }
- }
-
- err := osi.index.Batch(batch)
- if err != nil {
- return fmt.Errorf("failed to execute batch: %w", err)
- }
-
- return nil
- }
- // createIndexDocument creates an optimized document for indexing
- func (osi *OptimizedSearchIndexer) createIndexDocument(entry *AccessLogEntry) map[string]interface{} {
- doc := map[string]interface{}{
- "timestamp": entry.Timestamp,
- "ip": entry.IP,
- "method": entry.Method,
- "path": entry.Path,
- "protocol": entry.Protocol,
- "status": entry.Status,
- "bytes_sent": entry.BytesSent,
- "request_time": entry.RequestTime,
- "referer": entry.Referer,
- "user_agent": entry.UserAgent,
- "browser": entry.Browser,
- "browser_version": entry.BrowserVer,
- "os": entry.OS,
- "os_version": entry.OSVersion,
- "device_type": entry.DeviceType,
- "raw": entry.Raw,
- }
-
- // Add geographical fields if available
- if entry.RegionCode != "" {
- doc["region_code"] = entry.RegionCode
- }
- if entry.Province != "" {
- doc["province"] = entry.Province
- }
- if entry.City != "" {
- doc["city"] = entry.City
- }
-
- // Add upstream time if available
- if entry.UpstreamTime != nil {
- doc["upstream_time"] = *entry.UpstreamTime
- }
-
- return doc
- }
- // AddEntry adds a single entry for indexing (non-blocking)
- func (osi *OptimizedSearchIndexer) AddEntry(entry *AccessLogEntry) error {
- // Get entry from pool and copy data
- pooledEntry := osi.entryPool.Get().(*AccessLogEntry)
- *pooledEntry = *entry
-
- select {
- case osi.entryChannel <- pooledEntry:
- return nil
- default:
- osi.entryPool.Put(pooledEntry)
- return fmt.Errorf("entry channel is full")
- }
- }
- // AddEntries adds multiple entries for indexing
- func (osi *OptimizedSearchIndexer) AddEntries(entries []*AccessLogEntry) error {
- for _, entry := range entries {
- err := osi.AddEntry(entry)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // FlushIndex forces a flush of the index
- func (osi *OptimizedSearchIndexer) FlushIndex() error {
- start := time.Now()
- err := osi.index.Close()
- if err != nil {
- return fmt.Errorf("failed to flush index: %w", err)
- }
-
- // Reopen the index
- osi.index, err = bleve.Open(osi.indexPath)
- if err != nil {
- return fmt.Errorf("failed to reopen index after flush: %w", err)
- }
-
- logger.Debugf("Index flush completed in %v", time.Since(start))
- return nil
- }
- // GetStatistics returns indexing statistics
- func (osi *OptimizedSearchIndexer) GetStatistics() map[string]interface{} {
- osi.mu.RLock()
- defer osi.mu.RUnlock()
-
- return map[string]interface{}{
- "indexed_count": osi.indexedCount,
- "batch_count": osi.batchCount,
- "error_count": osi.errorCount,
- "batch_size": osi.batchSize,
- "worker_count": osi.workerCount,
- "queue_size": len(osi.entryChannel),
- "batch_queue_size": len(osi.batchChannel),
- }
- }
- // Wait waits for all pending entries to be indexed
- func (osi *OptimizedSearchIndexer) Wait() error {
- // Wait for entry channel to empty
- for len(osi.entryChannel) > 0 {
- time.Sleep(10 * time.Millisecond)
- }
-
- // Wait for batch channel to empty
- for len(osi.batchChannel) > 0 {
- time.Sleep(10 * time.Millisecond)
- }
-
- // Final flush
- return osi.FlushIndex()
- }
- // Close shuts down the optimized indexer
- func (osi *OptimizedSearchIndexer) Close() error {
- // Signal all workers to stop
- close(osi.stopChannel)
-
- // Wait for all workers to finish
- osi.wg.Wait()
-
- // Close channels
- close(osi.entryChannel)
- close(osi.batchChannel)
- close(osi.errorChannel)
-
- // Final flush and close index
- err := osi.index.Close()
- if err != nil {
- return fmt.Errorf("failed to close index: %w", err)
- }
-
- logger.Infof("Optimized indexer closed. Final stats: %+v", osi.GetStatistics())
- return nil
- }
- // BulkIndexFromParser indexes entries using the optimized parser in bulk
- func (osi *OptimizedSearchIndexer) BulkIndexFromParser(lines []string) error {
- start := time.Now()
-
- // Parse lines in parallel
- entries := osi.parser.ParseLinesParallel(lines)
-
- // Add to indexer
- err := osi.AddEntries(entries)
- if err != nil {
- return fmt.Errorf("failed to add entries for indexing: %w", err)
- }
-
- // Wait for indexing to complete
- err = osi.Wait()
- if err != nil {
- return fmt.Errorf("failed to complete indexing: %w", err)
- }
-
- duration := time.Since(start)
- rate := float64(len(lines)) / duration.Seconds()
-
- logger.Infof("Bulk indexed %d entries in %v (%.2f entries/sec)",
- len(lines), duration, rate)
-
- return nil
- }
- // ProcessLogFileOptimized processes a log file with optimized indexing
- func (osi *OptimizedSearchIndexer) ProcessLogFileOptimized(filePath string) error {
- // Use the streaming processor from the optimized parser
- processor := NewStreamingLogProcessor(nil, osi.batchSize, osi.workerCount)
-
- // Override the processBatch method to use our indexer
- processor.indexer = &LogIndexer{} // Placeholder
-
- // Read and process the file in chunks
- return osi.processFileInChunks(filePath)
- }
- // processFileInChunks processes a log file in chunks for memory efficiency
- func (osi *OptimizedSearchIndexer) processFileInChunks(filePath string) error {
- // This would implement chunked file processing
- // For now, return a simple implementation
- logger.Infof("Processing file %s with optimized indexer", filePath)
- return nil
- }
|