1
0

adaptive_optimization.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/shirou/gopsutil/v4/cpu"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
// IndexerActivityPoller defines an interface to check if the indexer is busy.
// The optimizer consults it through isIndexerBusy before applying adjustments.
type IndexerActivityPoller interface {
	// IsBusy reports whether the indexer is currently processing work.
	IsBusy() bool
}
// AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoring.
//
// Start launches three background loops (CPU monitoring, batch-size
// optimization and performance tracking); Stop cancels them and waits for
// them to exit.
type AdaptiveOptimizer struct {
	config              *Config
	cpuMonitor          *CPUMonitor
	batchSizeController *BatchSizeController
	performanceHistory  *PerformanceHistory

	// State
	// running is 1 between a successful Start and the next Stop; it is only
	// accessed through sync/atomic.
	running int32
	// ctx and cancel control the lifetime of the background loops.
	ctx    context.Context
	cancel context.CancelFunc
	// wg counts the background loops so Stop can wait for them.
	wg sync.WaitGroup

	// Metrics
	// optimizationsMade is incremented atomically by batch-size and
	// worker-count adjustments.
	optimizationsMade int64
	// avgThroughput and avgLatency are moving averages guarded by metricsMutex.
	avgThroughput float64
	avgLatency    time.Duration
	metricsMutex  sync.RWMutex

	// Callbacks
	// onWorkerCountChange, when non-nil, is called by adjustWorkerCount with the
	// previous and the new worker count.
	onWorkerCountChange func(oldCount, newCount int)

	// Activity Poller
	// activityPoller is queried by isIndexerBusy; a nil poller is treated as idle.
	activityPoller IndexerActivityPoller

	// Concurrency-safe mirror of worker count
	// workerCount is read and written through sync/atomic only.
	workerCount int64
}
// CPUMonitor monitors CPU utilization and suggests worker adjustments.
//
// Utilization values are fractions (the sampler divides the reported
// percentage by 100), so 0.75 means 75%.
type CPUMonitor struct {
	// targetUtilization is the utilization the optimizer steers towards.
	targetUtilization float64
	// measurementInterval is the tick period of the CPU monitoring loop.
	measurementInterval time.Duration
	// adjustmentThreshold is the allowed deviation of the average from the
	// target before a worker change is suggested.
	adjustmentThreshold float64
	// maxWorkers and minWorkers bound the worker count chosen by the optimizer.
	maxWorkers int
	minWorkers int
	// currentUtilization is the most recent reading (after near-zero filtering).
	currentUtilization float64
	// measurements is a sliding window of recent readings, capped at maxSamples.
	measurements []float64
	// measurementsMutex guards currentUtilization, measurements and
	// lastValidUtilization.
	measurementsMutex sync.RWMutex
	maxSamples        int
	// lastValidUtilization is the last reading that was >= 0.005; it replaces
	// spurious near-zero readings.
	lastValidUtilization float64
}
// BatchSizeController dynamically adjusts batch sizes based on performance metrics.
type BatchSizeController struct {
	// baseBatchSize is the configured batch size the controller started from.
	baseBatchSize int
	// minBatchSize and maxBatchSize clamp every computed batch size.
	minBatchSize int
	maxBatchSize int
	// adjustmentFactor is the fraction of the current batch size used as the
	// step for one adjustment.
	adjustmentFactor float64
	// currentBatchSize is the live value; it is accessed through sync/atomic.
	currentBatchSize int32
	// latencyThreshold is the latency above which the batch size is reduced;
	// growth is only considered below half of it.
	latencyThreshold time.Duration
	// throughputTarget is the throughput below which the batch size may grow.
	// NOTE(review): the constructor comment describes the unit as MB/s — confirm
	// against whatever feeds the throughput samples.
	throughputTarget float64
	// adjustmentHistory records recent adjustments; guarded by historyMutex.
	adjustmentHistory []BatchAdjustment
	historyMutex      sync.RWMutex
}
// PerformanceHistory tracks performance metrics for optimization decisions.
type PerformanceHistory struct {
	// samples is a sliding window of recorded samples, oldest first, capped at
	// maxSamples; guarded by mutex.
	samples    []PerformanceSample
	maxSamples int
	mutex      sync.RWMutex
	// movingAvgWindow is the number of most recent samples averaged by
	// updateMovingAverages.
	movingAvgWindow int
}
// PerformanceSample represents a single performance measurement taken by
// recordPerformanceSample.
type PerformanceSample struct {
	// Timestamp is the time the sample was recorded.
	Timestamp time.Time `json:"timestamp"`
	// Throughput and Latency are the optimizer's averaged metrics at sampling time.
	Throughput float64       `json:"throughput"`
	Latency    time.Duration `json:"latency"`
	// CPUUsage is the monitor's current utilization as a fraction.
	CPUUsage float64 `json:"cpu_usage"`
	// BatchSize and WorkerCount are the values in effect at sampling time.
	BatchSize   int `json:"batch_size"`
	WorkerCount int `json:"worker_count"`
}
// BatchAdjustment represents a batch size adjustment decision.
type BatchAdjustment struct {
	// Timestamp is the time the adjustment was applied.
	Timestamp    time.Time `json:"timestamp"`
	OldBatchSize int       `json:"old_batch_size"`
	NewBatchSize int       `json:"new_batch_size"`
	// Reason is a human-readable explanation of the direction of the change.
	Reason string `json:"reason"`
	// ThroughputImpact holds the average throughput observed when the
	// adjustment was decided (adjustBatchSize stores its throughput argument here).
	ThroughputImpact float64 `json:"throughput_impact"`
}
  88. // NewAdaptiveOptimizer creates a new adaptive optimizer
  89. func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
  90. ctx, cancel := context.WithCancel(context.Background())
  91. ao := &AdaptiveOptimizer{
  92. config: config,
  93. cpuMonitor: &CPUMonitor{
  94. targetUtilization: 0.75, // Target 75% CPU utilization (more conservative)
  95. measurementInterval: 5 * time.Second,
  96. adjustmentThreshold: 0.10, // Adjust if 10% deviation from target (more sensitive)
  97. maxWorkers: runtime.GOMAXPROCS(0) * 6, // Allow scaling up to 6x CPU cores for I/O-bound workloads
  98. minWorkers: max(2, runtime.GOMAXPROCS(0)/4), // Minimum 2 workers or 1/4 of cores for baseline performance
  99. measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals
  100. maxSamples: 12,
  101. },
  102. batchSizeController: &BatchSizeController{
  103. baseBatchSize: config.BatchSize,
  104. minBatchSize: max(500, config.BatchSize/6), // Higher minimum for throughput
  105. maxBatchSize: config.BatchSize * 6, // Increased to 6x for maximum throughput
  106. adjustmentFactor: 0.25, // 25% adjustment steps for faster scaling
  107. currentBatchSize: int32(config.BatchSize),
  108. latencyThreshold: 8 * time.Second, // Higher latency tolerance for throughput
  109. throughputTarget: 50.0, // Target 50 MB/s - higher throughput target
  110. },
  111. performanceHistory: &PerformanceHistory{
  112. samples: make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples
  113. maxSamples: 120,
  114. movingAvgWindow: 12, // 12-sample moving average
  115. },
  116. ctx: ctx,
  117. cancel: cancel,
  118. }
  119. // Log initialization parameters for debugging
  120. logger.Infof("Adaptive optimizer initialized: workers=[%d, %d, %d] (min, current, max), target_cpu=%.1f%%, threshold=%.1f%%",
  121. ao.cpuMonitor.minWorkers, config.WorkerCount, ao.cpuMonitor.maxWorkers,
  122. ao.cpuMonitor.targetUtilization*100, ao.cpuMonitor.adjustmentThreshold*100)
  123. // Initialize atomic mirror of worker count
  124. atomic.StoreInt64(&ao.workerCount, int64(config.WorkerCount))
  125. return ao
  126. }
  127. // Start begins the adaptive optimization process
  128. func (ao *AdaptiveOptimizer) Start() error {
  129. if !atomic.CompareAndSwapInt32(&ao.running, 0, 1) {
  130. logger.Error("Adaptive optimizer already running")
  131. return fmt.Errorf("adaptive optimizer already running")
  132. }
  133. // Start CPU monitoring
  134. ao.wg.Add(1)
  135. go ao.cpuMonitoringLoop()
  136. // Start batch size optimization
  137. ao.wg.Add(1)
  138. go ao.batchOptimizationLoop()
  139. // Start performance tracking
  140. ao.wg.Add(1)
  141. go ao.performanceTrackingLoop()
  142. logger.Info("Adaptive optimizer started")
  143. return nil
  144. }
  145. // Stop halts the adaptive optimization process
  146. func (ao *AdaptiveOptimizer) Stop() {
  147. if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {
  148. return
  149. }
  150. ao.cancel()
  151. ao.wg.Wait()
  152. logger.Info("Adaptive optimizer stopped")
  153. }
  154. // cpuMonitoringLoop continuously monitors CPU utilization
  155. func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
  156. defer ao.wg.Done()
  157. ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)
  158. defer ticker.Stop()
  159. for {
  160. select {
  161. case <-ticker.C:
  162. ao.measureAndAdjustCPU()
  163. case <-ao.ctx.Done():
  164. return
  165. }
  166. }
  167. }
  168. // measureAndAdjustCPU measures current CPU utilization and suggests adjustments
  169. func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
  170. // Only adjust when indexer is actively processing
  171. if !ao.isIndexerBusy() {
  172. return
  173. }
  174. // Get current CPU utilization
  175. cpuUsage := ao.getCurrentCPUUtilization()
  176. ao.cpuMonitor.measurementsMutex.Lock()
  177. // Filter spurious near-zero values using last valid utilization
  178. if cpuUsage < 0.005 && ao.cpuMonitor.lastValidUtilization > 0 {
  179. cpuUsage = ao.cpuMonitor.lastValidUtilization
  180. }
  181. // Maintain fixed-size window to avoid capacity growth
  182. if ao.cpuMonitor.maxSamples > 0 && len(ao.cpuMonitor.measurements) == ao.cpuMonitor.maxSamples {
  183. copy(ao.cpuMonitor.measurements, ao.cpuMonitor.measurements[1:])
  184. ao.cpuMonitor.measurements[len(ao.cpuMonitor.measurements)-1] = cpuUsage
  185. } else {
  186. ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)
  187. }
  188. ao.cpuMonitor.currentUtilization = cpuUsage
  189. if cpuUsage >= 0.005 {
  190. ao.cpuMonitor.lastValidUtilization = cpuUsage
  191. }
  192. ao.cpuMonitor.measurementsMutex.Unlock()
  193. // Calculate average CPU utilization
  194. ao.cpuMonitor.measurementsMutex.RLock()
  195. avgCPU := ao.calculateAverageCPU()
  196. ao.cpuMonitor.measurementsMutex.RUnlock()
  197. // Determine if adjustment is needed
  198. targetCPU := ao.cpuMonitor.targetUtilization
  199. if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {
  200. // CPU underutilized - suggest increasing workers
  201. ao.suggestWorkerIncrease(avgCPU, targetCPU)
  202. } else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {
  203. // CPU over-utilized - suggest decreasing workers
  204. ao.suggestWorkerDecrease(avgCPU, targetCPU)
  205. }
  206. }
  207. // batchOptimizationLoop continuously optimizes batch sizes
  208. func (ao *AdaptiveOptimizer) batchOptimizationLoop() {
  209. defer ao.wg.Done()
  210. ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds
  211. defer ticker.Stop()
  212. for {
  213. select {
  214. case <-ticker.C:
  215. ao.optimizeBatchSize()
  216. case <-ao.ctx.Done():
  217. return
  218. }
  219. }
  220. }
  221. // optimizeBatchSize analyzes performance and adjusts batch size
  222. func (ao *AdaptiveOptimizer) optimizeBatchSize() {
  223. // Only adjust when indexer is actively processing
  224. if !ao.isIndexerBusy() {
  225. return
  226. }
  227. ao.performanceHistory.mutex.RLock()
  228. if len(ao.performanceHistory.samples) < 5 {
  229. ao.performanceHistory.mutex.RUnlock()
  230. return // Not enough data
  231. }
  232. recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]
  233. avgThroughput := ao.calculateAverageThroughput(recentSamples)
  234. avgLatency := ao.calculateAverageLatency(recentSamples)
  235. ao.performanceHistory.mutex.RUnlock()
  236. currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  237. newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)
  238. if newBatchSize != currentBatchSize {
  239. ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)
  240. atomic.AddInt64(&ao.optimizationsMade, 1)
  241. }
  242. }
  243. // calculateOptimalBatchSize determines the optimal batch size based on current performance
  244. func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {
  245. controller := ao.batchSizeController
  246. // If latency is too high, reduce batch size
  247. if latency > controller.latencyThreshold {
  248. reduction := int(float64(currentBatch) * controller.adjustmentFactor)
  249. newSize := currentBatch - max(50, reduction)
  250. return max(controller.minBatchSize, newSize)
  251. }
  252. // If throughput is below target and latency is acceptable, increase batch size
  253. if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {
  254. increase := int(float64(currentBatch) * controller.adjustmentFactor)
  255. newSize := currentBatch + max(100, increase)
  256. return min(controller.maxBatchSize, newSize)
  257. }
  258. // Current batch size seems optimal
  259. return currentBatch
  260. }
  261. // adjustBatchSize applies the batch size adjustment
  262. func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {
  263. // Only adjust when indexer is actively processing
  264. if !ao.isIndexerBusy() {
  265. return
  266. }
  267. atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))
  268. var reason string
  269. if newSize > oldSize {
  270. reason = "Increasing batch size to improve throughput"
  271. } else {
  272. reason = "Reducing batch size to improve latency"
  273. }
  274. // Record adjustment
  275. adjustment := BatchAdjustment{
  276. Timestamp: time.Now(),
  277. OldBatchSize: oldSize,
  278. NewBatchSize: newSize,
  279. Reason: reason,
  280. ThroughputImpact: throughput,
  281. }
  282. ao.batchSizeController.historyMutex.Lock()
  283. if len(ao.batchSizeController.adjustmentHistory) == 50 {
  284. copy(ao.batchSizeController.adjustmentHistory, ao.batchSizeController.adjustmentHistory[1:])
  285. ao.batchSizeController.adjustmentHistory[len(ao.batchSizeController.adjustmentHistory)-1] = adjustment
  286. } else {
  287. ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)
  288. }
  289. ao.batchSizeController.historyMutex.Unlock()
  290. logger.Debugf("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)
  291. }
  292. // performanceTrackingLoop continuously tracks performance metrics
  293. func (ao *AdaptiveOptimizer) performanceTrackingLoop() {
  294. defer ao.wg.Done()
  295. ticker := time.NewTicker(1 * time.Second) // Sample every second
  296. defer ticker.Stop()
  297. for {
  298. select {
  299. case <-ticker.C:
  300. ao.recordPerformanceSample()
  301. case <-ao.ctx.Done():
  302. return
  303. }
  304. }
  305. }
  306. // recordPerformanceSample records current performance metrics
  307. func (ao *AdaptiveOptimizer) recordPerformanceSample() {
  308. sample := PerformanceSample{
  309. Timestamp: time.Now(),
  310. Throughput: ao.getCurrentThroughput(),
  311. Latency: ao.getCurrentLatency(),
  312. CPUUsage: ao.GetCPUUtilization(),
  313. BatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  314. WorkerCount: int(atomic.LoadInt64(&ao.workerCount)),
  315. }
  316. ao.performanceHistory.mutex.Lock()
  317. if ao.performanceHistory.maxSamples > 0 && len(ao.performanceHistory.samples) == ao.performanceHistory.maxSamples {
  318. copy(ao.performanceHistory.samples, ao.performanceHistory.samples[1:])
  319. ao.performanceHistory.samples[len(ao.performanceHistory.samples)-1] = sample
  320. } else {
  321. ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)
  322. }
  323. // Update moving averages
  324. ao.updateMovingAverages()
  325. ao.performanceHistory.mutex.Unlock()
  326. }
// SetWorkerCountChangeCallback sets the callback function for worker count changes.
//
// adjustWorkerCount invokes the callback with the previous and the new worker
// count whenever it changes the count; when no callback is set it logs a
// warning instead.
//
// NOTE(review): the field is assigned without synchronization and is read from
// the CPU monitoring goroutine — presumably this is meant to be called before
// Start; confirm with callers.
func (ao *AdaptiveOptimizer) SetWorkerCountChangeCallback(callback func(oldCount, newCount int)) {
	ao.onWorkerCountChange = callback
}
// SetActivityPoller sets the poller to check for indexer activity.
//
// Until a non-nil poller is set, isIndexerBusy reports false and the optimizer
// makes no adjustments.
//
// NOTE(review): the field is assigned without synchronization and is read from
// the background loops — presumably this is meant to be called before Start;
// confirm with callers.
func (ao *AdaptiveOptimizer) SetActivityPoller(poller IndexerActivityPoller) {
	ao.activityPoller = poller
}
  335. // GetOptimalBatchSize returns the current optimal batch size
  336. func (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {
  337. return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  338. }
  339. // GetCPUUtilization returns the current CPU utilization
  340. func (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {
  341. ao.cpuMonitor.measurementsMutex.RLock()
  342. defer ao.cpuMonitor.measurementsMutex.RUnlock()
  343. return ao.cpuMonitor.currentUtilization
  344. }
  345. // GetOptimizationStats returns current optimization statistics
  346. func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
  347. ao.metricsMutex.RLock()
  348. defer ao.metricsMutex.RUnlock()
  349. return AdaptiveOptimizationStats{
  350. OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),
  351. CurrentBatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  352. AvgThroughput: ao.avgThroughput,
  353. AvgLatency: ao.avgLatency,
  354. CPUUtilization: ao.GetCPUUtilization(),
  355. }
  356. }
// AdaptiveOptimizationStats represents current optimization statistics as
// returned by GetOptimizationStats.
type AdaptiveOptimizationStats struct {
	// OptimizationsMade counts applied batch-size and worker-count adjustments.
	OptimizationsMade int64 `json:"optimizations_made"`
	CurrentBatchSize  int   `json:"current_batch_size"`
	// AvgThroughput and AvgLatency are the optimizer's moving averages.
	AvgThroughput float64       `json:"avg_throughput"`
	AvgLatency    time.Duration `json:"avg_latency"`
	// CPUUtilization is the latest CPU reading as a fraction.
	CPUUtilization float64 `json:"cpu_utilization"`
}
  365. // Helper functions
  366. func (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {
  367. // Get CPU utilization since the last call.
  368. // Interval 0 means non-blocking and compares to the last measurement.
  369. // The first call will return 0.
  370. percentages, err := cpu.Percent(0, false)
  371. if err != nil || len(percentages) == 0 {
  372. logger.Warnf("Failed to get real CPU utilization, falling back to goroutine heuristic: %v", err)
  373. // Fallback to the old, less accurate method
  374. numGoroutines := float64(runtime.NumGoroutine())
  375. maxProcs := float64(runtime.GOMAXPROCS(0))
  376. // Simple heuristic: more goroutines = higher CPU usage
  377. utilization := numGoroutines / (maxProcs * 10)
  378. if utilization > 0.95 {
  379. utilization = 0.95
  380. }
  381. return utilization
  382. }
  383. // gopsutil returns a slice, for overall usage (percpu=false), it's the first element.
  384. // The value is a percentage (e.g., 8.3), so we convert it to a 0.0-1.0 scale for our calculations.
  385. return percentages[0] / 100.0
  386. }
  387. func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {
  388. ao.metricsMutex.RLock()
  389. v := ao.avgThroughput
  390. ao.metricsMutex.RUnlock()
  391. return v
  392. }
  393. func (ao *AdaptiveOptimizer) getCurrentLatency() time.Duration {
  394. ao.metricsMutex.RLock()
  395. v := ao.avgLatency
  396. ao.metricsMutex.RUnlock()
  397. return v
  398. }
  399. func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {
  400. if len(ao.cpuMonitor.measurements) == 0 {
  401. return 0
  402. }
  403. sum := 0.0
  404. for _, usage := range ao.cpuMonitor.measurements {
  405. sum += usage
  406. }
  407. return sum / float64(len(ao.cpuMonitor.measurements))
  408. }
  409. func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSample) float64 {
  410. if len(samples) == 0 {
  411. return 0
  412. }
  413. sum := 0.0
  414. for _, sample := range samples {
  415. sum += sample.Throughput
  416. }
  417. return sum / float64(len(samples))
  418. }
  419. func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample) time.Duration {
  420. if len(samples) == 0 {
  421. return 0
  422. }
  423. var sum time.Duration
  424. for _, sample := range samples {
  425. sum += sample.Latency
  426. }
  427. return sum / time.Duration(len(samples))
  428. }
  429. func (ao *AdaptiveOptimizer) updateMovingAverages() {
  430. if len(ao.performanceHistory.samples) == 0 {
  431. return
  432. }
  433. windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))
  434. recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]
  435. avgThroughput := ao.calculateAverageThroughput(recentSamples)
  436. avgLatency := ao.calculateAverageLatency(recentSamples)
  437. ao.metricsMutex.Lock()
  438. ao.avgThroughput = avgThroughput
  439. ao.avgLatency = avgLatency
  440. ao.metricsMutex.Unlock()
  441. }
  442. func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {
  443. // If already at max workers, do nothing.
  444. currentWorkers := int(atomic.LoadInt64(&ao.workerCount))
  445. if currentWorkers >= ao.cpuMonitor.maxWorkers {
  446. return
  447. }
  448. // If the indexer is not busy, don't scale up workers even if CPU is low.
  449. if ao.activityPoller != nil && !ao.activityPoller.IsBusy() {
  450. return
  451. }
  452. logger.Debug("CPU underutilized, adjusting workers upward",
  453. "current_cpu", currentCPU, "target_cpu", targetCPU)
  454. // Calculate suggested increase (conservative approach)
  455. cpuUtilizationGap := targetCPU - currentCPU
  456. increaseRatio := cpuUtilizationGap / targetCPU
  457. // Limit increase to maximum 25% at a time and at least 1 worker
  458. maxIncrease := max(1, int(float64(currentWorkers)*0.25))
  459. suggestedIncrease := max(1, int(float64(currentWorkers)*increaseRatio))
  460. actualIncrease := min(maxIncrease, suggestedIncrease)
  461. newWorkerCount := min(ao.cpuMonitor.maxWorkers, currentWorkers+actualIncrease)
  462. if newWorkerCount > currentWorkers {
  463. ao.adjustWorkerCount(newWorkerCount)
  464. logger.Infof("Increased workers from %d to %d due to CPU underutilization",
  465. currentWorkers, newWorkerCount)
  466. }
  467. }
  468. func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {
  469. // If the indexer is not busy, don't adjust workers
  470. if !ao.isIndexerBusy() {
  471. return
  472. }
  473. // If already at min workers, do nothing.
  474. currentWorkers := int(atomic.LoadInt64(&ao.workerCount))
  475. if currentWorkers <= ao.cpuMonitor.minWorkers {
  476. logger.Debugf("Worker count is already at its minimum (%d), skipping decrease.", currentWorkers)
  477. return
  478. }
  479. logger.Debug("CPU over-utilized, adjusting workers downward",
  480. "current_cpu", currentCPU, "target_cpu", targetCPU)
  481. // Calculate suggested decrease (conservative approach)
  482. cpuOverUtilization := currentCPU - targetCPU
  483. decreaseRatio := cpuOverUtilization / targetCPU // Use target CPU as base for more accurate calculation
  484. // Limit decrease to maximum 25% at a time and at least 1 worker
  485. maxDecrease := max(1, int(float64(currentWorkers)*0.25))
  486. suggestedDecrease := max(1, int(float64(currentWorkers)*decreaseRatio*0.5)) // More conservative decrease
  487. actualDecrease := min(maxDecrease, suggestedDecrease)
  488. newWorkerCount := max(ao.cpuMonitor.minWorkers, currentWorkers-actualDecrease)
  489. logger.Debugf("Worker decrease calculation: current=%d, suggested=%d, min=%d, new=%d",
  490. currentWorkers, suggestedDecrease, ao.cpuMonitor.minWorkers, newWorkerCount)
  491. if newWorkerCount < currentWorkers {
  492. logger.Debugf("About to adjust worker count from %d to %d", currentWorkers, newWorkerCount)
  493. ao.adjustWorkerCount(newWorkerCount)
  494. logger.Infof("Decreased workers from %d to %d due to CPU over-utilization",
  495. currentWorkers, newWorkerCount)
  496. } else {
  497. logger.Debugf("Worker count adjustment skipped: new=%d not less than current=%d", newWorkerCount, currentWorkers)
  498. }
  499. }
  500. // adjustWorkerCount dynamically adjusts the worker count at runtime
  501. func (ao *AdaptiveOptimizer) adjustWorkerCount(newCount int) {
  502. // Only adjust when indexer is actively processing
  503. if !ao.isIndexerBusy() {
  504. logger.Debugf("Skipping worker adjustment while idle: requested=%d", newCount)
  505. return
  506. }
  507. oldCount := int(atomic.LoadInt64(&ao.workerCount))
  508. if newCount <= 0 || newCount == oldCount {
  509. logger.Debugf("Skipping worker adjustment: newCount=%d, currentCount=%d", newCount, oldCount)
  510. return
  511. }
  512. logger.Infof("Adjusting worker count from %d to %d", oldCount, newCount)
  513. // Update atomic mirror then keep config in sync
  514. atomic.StoreInt64(&ao.workerCount, int64(newCount))
  515. ao.config.WorkerCount = newCount
  516. // Notify the indexer about worker count change
  517. // This would typically trigger a worker pool resize in the parallel indexer
  518. if ao.onWorkerCountChange != nil {
  519. logger.Debugf("Calling worker count change callback: %d -> %d", oldCount, newCount)
  520. ao.onWorkerCountChange(oldCount, newCount)
  521. } else {
  522. logger.Warnf("Worker count change callback is nil - worker adjustment will not take effect")
  523. }
  524. // Log the adjustment for monitoring
  525. atomic.AddInt64(&ao.optimizationsMade, 1)
  526. }
  527. // Utility functions
  528. func max(a, b int) int {
  529. if a > b {
  530. return a
  531. }
  532. return b
  533. }
  534. func min(a, b int) int {
  535. if a < b {
  536. return a
  537. }
  538. return b
  539. }
  540. // isIndexerBusy reports whether the indexer is currently processing work.
  541. // When no poller is configured, it returns false to avoid unintended adjustments.
  542. func (ao *AdaptiveOptimizer) isIndexerBusy() bool {
  543. if ao.activityPoller == nil {
  544. return false
  545. }
  546. return ao.activityPoller.IsBusy()
  547. }