123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package indexer
- import (
- "context"
- "fmt"
- "time"
- "github.com/uozi-tech/cosy/logger"
- )
- // ThroughputOptimizer provides high-level APIs for optimized log indexing
- type ThroughputOptimizer struct {
- indexer *ParallelIndexer
- config *ThroughputOptimizerConfig
- }
- // ThroughputOptimizerConfig configures the throughput optimizer
- type ThroughputOptimizerConfig struct {
- UseRotationScanning bool `json:"use_rotation_scanning"`
- MaxBatchSize int `json:"max_batch_size"`
- TimeoutPerGroup time.Duration `json:"timeout_per_group"`
- EnableProgressTracking bool `json:"enable_progress_tracking"`
- }
- // NewThroughputOptimizer creates a new throughput optimizer
- func NewThroughputOptimizer(indexer *ParallelIndexer, config *ThroughputOptimizerConfig) *ThroughputOptimizer {
- if config == nil {
- config = DefaultThroughputOptimizerConfig()
- }
- return &ThroughputOptimizer{
- indexer: indexer,
- config: config,
- }
- }
- // DefaultThroughputOptimizerConfig returns default configuration
- func DefaultThroughputOptimizerConfig() *ThroughputOptimizerConfig {
- return &ThroughputOptimizerConfig{
- UseRotationScanning: true,
- MaxBatchSize: 25000,
- TimeoutPerGroup: 10 * time.Minute,
- EnableProgressTracking: true,
- }
- }
- // OptimizedIndexMultipleLogGroups indexes multiple log groups using the best strategy
- func (to *ThroughputOptimizer) OptimizedIndexMultipleLogGroups(ctx context.Context, basePaths []string) (*OptimizedIndexingResult, error) {
- start := time.Now()
-
- logger.Infof("🚀 Starting optimized indexing for %d log groups", len(basePaths))
-
- var progressConfig *ProgressConfig
- if to.config.EnableProgressTracking {
- progressConfig = &ProgressConfig{
- NotifyInterval: 5 * time.Second,
- }
- }
- var docsCountMap map[string]uint64
- var minTime, maxTime *time.Time
- var err error
- if to.config.UseRotationScanning && len(basePaths) > 1 {
- // Use rotation scanning for multiple log groups
- logger.Info("📊 Using rotation scanning strategy for optimal throughput")
- docsCountMap, minTime, maxTime, err = to.indexer.IndexLogGroupWithRotationScanning(basePaths, progressConfig)
- } else {
- // Fall back to traditional method for single group or if scanning disabled
- logger.Info("📁 Using traditional indexing strategy")
- docsCountMap = make(map[string]uint64)
-
- for _, basePath := range basePaths {
- groupDocs, groupMin, groupMax, groupErr := to.indexer.IndexLogGroup(basePath)
- if groupErr != nil {
- logger.Warnf("Failed to index log group %s: %v", basePath, groupErr)
- continue
- }
-
- // Merge results
- for path, count := range groupDocs {
- docsCountMap[path] = count
- }
-
- // Update time range
- if groupMin != nil && (minTime == nil || groupMin.Before(*minTime)) {
- minTime = groupMin
- }
- if groupMax != nil && (maxTime == nil || groupMax.After(*maxTime)) {
- maxTime = groupMax
- }
- }
- }
- if err != nil {
- return nil, fmt.Errorf("failed to index log groups: %w", err)
- }
- // Calculate results
- totalFiles := len(docsCountMap)
- totalDocuments := sumDocCounts(docsCountMap)
- duration := time.Since(start)
-
- result := &OptimizedIndexingResult{
- TotalFiles: totalFiles,
- TotalDocuments: totalDocuments,
- ProcessingDuration: duration,
- MinTimestamp: minTime,
- MaxTimestamp: maxTime,
- FileCounts: docsCountMap,
- ThroughputDocsPerSec: float64(totalDocuments) / duration.Seconds(),
- Strategy: getStrategyName(to.config.UseRotationScanning, len(basePaths)),
- }
- logger.Infof("🎉 Optimized indexing completed: %d files, %d documents in %v (%.0f docs/sec)",
- totalFiles, totalDocuments, duration, result.ThroughputDocsPerSec)
- return result, nil
- }
- // OptimizedIndexingResult contains the results of optimized indexing
- type OptimizedIndexingResult struct {
- TotalFiles int `json:"total_files"`
- TotalDocuments uint64 `json:"total_documents"`
- ProcessingDuration time.Duration `json:"processing_duration"`
- MinTimestamp *time.Time `json:"min_timestamp,omitempty"`
- MaxTimestamp *time.Time `json:"max_timestamp,omitempty"`
- FileCounts map[string]uint64 `json:"file_counts"`
- ThroughputDocsPerSec float64 `json:"throughput_docs_per_sec"`
- Strategy string `json:"strategy"`
- }
- // GetRotationScanStats returns statistics from the rotation scanner
- func (to *ThroughputOptimizer) GetRotationScanStats() map[string]interface{} {
- if to.indexer.rotationScanner == nil {
- return map[string]interface{}{
- "enabled": false,
- "message": "Rotation scanner not initialized",
- }
- }
- scanResults := to.indexer.rotationScanner.GetScanResults()
- queueSize := to.indexer.rotationScanner.GetQueueSize()
- isScanning := to.indexer.rotationScanner.IsScanning()
- totalFiles := 0
- var totalSize int64
- var totalEstimatedLines int64
- for _, result := range scanResults {
- totalFiles += result.TotalFiles
- totalSize += result.TotalSize
- totalEstimatedLines += result.EstimatedLines
- }
- return map[string]interface{}{
- "enabled": true,
- "log_groups_scanned": len(scanResults),
- "total_files_discovered": totalFiles,
- "total_size_bytes": totalSize,
- "total_estimated_lines": totalEstimatedLines,
- "queue_size": queueSize,
- "scanning_in_progress": isScanning,
- "scan_results": scanResults,
- }
- }
- // OptimizeIndexerConfig optimizes indexer configuration based on system resources
- func (to *ThroughputOptimizer) OptimizeIndexerConfig() *Config {
- currentConfig := to.indexer.config
-
- // Create optimized config based on current settings
- optimized := &Config{
- IndexPath: currentConfig.IndexPath,
- ShardCount: currentConfig.ShardCount,
- WorkerCount: currentConfig.WorkerCount,
- BatchSize: to.config.MaxBatchSize, // Use throughput-optimized batch size
- FlushInterval: currentConfig.FlushInterval,
- MaxQueueSize: to.config.MaxBatchSize * 15, // Larger queue for throughput
- EnableCompression: currentConfig.EnableCompression,
- MemoryQuota: currentConfig.MemoryQuota,
- MaxSegmentSize: currentConfig.MaxSegmentSize,
- OptimizeInterval: currentConfig.OptimizeInterval,
- EnableMetrics: currentConfig.EnableMetrics,
- }
- logger.Infof("📈 Generated optimized config: BatchSize=%d, MaxQueueSize=%d",
- optimized.BatchSize, optimized.MaxQueueSize)
- return optimized
- }
- // getStrategyName returns a human-readable strategy name
- func getStrategyName(useRotationScanning bool, numGroups int) string {
- if useRotationScanning && numGroups > 1 {
- return "rotation_scanning_parallel"
- } else if numGroups > 1 {
- return "traditional_parallel"
- } else {
- return "traditional_single"
- }
- }
|