adaptive_optimization.go

package indexer

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/shirou/gopsutil/v4/cpu"
	"github.com/uozi-tech/cosy/logger"
)

// IndexerActivityPoller defines an interface to check if the indexer is busy.
type IndexerActivityPoller interface {
	IsBusy() bool
}

// AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoring
type AdaptiveOptimizer struct {
	config              *Config
	cpuMonitor          *CPUMonitor
	batchSizeController *BatchSizeController
	performanceHistory  *PerformanceHistory

	// State
	running int32
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup

	// Metrics
	optimizationsMade int64
	avgThroughput     float64
	avgLatency        time.Duration
	metricsMutex      sync.RWMutex

	// Callbacks
	onWorkerCountChange func(oldCount, newCount int)

	// Activity Poller
	activityPoller IndexerActivityPoller

	// Concurrency-safe mirror of worker count
	workerCount int64
}

// CPUMonitor monitors CPU utilization and suggests worker adjustments
type CPUMonitor struct {
	targetUtilization    float64
	measurementInterval  time.Duration
	adjustmentThreshold  float64
	maxWorkers           int
	minWorkers           int
	currentUtilization   float64
	measurements         []float64
	measurementsMutex    sync.RWMutex
	maxSamples           int
	lastValidUtilization float64
}

// BatchSizeController dynamically adjusts batch sizes based on performance metrics
type BatchSizeController struct {
	baseBatchSize     int
	minBatchSize      int
	maxBatchSize      int
	adjustmentFactor  float64
	currentBatchSize  int32
	latencyThreshold  time.Duration
	throughputTarget  float64
	adjustmentHistory []BatchAdjustment
	historyMutex      sync.RWMutex
}

// PerformanceHistory tracks performance metrics for optimization decisions
type PerformanceHistory struct {
	samples         []PerformanceSample
	maxSamples      int
	mutex           sync.RWMutex
	movingAvgWindow int
}

// PerformanceSample represents a single performance measurement
type PerformanceSample struct {
	Timestamp   time.Time     `json:"timestamp"`
	Throughput  float64       `json:"throughput"`
	Latency     time.Duration `json:"latency"`
	CPUUsage    float64       `json:"cpu_usage"`
	BatchSize   int           `json:"batch_size"`
	WorkerCount int           `json:"worker_count"`
}

// BatchAdjustment represents a batch size adjustment decision
type BatchAdjustment struct {
	Timestamp        time.Time `json:"timestamp"`
	OldBatchSize     int       `json:"old_batch_size"`
	NewBatchSize     int       `json:"new_batch_size"`
	Reason           string    `json:"reason"`
	ThroughputImpact float64   `json:"throughput_impact"`
}

// NewAdaptiveOptimizer creates a new adaptive optimizer
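// For example, with GOMAXPROCS=8 and config.BatchSize=1000, the defaults below
// resolve to worker bounds of [2, 48] and batch-size bounds of [500, 6000].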
func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
	ctx, cancel := context.WithCancel(context.Background())
	ao := &AdaptiveOptimizer{
		config: config,
		cpuMonitor: &CPUMonitor{
			targetUtilization:   0.75, // Target 75% CPU utilization (more conservative)
			measurementInterval: 5 * time.Second,
			adjustmentThreshold: 0.10,                             // Adjust if 10% deviation from target (more sensitive)
			maxWorkers:          runtime.GOMAXPROCS(0) * 6,        // Allow scaling up to 6x CPU cores for I/O-bound workloads
			minWorkers:          max(2, runtime.GOMAXPROCS(0)/4),  // Minimum 2 workers or 1/4 of cores for baseline performance
			measurements:        make([]float64, 0, 12),           // 1 minute history at 5s intervals
			maxSamples:          12,
		},
		batchSizeController: &BatchSizeController{
			baseBatchSize:    config.BatchSize,
			minBatchSize:     max(500, config.BatchSize/6), // Higher minimum for throughput
			maxBatchSize:     config.BatchSize * 6,         // Increased to 6x for maximum throughput
			adjustmentFactor: 0.25,                         // 25% adjustment steps for faster scaling
			currentBatchSize: int32(config.BatchSize),
			latencyThreshold: 8 * time.Second, // Higher latency tolerance for throughput
			throughputTarget: 50.0,            // Target 50 MB/s - higher throughput target
		},
		performanceHistory: &PerformanceHistory{
			samples:         make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples
			maxSamples:      120,
			movingAvgWindow: 12, // 12-sample moving average
		},
		ctx:    ctx,
		cancel: cancel,
	}

	// Log initialization parameters for debugging
	logger.Infof("Adaptive optimizer initialized: workers=[%d, %d, %d] (min, current, max), target_cpu=%.1f%%, threshold=%.1f%%",
		ao.cpuMonitor.minWorkers, config.WorkerCount, ao.cpuMonitor.maxWorkers,
		ao.cpuMonitor.targetUtilization*100, ao.cpuMonitor.adjustmentThreshold*100)

	// Initialize atomic mirror of worker count
	atomic.StoreInt64(&ao.workerCount, int64(config.WorkerCount))

	return ao
}

// Start begins the adaptive optimization process
func (ao *AdaptiveOptimizer) Start() error {
	if !atomic.CompareAndSwapInt32(&ao.running, 0, 1) {
		logger.Error("Adaptive optimizer already running")
		return fmt.Errorf("adaptive optimizer already running")
	}

	// Start CPU monitoring
	ao.wg.Add(1)
	go ao.cpuMonitoringLoop()

	// Start batch size optimization
	ao.wg.Add(1)
	go ao.batchOptimizationLoop()

	// Start performance tracking
	ao.wg.Add(1)
	go ao.performanceTrackingLoop()

	logger.Info("Adaptive optimizer started")
	return nil
}

// Stop halts the adaptive optimization process
func (ao *AdaptiveOptimizer) Stop() {
	if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {
		return
	}
	ao.cancel()
	ao.wg.Wait()
	logger.Info("Adaptive optimizer stopped")
}

// cpuMonitoringLoop continuously monitors CPU utilization
func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
	defer ao.wg.Done()
	ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ao.measureAndAdjustCPU()
		case <-ao.ctx.Done():
			return
		}
	}
}

// measureAndAdjustCPU measures current CPU utilization and suggests adjustments
func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
	// Get current CPU utilization
	cpuUsage := ao.getCurrentCPUUtilization()

	ao.cpuMonitor.measurementsMutex.Lock()
	// Filter spurious near-zero values using last valid utilization
	if cpuUsage < 0.005 && ao.cpuMonitor.lastValidUtilization > 0 {
		cpuUsage = ao.cpuMonitor.lastValidUtilization
	}
	// Maintain fixed-size window to avoid capacity growth
	if ao.cpuMonitor.maxSamples > 0 && len(ao.cpuMonitor.measurements) == ao.cpuMonitor.maxSamples {
		copy(ao.cpuMonitor.measurements, ao.cpuMonitor.measurements[1:])
		ao.cpuMonitor.measurements[len(ao.cpuMonitor.measurements)-1] = cpuUsage
	} else {
		ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)
	}
	ao.cpuMonitor.currentUtilization = cpuUsage
	if cpuUsage >= 0.005 {
		ao.cpuMonitor.lastValidUtilization = cpuUsage
	}
	ao.cpuMonitor.measurementsMutex.Unlock()

	// Calculate average CPU utilization
	ao.cpuMonitor.measurementsMutex.RLock()
	avgCPU := ao.calculateAverageCPU()
	ao.cpuMonitor.measurementsMutex.RUnlock()

	// Determine if adjustment is needed
	targetCPU := ao.cpuMonitor.targetUtilization
	if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {
		// CPU underutilized - suggest increasing workers
		ao.suggestWorkerIncrease(avgCPU, targetCPU)
	} else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {
		// CPU over-utilized - suggest decreasing workers
		ao.suggestWorkerDecrease(avgCPU, targetCPU)
	}
}

// batchOptimizationLoop continuously optimizes batch sizes
func (ao *AdaptiveOptimizer) batchOptimizationLoop() {
	defer ao.wg.Done()
	ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ao.optimizeBatchSize()
		case <-ao.ctx.Done():
			return
		}
	}
}

// optimizeBatchSize analyzes performance and adjusts batch size
func (ao *AdaptiveOptimizer) optimizeBatchSize() {
	ao.performanceHistory.mutex.RLock()
	if len(ao.performanceHistory.samples) < 5 {
		ao.performanceHistory.mutex.RUnlock()
		return // Not enough data
	}
	recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]
	avgThroughput := ao.calculateAverageThroughput(recentSamples)
	avgLatency := ao.calculateAverageLatency(recentSamples)
	ao.performanceHistory.mutex.RUnlock()

	currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
	newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)

	if newBatchSize != currentBatchSize {
		ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)
		atomic.AddInt64(&ao.optimizationsMade, 1)
	}
}

// calculateOptimalBatchSize determines the optimal batch size based on current performance
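// metrics. For example, with a current batch of 2000 and adjustmentFactor=0.25:
// latency above the 8s threshold shrinks the batch by max(50, 500) to 1500,
// while throughput below the 50 MB/s target with latency under 4s grows it by
// max(100, 500) to 2500, both clamped to [minBatchSize, maxBatchSize].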
func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {
	controller := ao.batchSizeController

	// If latency is too high, reduce batch size
	if latency > controller.latencyThreshold {
		reduction := int(float64(currentBatch) * controller.adjustmentFactor)
		newSize := currentBatch - max(50, reduction)
		return max(controller.minBatchSize, newSize)
	}

	// If throughput is below target and latency is acceptable, increase batch size
	if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {
		increase := int(float64(currentBatch) * controller.adjustmentFactor)
		newSize := currentBatch + max(100, increase)
		return min(controller.maxBatchSize, newSize)
	}

	// Current batch size seems optimal
	return currentBatch
}

// adjustBatchSize applies the batch size adjustment
func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {
	atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))

	var reason string
	if newSize > oldSize {
		reason = "Increasing batch size to improve throughput"
	} else {
		reason = "Reducing batch size to improve latency"
	}

	// Record adjustment
	adjustment := BatchAdjustment{
		Timestamp:        time.Now(),
		OldBatchSize:     oldSize,
		NewBatchSize:     newSize,
		Reason:           reason,
		ThroughputImpact: throughput,
	}

	ao.batchSizeController.historyMutex.Lock()
	if len(ao.batchSizeController.adjustmentHistory) == 50 {
		copy(ao.batchSizeController.adjustmentHistory, ao.batchSizeController.adjustmentHistory[1:])
		ao.batchSizeController.adjustmentHistory[len(ao.batchSizeController.adjustmentHistory)-1] = adjustment
	} else {
		ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)
	}
	ao.batchSizeController.historyMutex.Unlock()

	logger.Infof("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)
}

// performanceTrackingLoop continuously tracks performance metrics
func (ao *AdaptiveOptimizer) performanceTrackingLoop() {
	defer ao.wg.Done()
	ticker := time.NewTicker(1 * time.Second) // Sample every second
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ao.recordPerformanceSample()
		case <-ao.ctx.Done():
			return
		}
	}
}

// recordPerformanceSample records current performance metrics
func (ao *AdaptiveOptimizer) recordPerformanceSample() {
	sample := PerformanceSample{
		Timestamp:   time.Now(),
		Throughput:  ao.getCurrentThroughput(),
		Latency:     ao.getCurrentLatency(),
		CPUUsage:    ao.GetCPUUtilization(),
		BatchSize:   int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
		WorkerCount: int(atomic.LoadInt64(&ao.workerCount)),
	}

	ao.performanceHistory.mutex.Lock()
	if ao.performanceHistory.maxSamples > 0 && len(ao.performanceHistory.samples) == ao.performanceHistory.maxSamples {
		copy(ao.performanceHistory.samples, ao.performanceHistory.samples[1:])
		ao.performanceHistory.samples[len(ao.performanceHistory.samples)-1] = sample
	} else {
		ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)
	}
	// Update moving averages
	ao.updateMovingAverages()
	ao.performanceHistory.mutex.Unlock()
}

// SetWorkerCountChangeCallback sets the callback function for worker count changes
func (ao *AdaptiveOptimizer) SetWorkerCountChangeCallback(callback func(oldCount, newCount int)) {
	ao.onWorkerCountChange = callback
}

// SetActivityPoller sets the poller to check for indexer activity.
func (ao *AdaptiveOptimizer) SetActivityPoller(poller IndexerActivityPoller) {
	ao.activityPoller = poller
}

// GetOptimalBatchSize returns the current optimal batch size
func (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {
	return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
}

// GetCPUUtilization returns the current CPU utilization
func (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {
	ao.cpuMonitor.measurementsMutex.RLock()
	defer ao.cpuMonitor.measurementsMutex.RUnlock()
	return ao.cpuMonitor.currentUtilization
}

// GetOptimizationStats returns current optimization statistics
func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
	ao.metricsMutex.RLock()
	defer ao.metricsMutex.RUnlock()
	return AdaptiveOptimizationStats{
		OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),
		CurrentBatchSize:  int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
		AvgThroughput:     ao.avgThroughput,
		AvgLatency:        ao.avgLatency,
		CPUUtilization:    ao.GetCPUUtilization(),
	}
}

// AdaptiveOptimizationStats represents current optimization statistics
type AdaptiveOptimizationStats struct {
	OptimizationsMade int64         `json:"optimizations_made"`
	CurrentBatchSize  int           `json:"current_batch_size"`
	AvgThroughput     float64       `json:"avg_throughput"`
	AvgLatency        time.Duration `json:"avg_latency"`
	CPUUtilization    float64       `json:"cpu_utilization"`
}

// Helper functions

func (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {
	// Get CPU utilization since the last call.
	// Interval 0 means non-blocking and compares to the last measurement.
	// The first call will return 0.
	percentages, err := cpu.Percent(0, false)
	if err != nil || len(percentages) == 0 {
		logger.Warnf("Failed to get real CPU utilization, falling back to goroutine heuristic: %v", err)
		// Fallback to the old, less accurate method
		numGoroutines := float64(runtime.NumGoroutine())
		maxProcs := float64(runtime.GOMAXPROCS(0))
		// Simple heuristic: more goroutines = higher CPU usage
		utilization := numGoroutines / (maxProcs * 10)
		if utilization > 0.95 {
			utilization = 0.95
		}
		return utilization
	}
	// gopsutil returns a slice; for overall usage (percpu=false), it's the first element.
	// The value is a percentage (e.g., 8.3), so we convert it to a 0.0-1.0 scale for our calculations.
	return percentages[0] / 100.0
}

func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {
	ao.metricsMutex.RLock()
	v := ao.avgThroughput
	ao.metricsMutex.RUnlock()
	return v
}

func (ao *AdaptiveOptimizer) getCurrentLatency() time.Duration {
	ao.metricsMutex.RLock()
	v := ao.avgLatency
	ao.metricsMutex.RUnlock()
	return v
}

func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {
	if len(ao.cpuMonitor.measurements) == 0 {
		return 0
	}
	sum := 0.0
	for _, usage := range ao.cpuMonitor.measurements {
		sum += usage
	}
	return sum / float64(len(ao.cpuMonitor.measurements))
}

func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSample) float64 {
	if len(samples) == 0 {
		return 0
	}
	sum := 0.0
	for _, sample := range samples {
		sum += sample.Throughput
	}
	return sum / float64(len(samples))
}

func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample) time.Duration {
	if len(samples) == 0 {
		return 0
	}
	var sum time.Duration
	for _, sample := range samples {
		sum += sample.Latency
	}
	return sum / time.Duration(len(samples))
}

func (ao *AdaptiveOptimizer) updateMovingAverages() {
	if len(ao.performanceHistory.samples) == 0 {
		return
	}
	windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))
	recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]

	avgThroughput := ao.calculateAverageThroughput(recentSamples)
	avgLatency := ao.calculateAverageLatency(recentSamples)

	ao.metricsMutex.Lock()
	ao.avgThroughput = avgThroughput
	ao.avgLatency = avgLatency
	ao.metricsMutex.Unlock()
}
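
// suggestWorkerIncrease scales the worker count up when average CPU falls below
// the target, but only while the indexer reports itself busy. For example, with
// 16 workers at 55% CPU against the 75% target, the gap ratio is ~0.27, the
// suggested increase of 4 workers equals the 25% per-step cap, and the pool
// grows to 20 (subject to maxWorkers).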
func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {
	// If already at max workers, do nothing.
	currentWorkers := int(atomic.LoadInt64(&ao.workerCount))
	if currentWorkers >= ao.cpuMonitor.maxWorkers {
		return
	}

	// If the indexer is not busy, don't scale up workers even if CPU is low.
	if ao.activityPoller != nil && !ao.activityPoller.IsBusy() {
		return
	}

	logger.Debugf("CPU underutilized, adjusting workers upward: current_cpu=%.2f, target_cpu=%.2f",
		currentCPU, targetCPU)

	// Calculate suggested increase (conservative approach)
	cpuUtilizationGap := targetCPU - currentCPU
	increaseRatio := cpuUtilizationGap / targetCPU

	// Limit increase to maximum 25% at a time and at least 1 worker
	maxIncrease := max(1, int(float64(currentWorkers)*0.25))
	suggestedIncrease := max(1, int(float64(currentWorkers)*increaseRatio))
	actualIncrease := min(maxIncrease, suggestedIncrease)

	newWorkerCount := min(ao.cpuMonitor.maxWorkers, currentWorkers+actualIncrease)
	if newWorkerCount > currentWorkers {
		ao.adjustWorkerCount(newWorkerCount)
		logger.Infof("Increased workers from %d to %d due to CPU underutilization",
			currentWorkers, newWorkerCount)
	}
}

// suggestWorkerDecrease scales the worker count down when average CPU exceeds the target.
func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {
	// If already at min workers, do nothing.
	currentWorkers := int(atomic.LoadInt64(&ao.workerCount))
	if currentWorkers <= ao.cpuMonitor.minWorkers {
		logger.Debugf("Worker count is already at its minimum (%d), skipping decrease.", currentWorkers)
		return
	}

	logger.Debugf("CPU over-utilized, adjusting workers downward: current_cpu=%.2f, target_cpu=%.2f",
		currentCPU, targetCPU)

	// Calculate suggested decrease (conservative approach)
	cpuOverUtilization := currentCPU - targetCPU
	decreaseRatio := cpuOverUtilization / targetCPU // Use target CPU as base for more accurate calculation

	// Limit decrease to maximum 25% at a time and at least 1 worker
	maxDecrease := max(1, int(float64(currentWorkers)*0.25))
	suggestedDecrease := max(1, int(float64(currentWorkers)*decreaseRatio*0.5)) // More conservative decrease
	actualDecrease := min(maxDecrease, suggestedDecrease)

	newWorkerCount := max(ao.cpuMonitor.minWorkers, currentWorkers-actualDecrease)
	logger.Debugf("Worker decrease calculation: current=%d, suggested=%d, min=%d, new=%d",
		currentWorkers, suggestedDecrease, ao.cpuMonitor.minWorkers, newWorkerCount)

	if newWorkerCount < currentWorkers {
		logger.Debugf("About to adjust worker count from %d to %d", currentWorkers, newWorkerCount)
		ao.adjustWorkerCount(newWorkerCount)
		logger.Infof("Decreased workers from %d to %d due to CPU over-utilization",
			currentWorkers, newWorkerCount)
	} else {
		logger.Debugf("Worker count adjustment skipped: new=%d not less than current=%d", newWorkerCount, currentWorkers)
	}
}

// adjustWorkerCount dynamically adjusts the worker count at runtime
func (ao *AdaptiveOptimizer) adjustWorkerCount(newCount int) {
	oldCount := int(atomic.LoadInt64(&ao.workerCount))
	if newCount <= 0 || newCount == oldCount {
		logger.Debugf("Skipping worker adjustment: newCount=%d, currentCount=%d", newCount, oldCount)
		return
	}

	logger.Infof("Adjusting worker count from %d to %d", oldCount, newCount)

	// Update atomic mirror then keep config in sync
	atomic.StoreInt64(&ao.workerCount, int64(newCount))
	ao.config.WorkerCount = newCount

	// Notify the indexer about the worker count change.
	// This would typically trigger a worker pool resize in the parallel indexer.
	if ao.onWorkerCountChange != nil {
		logger.Debugf("Calling worker count change callback: %d -> %d", oldCount, newCount)
		ao.onWorkerCountChange(oldCount, newCount)
	} else {
		logger.Warnf("Worker count change callback is nil - worker adjustment will not take effect")
	}

	// Count the adjustment for monitoring
	atomic.AddInt64(&ao.optimizationsMade, 1)
}

// Utility functions

func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
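
// The snippet below is a minimal wiring sketch: the alwaysBusyPoller type and
// wireOptimizer function are hypothetical and assume only the Config fields
// already referenced above (BatchSize, WorkerCount).
type alwaysBusyPoller struct{}

// IsBusy always reports the indexer as busy so CPU-driven scale-up is never skipped.
func (alwaysBusyPoller) IsBusy() bool { return true }

// wireOptimizer shows how an indexer could hook the optimizer into its worker pool.
func wireOptimizer(cfg *Config) (*AdaptiveOptimizer, error) {
	ao := NewAdaptiveOptimizer(cfg)
	ao.SetActivityPoller(alwaysBusyPoller{})
	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
		// A real callback would resize the indexer's worker pool here;
		// this sketch only records the requested change.
		logger.Infof("Resize worker pool: %d -> %d", oldCount, newCount)
	})
	if err := ao.Start(); err != nil {
		return nil, err
	}
	// Callers are expected to invoke ao.Stop() during shutdown.
	return ao, nil
}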