adaptive_optimization.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/shirou/gopsutil/v4/cpu"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // IndexerActivityPoller defines an interface to check if the indexer is busy.
  13. type IndexerActivityPoller interface {
  14. IsBusy() bool
  15. }
  16. // AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoring
  17. type AdaptiveOptimizer struct {
  18. config *Config
  19. cpuMonitor *CPUMonitor
  20. batchSizeController *BatchSizeController
  21. performanceHistory *PerformanceHistory
  22. // State
  23. running int32
  24. ctx context.Context
  25. cancel context.CancelFunc
  26. wg sync.WaitGroup
  27. // Metrics
  28. optimizationsMade int64
  29. avgThroughput float64
  30. avgLatency time.Duration
  31. metricsMutex sync.RWMutex
  32. // Callbacks
  33. onWorkerCountChange func(oldCount, newCount int)
  34. // Activity Poller
  35. activityPoller IndexerActivityPoller
  36. }
  37. // CPUMonitor monitors CPU utilization and suggests worker adjustments
  38. type CPUMonitor struct {
  39. targetUtilization float64
  40. measurementInterval time.Duration
  41. adjustmentThreshold float64
  42. maxWorkers int
  43. minWorkers int
  44. currentUtilization float64
  45. measurements []float64
  46. measurementsMutex sync.RWMutex
  47. }
  48. // BatchSizeController dynamically adjusts batch sizes based on performance metrics
  49. type BatchSizeController struct {
  50. baseBatchSize int
  51. minBatchSize int
  52. maxBatchSize int
  53. adjustmentFactor float64
  54. currentBatchSize int32
  55. latencyThreshold time.Duration
  56. throughputTarget float64
  57. adjustmentHistory []BatchAdjustment
  58. historyMutex sync.RWMutex
  59. }
  60. // PerformanceHistory tracks performance metrics for optimization decisions
  61. type PerformanceHistory struct {
  62. samples []PerformanceSample
  63. maxSamples int
  64. mutex sync.RWMutex
  65. movingAvgWindow int
  66. avgThroughput float64
  67. avgLatency time.Duration
  68. }
  69. // PerformanceSample represents a single performance measurement
  70. type PerformanceSample struct {
  71. Timestamp time.Time `json:"timestamp"`
  72. Throughput float64 `json:"throughput"`
  73. Latency time.Duration `json:"latency"`
  74. CPUUsage float64 `json:"cpu_usage"`
  75. BatchSize int `json:"batch_size"`
  76. WorkerCount int `json:"worker_count"`
  77. }
  78. // BatchAdjustment represents a batch size adjustment decision
  79. type BatchAdjustment struct {
  80. Timestamp time.Time `json:"timestamp"`
  81. OldBatchSize int `json:"old_batch_size"`
  82. NewBatchSize int `json:"new_batch_size"`
  83. Reason string `json:"reason"`
  84. ThroughputImpact float64 `json:"throughput_impact"`
  85. }
  86. // NewAdaptiveOptimizer creates a new adaptive optimizer
  87. func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
  88. ctx, cancel := context.WithCancel(context.Background())
  89. ao := &AdaptiveOptimizer{
  90. config: config,
  91. cpuMonitor: &CPUMonitor{
  92. targetUtilization: 0.75, // Target 75% CPU utilization (more conservative)
  93. measurementInterval: 5 * time.Second,
  94. adjustmentThreshold: 0.10, // Adjust if 10% deviation from target (more sensitive)
  95. maxWorkers: runtime.GOMAXPROCS(0) * 3,
  96. minWorkers: max(1, runtime.GOMAXPROCS(0)/8), // Allow aggressive scaling down: 1/8 of available processors or 1, whichever is higher
  97. measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals
  98. },
  99. batchSizeController: &BatchSizeController{
  100. baseBatchSize: config.BatchSize,
  101. minBatchSize: max(100, config.BatchSize/4),
  102. maxBatchSize: config.BatchSize * 3,
  103. adjustmentFactor: 0.2, // 20% adjustment steps
  104. currentBatchSize: int32(config.BatchSize),
  105. latencyThreshold: 5 * time.Second,
  106. throughputTarget: 25.0, // Target 25 MB/s
  107. },
  108. performanceHistory: &PerformanceHistory{
  109. samples: make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples
  110. maxSamples: 120,
  111. movingAvgWindow: 12, // 12-sample moving average
  112. },
  113. ctx: ctx,
  114. cancel: cancel,
  115. }
  116. // Log initialization parameters for debugging
  117. logger.Infof("Adaptive optimizer initialized: workers=[%d, %d, %d] (min, current, max), target_cpu=%.1f%%, threshold=%.1f%%",
  118. ao.cpuMonitor.minWorkers, config.WorkerCount, ao.cpuMonitor.maxWorkers,
  119. ao.cpuMonitor.targetUtilization*100, ao.cpuMonitor.adjustmentThreshold*100)
  120. return ao
  121. }
  122. // Start begins the adaptive optimization process
  123. func (ao *AdaptiveOptimizer) Start() error {
  124. if !atomic.CompareAndSwapInt32(&ao.running, 0, 1) {
  125. logger.Error("Adaptive optimizer already running")
  126. return fmt.Errorf("adaptive optimizer already running")
  127. }
  128. // Start CPU monitoring
  129. ao.wg.Add(1)
  130. go ao.cpuMonitoringLoop()
  131. // Start batch size optimization
  132. ao.wg.Add(1)
  133. go ao.batchOptimizationLoop()
  134. // Start performance tracking
  135. ao.wg.Add(1)
  136. go ao.performanceTrackingLoop()
  137. logger.Info("Adaptive optimizer started")
  138. return nil
  139. }
  140. // Stop halts the adaptive optimization process
  141. func (ao *AdaptiveOptimizer) Stop() {
  142. if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {
  143. return
  144. }
  145. ao.cancel()
  146. ao.wg.Wait()
  147. logger.Info("Adaptive optimizer stopped")
  148. }
  149. // cpuMonitoringLoop continuously monitors CPU utilization
  150. func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
  151. defer ao.wg.Done()
  152. ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)
  153. defer ticker.Stop()
  154. for {
  155. select {
  156. case <-ticker.C:
  157. ao.measureAndAdjustCPU()
  158. case <-ao.ctx.Done():
  159. return
  160. }
  161. }
  162. }
  163. // measureAndAdjustCPU measures current CPU utilization and suggests adjustments
  164. func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
  165. // Get current CPU utilization
  166. cpuUsage := ao.getCurrentCPUUtilization()
  167. ao.cpuMonitor.measurementsMutex.Lock()
  168. ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)
  169. if len(ao.cpuMonitor.measurements) > cap(ao.cpuMonitor.measurements) {
  170. // Remove oldest measurement
  171. ao.cpuMonitor.measurements = ao.cpuMonitor.measurements[1:]
  172. }
  173. ao.cpuMonitor.currentUtilization = cpuUsage
  174. ao.cpuMonitor.measurementsMutex.Unlock()
  175. // Calculate average CPU utilization
  176. ao.cpuMonitor.measurementsMutex.RLock()
  177. avgCPU := ao.calculateAverageCPU()
  178. ao.cpuMonitor.measurementsMutex.RUnlock()
  179. // Determine if adjustment is needed
  180. targetCPU := ao.cpuMonitor.targetUtilization
  181. if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {
  182. // CPU underutilized - suggest increasing workers
  183. ao.suggestWorkerIncrease(avgCPU, targetCPU)
  184. } else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {
  185. // CPU over-utilized - suggest decreasing workers
  186. ao.suggestWorkerDecrease(avgCPU, targetCPU)
  187. }
  188. }
  189. // batchOptimizationLoop continuously optimizes batch sizes
  190. func (ao *AdaptiveOptimizer) batchOptimizationLoop() {
  191. defer ao.wg.Done()
  192. ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds
  193. defer ticker.Stop()
  194. for {
  195. select {
  196. case <-ticker.C:
  197. ao.optimizeBatchSize()
  198. case <-ao.ctx.Done():
  199. return
  200. }
  201. }
  202. }
  203. // optimizeBatchSize analyzes performance and adjusts batch size
  204. func (ao *AdaptiveOptimizer) optimizeBatchSize() {
  205. ao.performanceHistory.mutex.RLock()
  206. if len(ao.performanceHistory.samples) < 5 {
  207. ao.performanceHistory.mutex.RUnlock()
  208. return // Not enough data
  209. }
  210. recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]
  211. avgThroughput := ao.calculateAverageThroughput(recentSamples)
  212. avgLatency := ao.calculateAverageLatency(recentSamples)
  213. ao.performanceHistory.mutex.RUnlock()
  214. currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  215. newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)
  216. if newBatchSize != currentBatchSize {
  217. ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)
  218. atomic.AddInt64(&ao.optimizationsMade, 1)
  219. }
  220. }
  221. // calculateOptimalBatchSize determines the optimal batch size based on current performance
  222. func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {
  223. controller := ao.batchSizeController
  224. // If latency is too high, reduce batch size
  225. if latency > controller.latencyThreshold {
  226. reduction := int(float64(currentBatch) * controller.adjustmentFactor)
  227. newSize := currentBatch - max(50, reduction)
  228. return max(controller.minBatchSize, newSize)
  229. }
  230. // If throughput is below target and latency is acceptable, increase batch size
  231. if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {
  232. increase := int(float64(currentBatch) * controller.adjustmentFactor)
  233. newSize := currentBatch + max(100, increase)
  234. return min(controller.maxBatchSize, newSize)
  235. }
  236. // Current batch size seems optimal
  237. return currentBatch
  238. }
  239. // adjustBatchSize applies the batch size adjustment
  240. func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {
  241. atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))
  242. var reason string
  243. if newSize > oldSize {
  244. reason = "Increasing batch size to improve throughput"
  245. } else {
  246. reason = "Reducing batch size to improve latency"
  247. }
  248. // Record adjustment
  249. adjustment := BatchAdjustment{
  250. Timestamp: time.Now(),
  251. OldBatchSize: oldSize,
  252. NewBatchSize: newSize,
  253. Reason: reason,
  254. ThroughputImpact: throughput,
  255. }
  256. ao.batchSizeController.historyMutex.Lock()
  257. ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)
  258. if len(ao.batchSizeController.adjustmentHistory) > 50 {
  259. // Keep only recent 50 adjustments
  260. ao.batchSizeController.adjustmentHistory = ao.batchSizeController.adjustmentHistory[1:]
  261. }
  262. ao.batchSizeController.historyMutex.Unlock()
  263. logger.Infof("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)
  264. }
  265. // performanceTrackingLoop continuously tracks performance metrics
  266. func (ao *AdaptiveOptimizer) performanceTrackingLoop() {
  267. defer ao.wg.Done()
  268. ticker := time.NewTicker(1 * time.Second) // Sample every second
  269. defer ticker.Stop()
  270. for {
  271. select {
  272. case <-ticker.C:
  273. ao.recordPerformanceSample()
  274. case <-ao.ctx.Done():
  275. return
  276. }
  277. }
  278. }
  279. // recordPerformanceSample records current performance metrics
  280. func (ao *AdaptiveOptimizer) recordPerformanceSample() {
  281. sample := PerformanceSample{
  282. Timestamp: time.Now(),
  283. Throughput: ao.getCurrentThroughput(),
  284. Latency: ao.getCurrentLatency(),
  285. CPUUsage: ao.cpuMonitor.currentUtilization,
  286. BatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  287. WorkerCount: ao.config.WorkerCount,
  288. }
  289. ao.performanceHistory.mutex.Lock()
  290. ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)
  291. if len(ao.performanceHistory.samples) > ao.performanceHistory.maxSamples {
  292. // Remove oldest sample
  293. ao.performanceHistory.samples = ao.performanceHistory.samples[1:]
  294. }
  295. // Update moving averages
  296. ao.updateMovingAverages()
  297. ao.performanceHistory.mutex.Unlock()
  298. }
  299. // SetWorkerCountChangeCallback sets the callback function for worker count changes
  300. func (ao *AdaptiveOptimizer) SetWorkerCountChangeCallback(callback func(oldCount, newCount int)) {
  301. ao.onWorkerCountChange = callback
  302. }
  303. // SetActivityPoller sets the poller to check for indexer activity.
  304. func (ao *AdaptiveOptimizer) SetActivityPoller(poller IndexerActivityPoller) {
  305. ao.activityPoller = poller
  306. }
  307. // GetOptimalBatchSize returns the current optimal batch size
  308. func (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {
  309. return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
  310. }
  311. // GetCPUUtilization returns the current CPU utilization
  312. func (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {
  313. ao.cpuMonitor.measurementsMutex.RLock()
  314. defer ao.cpuMonitor.measurementsMutex.RUnlock()
  315. return ao.cpuMonitor.currentUtilization
  316. }
  317. // GetOptimizationStats returns current optimization statistics
  318. func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
  319. ao.metricsMutex.RLock()
  320. defer ao.metricsMutex.RUnlock()
  321. return AdaptiveOptimizationStats{
  322. OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),
  323. CurrentBatchSize: int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
  324. AvgThroughput: ao.avgThroughput,
  325. AvgLatency: ao.avgLatency,
  326. CPUUtilization: ao.cpuMonitor.currentUtilization,
  327. }
  328. }
  329. // AdaptiveOptimizationStats represents current optimization statistics
  330. type AdaptiveOptimizationStats struct {
  331. OptimizationsMade int64 `json:"optimizations_made"`
  332. CurrentBatchSize int `json:"current_batch_size"`
  333. AvgThroughput float64 `json:"avg_throughput"`
  334. AvgLatency time.Duration `json:"avg_latency"`
  335. CPUUtilization float64 `json:"cpu_utilization"`
  336. }
  337. // Helper functions
  338. func (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {
  339. // Get CPU utilization since the last call.
  340. // Interval 0 means non-blocking and compares to the last measurement.
  341. // The first call will return 0.
  342. percentages, err := cpu.Percent(0, false)
  343. if err != nil || len(percentages) == 0 {
  344. logger.Warnf("Failed to get real CPU utilization, falling back to goroutine heuristic: %v", err)
  345. // Fallback to the old, less accurate method
  346. numGoroutines := float64(runtime.NumGoroutine())
  347. maxProcs := float64(runtime.GOMAXPROCS(0))
  348. // Simple heuristic: more goroutines = higher CPU usage
  349. utilization := numGoroutines / (maxProcs * 10)
  350. if utilization > 0.95 {
  351. utilization = 0.95
  352. }
  353. return utilization
  354. }
  355. // gopsutil returns a slice, for overall usage (percpu=false), it's the first element.
  356. // The value is a percentage (e.g., 8.3), so we convert it to a 0.0-1.0 scale for our calculations.
  357. return percentages[0] / 100.0
  358. }
  359. func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {
  360. // This would be implemented to get actual throughput from the indexer
  361. ao.metricsMutex.RLock()
  362. defer ao.metricsMutex.RUnlock()
  363. return ao.avgThroughput
  364. }
  365. func (ao *AdaptiveOptimizer) getCurrentLatency() time.Duration {
  366. // This would be implemented to get actual latency from the indexer
  367. ao.metricsMutex.RLock()
  368. defer ao.metricsMutex.RUnlock()
  369. return ao.avgLatency
  370. }
  371. func (ao *AdaptiveOptimizer) isIndexerBusy() bool {
  372. if ao.activityPoller == nil {
  373. return false
  374. }
  375. return ao.activityPoller.IsBusy()
  376. }
  377. func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {
  378. if len(ao.cpuMonitor.measurements) == 0 {
  379. return 0
  380. }
  381. sum := 0.0
  382. for _, cpu := range ao.cpuMonitor.measurements {
  383. sum += cpu
  384. }
  385. return sum / float64(len(ao.cpuMonitor.measurements))
  386. }
  387. func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSample) float64 {
  388. if len(samples) == 0 {
  389. return 0
  390. }
  391. sum := 0.0
  392. for _, sample := range samples {
  393. sum += sample.Throughput
  394. }
  395. return sum / float64(len(samples))
  396. }
  397. func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample) time.Duration {
  398. if len(samples) == 0 {
  399. return 0
  400. }
  401. var sum time.Duration
  402. for _, sample := range samples {
  403. sum += sample.Latency
  404. }
  405. return sum / time.Duration(len(samples))
  406. }
  407. func (ao *AdaptiveOptimizer) updateMovingAverages() {
  408. if len(ao.performanceHistory.samples) == 0 {
  409. return
  410. }
  411. windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))
  412. recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]
  413. ao.avgThroughput = ao.calculateAverageThroughput(recentSamples)
  414. ao.avgLatency = ao.calculateAverageLatency(recentSamples)
  415. }
  416. func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {
  417. // If already at max workers, do nothing.
  418. if ao.config.WorkerCount >= ao.cpuMonitor.maxWorkers {
  419. return
  420. }
  421. // If the indexer is not busy, don't scale up workers even if CPU is low.
  422. if ao.activityPoller != nil && !ao.activityPoller.IsBusy() {
  423. return
  424. }
  425. logger.Debug("CPU underutilized, adjusting workers upward",
  426. "current_cpu", currentCPU, "target_cpu", targetCPU)
  427. // Calculate suggested increase (conservative approach)
  428. cpuUtilizationGap := targetCPU - currentCPU
  429. increaseRatio := cpuUtilizationGap / targetCPU
  430. // Limit increase to maximum 25% at a time and at least 1 worker
  431. maxIncrease := max(1, int(float64(ao.config.WorkerCount)*0.25))
  432. suggestedIncrease := max(1, int(float64(ao.config.WorkerCount)*increaseRatio))
  433. actualIncrease := min(maxIncrease, suggestedIncrease)
  434. newWorkerCount := min(ao.cpuMonitor.maxWorkers, ao.config.WorkerCount+actualIncrease)
  435. if newWorkerCount > ao.config.WorkerCount {
  436. ao.adjustWorkerCount(newWorkerCount)
  437. logger.Infof("Increased workers from %d to %d due to CPU underutilization",
  438. ao.config.WorkerCount, newWorkerCount)
  439. }
  440. }
  441. func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {
  442. // If already at min workers, do nothing.
  443. if ao.config.WorkerCount <= ao.cpuMonitor.minWorkers {
  444. logger.Debugf("Worker count is already at its minimum (%d), skipping decrease.", ao.config.WorkerCount)
  445. return
  446. }
  447. logger.Debug("CPU over-utilized, adjusting workers downward",
  448. "current_cpu", currentCPU, "target_cpu", targetCPU)
  449. // Calculate suggested decrease (conservative approach)
  450. cpuOverUtilization := currentCPU - targetCPU
  451. decreaseRatio := cpuOverUtilization / targetCPU // Use target CPU as base for more accurate calculation
  452. // Limit decrease to maximum 25% at a time and at least 1 worker
  453. maxDecrease := max(1, int(float64(ao.config.WorkerCount)*0.25))
  454. suggestedDecrease := max(1, int(float64(ao.config.WorkerCount)*decreaseRatio*0.5)) // More conservative decrease
  455. actualDecrease := min(maxDecrease, suggestedDecrease)
  456. newWorkerCount := max(ao.cpuMonitor.minWorkers, ao.config.WorkerCount-actualDecrease)
  457. logger.Debugf("Worker decrease calculation: current=%d, suggested=%d, min=%d, new=%d",
  458. ao.config.WorkerCount, suggestedDecrease, ao.cpuMonitor.minWorkers, newWorkerCount)
  459. if newWorkerCount < ao.config.WorkerCount {
  460. logger.Debugf("About to adjust worker count from %d to %d", ao.config.WorkerCount, newWorkerCount)
  461. ao.adjustWorkerCount(newWorkerCount)
  462. logger.Infof("Decreased workers from %d to %d due to CPU over-utilization",
  463. ao.config.WorkerCount, newWorkerCount)
  464. } else {
  465. logger.Debugf("Worker count adjustment skipped: new=%d not less than current=%d", newWorkerCount, ao.config.WorkerCount)
  466. }
  467. }
  468. // adjustWorkerCount dynamically adjusts the worker count at runtime
  469. func (ao *AdaptiveOptimizer) adjustWorkerCount(newCount int) {
  470. if newCount <= 0 || newCount == ao.config.WorkerCount {
  471. logger.Debugf("Skipping worker adjustment: newCount=%d, currentCount=%d", newCount, ao.config.WorkerCount)
  472. return
  473. }
  474. logger.Infof("Adjusting worker count from %d to %d", ao.config.WorkerCount, newCount)
  475. // Update configuration
  476. oldCount := ao.config.WorkerCount
  477. ao.config.WorkerCount = newCount
  478. // Notify the indexer about worker count change
  479. // This would typically trigger a worker pool resize in the parallel indexer
  480. if ao.onWorkerCountChange != nil {
  481. logger.Debugf("Calling worker count change callback: %d -> %d", oldCount, newCount)
  482. ao.onWorkerCountChange(oldCount, newCount)
  483. } else {
  484. logger.Warnf("Worker count change callback is nil - worker adjustment will not take effect")
  485. }
  486. // Log the adjustment for monitoring
  487. atomic.AddInt64(&ao.optimizationsMade, 1)
  488. }
  489. // Utility functions
  490. func max(a, b int) int {
  491. if a > b {
  492. return a
  493. }
  494. return b
  495. }
  496. func min(a, b int) int {
  497. if a < b {
  498. return a
  499. }
  500. return b
  501. }