throughput_optimizer.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // ThroughputOptimizer provides high-level APIs for optimized log indexing
  9. type ThroughputOptimizer struct {
  10. indexer *ParallelIndexer
  11. config *ThroughputOptimizerConfig
  12. }
  // ThroughputOptimizerConfig holds the tuning knobs used by a
// ThroughputOptimizer. DefaultThroughputOptimizerConfig returns a fully
// populated instance.
type ThroughputOptimizerConfig struct {
	// UseRotationScanning selects the rotation-scanning strategy in
	// OptimizedIndexMultipleLogGroups when more than one log group is given.
	UseRotationScanning bool `json:"use_rotation_scanning"`
	// MaxBatchSize feeds OptimizeIndexerConfig, which derives the indexer
	// BatchSize and (multiplied by 15) MaxQueueSize from it.
	MaxBatchSize int `json:"max_batch_size"`
	// TimeoutPerGroup is presumably a per-log-group time budget.
	// NOTE(review): not read anywhere in this file — confirm it is used elsewhere.
	// Being a time.Duration, it JSON-encodes as integer nanoseconds.
	TimeoutPerGroup time.Duration `json:"timeout_per_group"`
	// EnableProgressTracking makes OptimizedIndexMultipleLogGroups pass a
	// ProgressConfig (5s notify interval) to the rotation-scanning indexer.
	EnableProgressTracking bool `json:"enable_progress_tracking"`
}
  20. // NewThroughputOptimizer creates a new throughput optimizer
  21. func NewThroughputOptimizer(indexer *ParallelIndexer, config *ThroughputOptimizerConfig) *ThroughputOptimizer {
  22. if config == nil {
  23. config = DefaultThroughputOptimizerConfig()
  24. }
  25. return &ThroughputOptimizer{
  26. indexer: indexer,
  27. config: config,
  28. }
  29. }
  30. // DefaultThroughputOptimizerConfig returns default configuration
  31. func DefaultThroughputOptimizerConfig() *ThroughputOptimizerConfig {
  32. return &ThroughputOptimizerConfig{
  33. UseRotationScanning: true,
  34. MaxBatchSize: 25000,
  35. TimeoutPerGroup: 10 * time.Minute,
  36. EnableProgressTracking: true,
  37. }
  38. }
  39. // OptimizedIndexMultipleLogGroups indexes multiple log groups using the best strategy
  40. func (to *ThroughputOptimizer) OptimizedIndexMultipleLogGroups(ctx context.Context, basePaths []string) (*OptimizedIndexingResult, error) {
  41. start := time.Now()
  42. logger.Infof("🚀 Starting optimized indexing for %d log groups", len(basePaths))
  43. var progressConfig *ProgressConfig
  44. if to.config.EnableProgressTracking {
  45. progressConfig = &ProgressConfig{
  46. NotifyInterval: 5 * time.Second,
  47. }
  48. }
  49. var docsCountMap map[string]uint64
  50. var minTime, maxTime *time.Time
  51. var err error
  52. if to.config.UseRotationScanning && len(basePaths) > 1 {
  53. // Use rotation scanning for multiple log groups
  54. logger.Info("📊 Using rotation scanning strategy for optimal throughput")
  55. docsCountMap, minTime, maxTime, err = to.indexer.IndexLogGroupWithRotationScanning(basePaths, progressConfig)
  56. } else {
  57. // Fall back to traditional method for single group or if scanning disabled
  58. logger.Info("📁 Using traditional indexing strategy")
  59. docsCountMap = make(map[string]uint64)
  60. for _, basePath := range basePaths {
  61. groupDocs, groupMin, groupMax, groupErr := to.indexer.IndexLogGroup(basePath)
  62. if groupErr != nil {
  63. logger.Warnf("Failed to index log group %s: %v", basePath, groupErr)
  64. continue
  65. }
  66. // Merge results
  67. for path, count := range groupDocs {
  68. docsCountMap[path] = count
  69. }
  70. // Update time range
  71. if groupMin != nil && (minTime == nil || groupMin.Before(*minTime)) {
  72. minTime = groupMin
  73. }
  74. if groupMax != nil && (maxTime == nil || groupMax.After(*maxTime)) {
  75. maxTime = groupMax
  76. }
  77. }
  78. }
  79. if err != nil {
  80. return nil, fmt.Errorf("failed to index log groups: %w", err)
  81. }
  82. // Calculate results
  83. totalFiles := len(docsCountMap)
  84. totalDocuments := sumDocCounts(docsCountMap)
  85. duration := time.Since(start)
  86. result := &OptimizedIndexingResult{
  87. TotalFiles: totalFiles,
  88. TotalDocuments: totalDocuments,
  89. ProcessingDuration: duration,
  90. MinTimestamp: minTime,
  91. MaxTimestamp: maxTime,
  92. FileCounts: docsCountMap,
  93. ThroughputDocsPerSec: float64(totalDocuments) / duration.Seconds(),
  94. Strategy: getStrategyName(to.config.UseRotationScanning, len(basePaths)),
  95. }
  96. logger.Infof("🎉 Optimized indexing completed: %d files, %d documents in %v (%.0f docs/sec)",
  97. totalFiles, totalDocuments, duration, result.ThroughputDocsPerSec)
  98. return result, nil
  99. }
  // OptimizedIndexingResult summarizes a single OptimizedIndexMultipleLogGroups
// run.
type OptimizedIndexingResult struct {
	TotalFiles           int               `json:"total_files"`             // number of entries in FileCounts
	TotalDocuments       uint64            `json:"total_documents"`         // total computed by sumDocCounts over FileCounts
	ProcessingDuration   time.Duration     `json:"processing_duration"`     // wall time of the run; JSON-encoded as integer nanoseconds
	MinTimestamp         *time.Time        `json:"min_timestamp,omitempty"` // earliest timestamp reported by the indexer, if any
	MaxTimestamp         *time.Time        `json:"max_timestamp,omitempty"` // latest timestamp reported by the indexer, if any
	FileCounts           map[string]uint64 `json:"file_counts"`             // indexed document count keyed by file path
	ThroughputDocsPerSec float64           `json:"throughput_docs_per_sec"` // TotalDocuments divided by elapsed seconds
	Strategy             string            `json:"strategy"`                // label produced by getStrategyName
}
  111. // GetRotationScanStats returns statistics from the rotation scanner
  112. func (to *ThroughputOptimizer) GetRotationScanStats() map[string]interface{} {
  113. if to.indexer.rotationScanner == nil {
  114. return map[string]interface{}{
  115. "enabled": false,
  116. "message": "Rotation scanner not initialized",
  117. }
  118. }
  119. scanResults := to.indexer.rotationScanner.GetScanResults()
  120. queueSize := to.indexer.rotationScanner.GetQueueSize()
  121. isScanning := to.indexer.rotationScanner.IsScanning()
  122. totalFiles := 0
  123. var totalSize int64
  124. var totalEstimatedLines int64
  125. for _, result := range scanResults {
  126. totalFiles += result.TotalFiles
  127. totalSize += result.TotalSize
  128. totalEstimatedLines += result.EstimatedLines
  129. }
  130. return map[string]interface{}{
  131. "enabled": true,
  132. "log_groups_scanned": len(scanResults),
  133. "total_files_discovered": totalFiles,
  134. "total_size_bytes": totalSize,
  135. "total_estimated_lines": totalEstimatedLines,
  136. "queue_size": queueSize,
  137. "scanning_in_progress": isScanning,
  138. "scan_results": scanResults,
  139. }
  140. }
  141. // OptimizeIndexerConfig optimizes indexer configuration based on system resources
  142. func (to *ThroughputOptimizer) OptimizeIndexerConfig() *Config {
  143. currentConfig := to.indexer.config
  144. // Create optimized config based on current settings
  145. optimized := &Config{
  146. IndexPath: currentConfig.IndexPath,
  147. ShardCount: currentConfig.ShardCount,
  148. WorkerCount: currentConfig.WorkerCount,
  149. BatchSize: to.config.MaxBatchSize, // Use throughput-optimized batch size
  150. FlushInterval: currentConfig.FlushInterval,
  151. MaxQueueSize: to.config.MaxBatchSize * 15, // Larger queue for throughput
  152. EnableCompression: currentConfig.EnableCompression,
  153. MemoryQuota: currentConfig.MemoryQuota,
  154. MaxSegmentSize: currentConfig.MaxSegmentSize,
  155. OptimizeInterval: currentConfig.OptimizeInterval,
  156. EnableMetrics: currentConfig.EnableMetrics,
  157. }
  158. logger.Infof("📈 Generated optimized config: BatchSize=%d, MaxQueueSize=%d",
  159. optimized.BatchSize, optimized.MaxQueueSize)
  160. return optimized
  161. }
  // getStrategyName maps the strategy-selection inputs to the label stored in
// OptimizedIndexingResult.Strategy. It uses the same condition as
// OptimizedIndexMultipleLogGroups: rotation scanning applies only when enabled
// and more than one group is given.
func getStrategyName(useRotationScanning bool, numGroups int) string {
	switch {
	case numGroups <= 1:
		return "traditional_single"
	case useRotationScanning:
		return "rotation_scanning_parallel"
	default:
		return "traditional_parallel"
	}
}