adaptive_optimization.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
  11. // AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoring
  12. type AdaptiveOptimizer struct {
  13. config *Config
  14. cpuMonitor *CPUMonitor
  15. batchSizeController *BatchSizeController
  16. performanceHistory *PerformanceHistory
  17. // State
  18. running int32
  19. ctx context.Context
  20. cancel context.CancelFunc
  21. wg sync.WaitGroup
  22. // Metrics
  23. optimizationsMade int64
  24. avgThroughput float64
  25. avgLatency time.Duration
  26. metricsMutex sync.RWMutex
  27. }
  28. // CPUMonitor monitors CPU utilization and suggests worker adjustments
  29. type CPUMonitor struct {
  30. targetUtilization float64
  31. measurementInterval time.Duration
  32. adjustmentThreshold float64
  33. maxWorkers int
  34. minWorkers int
  35. currentUtilization float64
  36. measurements []float64
  37. measurementsMutex sync.RWMutex
  38. }
  39. // BatchSizeController dynamically adjusts batch sizes based on performance metrics
  40. type BatchSizeController struct {
  41. baseBatchSize int
  42. minBatchSize int
  43. maxBatchSize int
  44. adjustmentFactor float64
  45. currentBatchSize int32
  46. latencyThreshold time.Duration
  47. throughputTarget float64
  48. adjustmentHistory []BatchAdjustment
  49. historyMutex sync.RWMutex
  50. }
  51. // PerformanceHistory tracks performance metrics for optimization decisions
  52. type PerformanceHistory struct {
  53. samples []PerformanceSample
  54. maxSamples int
  55. mutex sync.RWMutex
  56. movingAvgWindow int
  57. avgThroughput float64
  58. avgLatency time.Duration
  59. }
  60. // PerformanceSample represents a single performance measurement
  61. type PerformanceSample struct {
  62. Timestamp time.Time `json:"timestamp"`
  63. Throughput float64 `json:"throughput"`
  64. Latency time.Duration `json:"latency"`
  65. CPUUsage float64 `json:"cpu_usage"`
  66. BatchSize int `json:"batch_size"`
  67. WorkerCount int `json:"worker_count"`
  68. }
  69. // BatchAdjustment represents a batch size adjustment decision
  70. type BatchAdjustment struct {
  71. Timestamp time.Time `json:"timestamp"`
  72. OldBatchSize int `json:"old_batch_size"`
  73. NewBatchSize int `json:"new_batch_size"`
  74. Reason string `json:"reason"`
  75. ThroughputImpact float64 `json:"throughput_impact"`
  76. }
  77. // NewAdaptiveOptimizer creates a new adaptive optimizer
  78. func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
  79. ctx, cancel := context.WithCancel(context.Background())
  80. return &AdaptiveOptimizer{
  81. config: config,
  82. cpuMonitor: &CPUMonitor{
  83. targetUtilization: 0.80, // Target 80% CPU utilization
  84. measurementInterval: 5 * time.Second,
  85. adjustmentThreshold: 0.15, // Adjust if 15% deviation from target
  86. maxWorkers: runtime.NumCPU() * 3,
  87. minWorkers: max(2, runtime.NumCPU()/2),
  88. measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals
  89. },
  90. batchSizeController: &BatchSizeController{
  91. baseBatchSize: config.BatchSize,
  92. minBatchSize: max(100, config.BatchSize/4),
  93. maxBatchSize: config.BatchSize * 3,
  94. adjustmentFactor: 0.2, // 20% adjustment steps
  95. currentBatchSize: int32(config.BatchSize),
  96. latencyThreshold: 5 * time.Second,
  97. throughputTarget: 25.0, // Target 25 MB/s
  98. },
  99. performanceHistory: &PerformanceHistory{
  100. samples: make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples
  101. maxSamples: 120,
  102. movingAvgWindow: 12, // 12-sample moving average
  103. },
  104. ctx: ctx,
  105. cancel: cancel,
  106. }
  107. }
  108. // Start begins the adaptive optimization process
  109. func (ao *AdaptiveOptimizer) Start() error {
  110. if !atomic.CompareAndSwapInt32(&ao.running, 0, 1) {
  111. logger.Error("Adaptive optimizer already running")
  112. return fmt.Errorf("adaptive optimizer already running")
  113. }
  114. // Start CPU monitoring
  115. ao.wg.Add(1)
  116. go ao.cpuMonitoringLoop()
  117. // Start batch size optimization
  118. ao.wg.Add(1)
  119. go ao.batchOptimizationLoop()
  120. // Start performance tracking
  121. ao.wg.Add(1)
  122. go ao.performanceTrackingLoop()
  123. logger.Info("Adaptive optimizer started")
  124. return nil
  125. }
  126. // Stop halts the adaptive optimization process
  127. func (ao *AdaptiveOptimizer) Stop() {
  128. if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {
  129. return
  130. }
  131. ao.cancel()
  132. ao.wg.Wait()
  133. logger.Info("Adaptive optimizer stopped")
  134. }
  135. // cpuMonitoringLoop continuously monitors CPU utilization
  136. func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
  137. defer ao.wg.Done()
  138. ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)
  139. defer ticker.Stop()
  140. for {
  141. select {
  142. case <-ticker.C:
  143. ao.measureAndAdjustCPU()
  144. case <-ao.ctx.Done():
  145. return
  146. }
  147. }
  148. }
  149. // measureAndAdjustCPU measures current CPU utilization and suggests adjustments
  150. func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
  151. // Get current CPU utilization
  152. cpuUsage := ao.getCurrentCPUUtilization()
  153. ao.cpuMonitor.measurementsMutex.Lock()
  154. ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)
  155. if len(ao.cpuMonitor.measurements) > cap(ao.cpuMonitor.measurements) {
  156. // Remove oldest measurement
  157. ao.cpuMonitor.measurements = ao.cpuMonitor.measurements[1:]
  158. }
  159. ao.cpuMonitor.currentUtilization = cpuUsage
  160. ao.cpuMonitor.measurementsMutex.Unlock()
  161. // Calculate average CPU utilization
  162. ao.cpuMonitor.measurementsMutex.RLock()
  163. avgCPU := ao.calculateAverageCPU()
  164. ao.cpuMonitor.measurementsMutex.RUnlock()
  165. // Determine if adjustment is needed
  166. targetCPU := ao.cpuMonitor.targetUtilization
  167. if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {
  168. // CPU underutilized - suggest increasing workers
  169. ao.suggestWorkerIncrease(avgCPU, targetCPU)
  170. } else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {
  171. // CPU over-utilized - suggest decreasing workers
  172. ao.suggestWorkerDecrease(avgCPU, targetCPU)
  173. }
  174. }
  175. // batchOptimizationLoop continuously optimizes batch sizes
  176. func (ao *AdaptiveOptimizer) batchOptimizationLoop() {
  177. defer ao.wg.Done()
  178. ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds
  179. defer ticker.Stop()
  180. for {
  181. select {
  182. case <-ticker.C:
  183. ao.optimizeBatchSize()
  184. case <-ao.ctx.Done():
  185. return
  186. }
  187. }
  188. }
  189. // optimizeBatchSize analyzes performance and adjusts batch size
  190. func (ao *AdaptiveOptimizer) optimizeBatchSize() {
  191. ao.performanceHistory.mutex.RLock()
  192. if len(ao.performanceHistory.samples) < 5 {
  193. ao.performanceHistory.mutex.RUnlock()
  194. return // Not enough data
  195. }
  196. recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]
  197. avgThroughput := ao.calculateAverageThroughput(recentSamples)
  198. avgLatency := ao.calculateAverageLatency(recentSamples)
  199. ao.performanceHistory.mutex.RUnlock()
  200. currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  201. newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)
  202. if newBatchSize != currentBatchSize {
  203. ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)
  204. atomic.AddInt64(&ao.optimizationsMade, 1)
  205. }
  206. }
  207. // calculateOptimalBatchSize determines the optimal batch size based on current performance
  208. func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {
  209. controller := ao.batchSizeController
  210. // If latency is too high, reduce batch size
  211. if latency > controller.latencyThreshold {
  212. reduction := int(float64(currentBatch) * controller.adjustmentFactor)
  213. newSize := currentBatch - max(50, reduction)
  214. return max(controller.minBatchSize, newSize)
  215. }
  216. // If throughput is below target and latency is acceptable, increase batch size
  217. if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {
  218. increase := int(float64(currentBatch) * controller.adjustmentFactor)
  219. newSize := currentBatch + max(100, increase)
  220. return min(controller.maxBatchSize, newSize)
  221. }
  222. // Current batch size seems optimal
  223. return currentBatch
  224. }
  225. // adjustBatchSize applies the batch size adjustment
  226. func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {
  227. atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))
  228. var reason string
  229. if newSize > oldSize {
  230. reason = "Increasing batch size to improve throughput"
  231. } else {
  232. reason = "Reducing batch size to improve latency"
  233. }
  234. // Record adjustment
  235. adjustment := BatchAdjustment{
  236. Timestamp: time.Now(),
  237. OldBatchSize: oldSize,
  238. NewBatchSize: newSize,
  239. Reason: reason,
  240. ThroughputImpact: throughput,
  241. }
  242. ao.batchSizeController.historyMutex.Lock()
  243. ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)
  244. if len(ao.batchSizeController.adjustmentHistory) > 50 {
  245. // Keep only recent 50 adjustments
  246. ao.batchSizeController.adjustmentHistory = ao.batchSizeController.adjustmentHistory[1:]
  247. }
  248. ao.batchSizeController.historyMutex.Unlock()
  249. logger.Infof("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)
  250. }
  251. // performanceTrackingLoop continuously tracks performance metrics
  252. func (ao *AdaptiveOptimizer) performanceTrackingLoop() {
  253. defer ao.wg.Done()
  254. ticker := time.NewTicker(1 * time.Second) // Sample every second
  255. defer ticker.Stop()
  256. for {
  257. select {
  258. case <-ticker.C:
  259. ao.recordPerformanceSample()
  260. case <-ao.ctx.Done():
  261. return
  262. }
  263. }
  264. }
  265. // recordPerformanceSample records current performance metrics
  266. func (ao *AdaptiveOptimizer) recordPerformanceSample() {
  267. sample := PerformanceSample{
  268. Timestamp: time.Now(),
  269. Throughput: ao.getCurrentThroughput(),
  270. Latency: ao.getCurrentLatency(),
  271. CPUUsage: ao.cpuMonitor.currentUtilization,
  272. BatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  273. WorkerCount: ao.config.WorkerCount,
  274. }
  275. ao.performanceHistory.mutex.Lock()
  276. ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)
  277. if len(ao.performanceHistory.samples) > ao.performanceHistory.maxSamples {
  278. // Remove oldest sample
  279. ao.performanceHistory.samples = ao.performanceHistory.samples[1:]
  280. }
  281. // Update moving averages
  282. ao.updateMovingAverages()
  283. ao.performanceHistory.mutex.Unlock()
  284. }
  285. // GetOptimalBatchSize returns the current optimal batch size
  286. func (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {
  287. return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  288. }
  289. // GetCPUUtilization returns the current CPU utilization
  290. func (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {
  291. ao.cpuMonitor.measurementsMutex.RLock()
  292. defer ao.cpuMonitor.measurementsMutex.RUnlock()
  293. return ao.cpuMonitor.currentUtilization
  294. }
  295. // GetOptimizationStats returns current optimization statistics
  296. func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
  297. ao.metricsMutex.RLock()
  298. defer ao.metricsMutex.RUnlock()
  299. return AdaptiveOptimizationStats{
  300. OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),
  301. CurrentBatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  302. AvgThroughput: ao.avgThroughput,
  303. AvgLatency: ao.avgLatency,
  304. CPUUtilization: ao.cpuMonitor.currentUtilization,
  305. }
  306. }
  307. // AdaptiveOptimizationStats represents current optimization statistics
  308. type AdaptiveOptimizationStats struct {
  309. OptimizationsMade int64 `json:"optimizations_made"`
  310. CurrentBatchSize int `json:"current_batch_size"`
  311. AvgThroughput float64 `json:"avg_throughput"`
  312. AvgLatency time.Duration `json:"avg_latency"`
  313. CPUUtilization float64 `json:"cpu_utilization"`
  314. }
  315. // Helper functions
  316. func (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {
  317. // This is a simplified implementation
  318. // In production, you'd use a proper CPU monitoring library
  319. runtime.GC()
  320. var m runtime.MemStats
  321. runtime.ReadMemStats(&m)
  322. // Approximate CPU usage based on GC activity and goroutines
  323. numGoroutines := float64(runtime.NumGoroutine())
  324. numCPU := float64(runtime.NumCPU())
  325. // Simple heuristic: more goroutines = higher CPU usage
  326. utilization := numGoroutines / (numCPU * 10)
  327. if utilization > 0.95 {
  328. utilization = 0.95
  329. }
  330. return utilization
  331. }
  332. func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {
  333. // This would be implemented to get actual throughput from the indexer
  334. ao.metricsMutex.RLock()
  335. defer ao.metricsMutex.RUnlock()
  336. return ao.avgThroughput
  337. }
  338. func (ao *AdaptiveOptimizer) getCurrentLatency() time.Duration {
  339. // This would be implemented to get actual latency from the indexer
  340. ao.metricsMutex.RLock()
  341. defer ao.metricsMutex.RUnlock()
  342. return ao.avgLatency
  343. }
  344. func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {
  345. if len(ao.cpuMonitor.measurements) == 0 {
  346. return 0
  347. }
  348. sum := 0.0
  349. for _, cpu := range ao.cpuMonitor.measurements {
  350. sum += cpu
  351. }
  352. return sum / float64(len(ao.cpuMonitor.measurements))
  353. }
  354. func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSample) float64 {
  355. if len(samples) == 0 {
  356. return 0
  357. }
  358. sum := 0.0
  359. for _, sample := range samples {
  360. sum += sample.Throughput
  361. }
  362. return sum / float64(len(samples))
  363. }
  364. func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample) time.Duration {
  365. if len(samples) == 0 {
  366. return 0
  367. }
  368. var sum time.Duration
  369. for _, sample := range samples {
  370. sum += sample.Latency
  371. }
  372. return sum / time.Duration(len(samples))
  373. }
  374. func (ao *AdaptiveOptimizer) updateMovingAverages() {
  375. if len(ao.performanceHistory.samples) == 0 {
  376. return
  377. }
  378. windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))
  379. recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]
  380. ao.avgThroughput = ao.calculateAverageThroughput(recentSamples)
  381. ao.avgLatency = ao.calculateAverageLatency(recentSamples)
  382. }
  383. func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {
  384. logger.Debug("CPU underutilized, consider increasing workers",
  385. "current_cpu", currentCPU, "target_cpu", targetCPU)
  386. }
  387. func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {
  388. logger.Debug("CPU over-utilized, consider decreasing workers",
  389. "current_cpu", currentCPU, "target_cpu", targetCPU)
  390. }
  391. // Utility functions
  392. func max(a, b int) int {
  393. if a > b {
  394. return a
  395. }
  396. return b
  397. }
  398. func min(a, b int) int {
  399. if a < b {
  400. return a
  401. }
  402. return b
  403. }