1
0

adaptive_optimization_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package indexer
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. )
  8. // Test helper to create adaptive optimizer with mock config
  9. func createTestAdaptiveOptimizer(workerCount int) *AdaptiveOptimizer {
  10. config := &Config{
  11. WorkerCount: workerCount,
  12. BatchSize: 1000,
  13. }
  14. return NewAdaptiveOptimizer(config)
  15. }
  16. func TestAdaptiveOptimizer_NewAdaptiveOptimizer(t *testing.T) {
  17. config := &Config{
  18. WorkerCount: 8,
  19. BatchSize: 1000,
  20. }
  21. ao := NewAdaptiveOptimizer(config)
  22. if ao == nil {
  23. t.Fatal("NewAdaptiveOptimizer returned nil")
  24. }
  25. if ao.config.WorkerCount != 8 {
  26. t.Errorf("Expected worker count 8, got %d", ao.config.WorkerCount)
  27. }
  28. if ao.cpuMonitor.targetUtilization != 0.75 {
  29. t.Errorf("Expected target CPU utilization 0.75, got %f", ao.cpuMonitor.targetUtilization)
  30. }
  31. if ao.batchSizeController.baseBatchSize != 1000 {
  32. t.Errorf("Expected base batch size 1000, got %d", ao.batchSizeController.baseBatchSize)
  33. }
  34. }
  35. func TestAdaptiveOptimizer_SetWorkerCountChangeCallback(t *testing.T) {
  36. ao := createTestAdaptiveOptimizer(4)
  37. var callbackOldCount, callbackNewCount int
  38. callbackCalled := false
  39. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  40. callbackOldCount = oldCount
  41. callbackNewCount = newCount
  42. callbackCalled = true
  43. })
  44. // Trigger a callback
  45. if ao.onWorkerCountChange != nil {
  46. ao.onWorkerCountChange(4, 6)
  47. }
  48. if !callbackCalled {
  49. t.Error("Expected callback to be called")
  50. }
  51. if callbackOldCount != 4 {
  52. t.Errorf("Expected old count 4, got %d", callbackOldCount)
  53. }
  54. if callbackNewCount != 6 {
  55. t.Errorf("Expected new count 6, got %d", callbackNewCount)
  56. }
  57. }
  58. func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) {
  59. ao := createTestAdaptiveOptimizer(4)
  60. var actualOldCount, actualNewCount int
  61. var callbackCalled bool
  62. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  63. actualOldCount = oldCount
  64. actualNewCount = newCount
  65. callbackCalled = true
  66. })
  67. // Test CPU underutilization scenario
  68. currentCPU := 0.5 // 50% utilization
  69. targetCPU := 0.8 // 80% target
  70. ao.suggestWorkerIncrease(currentCPU, targetCPU)
  71. if !callbackCalled {
  72. t.Error("Expected worker count change callback to be called")
  73. }
  74. if actualOldCount != 4 {
  75. t.Errorf("Expected old worker count 4, got %d", actualOldCount)
  76. }
  77. // Should increase workers, but not more than max allowed
  78. if actualNewCount <= 4 {
  79. t.Errorf("Expected new worker count to be greater than 4, got %d", actualNewCount)
  80. }
  81. // Verify config was updated
  82. if ao.config.WorkerCount != actualNewCount {
  83. t.Errorf("Expected config worker count to be updated to %d, got %d", actualNewCount, ao.config.WorkerCount)
  84. }
  85. }
  86. func TestAdaptiveOptimizer_suggestWorkerDecrease(t *testing.T) {
  87. ao := createTestAdaptiveOptimizer(8)
  88. var actualOldCount, actualNewCount int
  89. var callbackCalled bool
  90. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  91. actualOldCount = oldCount
  92. actualNewCount = newCount
  93. callbackCalled = true
  94. })
  95. // Test CPU over-utilization scenario
  96. currentCPU := 0.95 // 95% utilization
  97. targetCPU := 0.8 // 80% target
  98. ao.suggestWorkerDecrease(currentCPU, targetCPU)
  99. if !callbackCalled {
  100. t.Error("Expected worker count change callback to be called")
  101. }
  102. if actualOldCount != 8 {
  103. t.Errorf("Expected old worker count 8, got %d", actualOldCount)
  104. }
  105. // Should decrease workers, but not below minimum
  106. if actualNewCount >= 8 {
  107. t.Errorf("Expected new worker count to be less than 8, got %d", actualNewCount)
  108. }
  109. // Should not go below minimum
  110. if actualNewCount < ao.cpuMonitor.minWorkers {
  111. t.Errorf("New worker count %d should not be below minimum %d", actualNewCount, ao.cpuMonitor.minWorkers)
  112. }
  113. // Verify config was updated
  114. if ao.config.WorkerCount != actualNewCount {
  115. t.Errorf("Expected config worker count to be updated to %d, got %d", actualNewCount, ao.config.WorkerCount)
  116. }
  117. }
  118. func TestAdaptiveOptimizer_adjustWorkerCount_NoChange(t *testing.T) {
  119. ao := createTestAdaptiveOptimizer(4)
  120. var callbackCalled bool
  121. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  122. callbackCalled = true
  123. })
  124. // Test no change scenario
  125. ao.adjustWorkerCount(4) // Same as current
  126. if callbackCalled {
  127. t.Error("Expected no callback when worker count doesn't change")
  128. }
  129. if ao.config.WorkerCount != 4 {
  130. t.Errorf("Expected worker count to remain 4, got %d", ao.config.WorkerCount)
  131. }
  132. }
  133. func TestAdaptiveOptimizer_adjustWorkerCount_InvalidCount(t *testing.T) {
  134. ao := createTestAdaptiveOptimizer(4)
  135. var callbackCalled bool
  136. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  137. callbackCalled = true
  138. })
  139. // Test invalid count (0 or negative)
  140. ao.adjustWorkerCount(0)
  141. ao.adjustWorkerCount(-1)
  142. if callbackCalled {
  143. t.Error("Expected no callback for invalid worker counts")
  144. }
  145. if ao.config.WorkerCount != 4 {
  146. t.Errorf("Expected worker count to remain 4, got %d", ao.config.WorkerCount)
  147. }
  148. }
  149. func TestAdaptiveOptimizer_GetOptimalBatchSize(t *testing.T) {
  150. ao := createTestAdaptiveOptimizer(4)
  151. // Initial batch size should be from config
  152. batchSize := ao.GetOptimalBatchSize()
  153. expectedInitial := int32(1000)
  154. if batchSize != int(expectedInitial) {
  155. t.Errorf("Expected initial batch size %d, got %d", expectedInitial, batchSize)
  156. }
  157. // Test updating batch size
  158. newBatchSize := int32(1500)
  159. atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, newBatchSize)
  160. batchSize = ao.GetOptimalBatchSize()
  161. if batchSize != int(newBatchSize) {
  162. t.Errorf("Expected updated batch size %d, got %d", newBatchSize, batchSize)
  163. }
  164. }
  165. func TestAdaptiveOptimizer_measureAndAdjustCPU_WithinThreshold(t *testing.T) {
  166. ao := createTestAdaptiveOptimizer(4)
  167. var callbackCalled bool
  168. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  169. callbackCalled = true
  170. })
  171. // Mock CPU measurements within threshold
  172. ao.cpuMonitor.measurements = []float64{0.78, 0.79, 0.81, 0.82} // Around 0.8 target
  173. ao.measureAndAdjustCPU()
  174. // Should not trigger worker adjustment if within threshold
  175. if callbackCalled {
  176. t.Error("Expected no worker adjustment when CPU is within threshold")
  177. }
  178. }
  179. func TestAdaptiveOptimizer_GetCPUUtilization(t *testing.T) {
  180. ao := createTestAdaptiveOptimizer(4)
  181. // Set current utilization
  182. ao.cpuMonitor.currentUtilization = 0.75
  183. utilization := ao.GetCPUUtilization()
  184. if utilization != 0.75 {
  185. t.Errorf("Expected CPU utilization 0.75, got %f", utilization)
  186. }
  187. }
  188. func TestAdaptiveOptimizer_GetOptimizationStats(t *testing.T) {
  189. ao := createTestAdaptiveOptimizer(4)
  190. // Set some test values
  191. atomic.StoreInt64(&ao.optimizationsMade, 5)
  192. ao.avgThroughput = 25.5
  193. ao.avgLatency = 2 * time.Second
  194. ao.cpuMonitor.currentUtilization = 0.85
  195. stats := ao.GetOptimizationStats()
  196. if stats.OptimizationsMade != 5 {
  197. t.Errorf("Expected 5 optimizations made, got %d", stats.OptimizationsMade)
  198. }
  199. if stats.AvgThroughput != 25.5 {
  200. t.Errorf("Expected avg throughput 25.5, got %f", stats.AvgThroughput)
  201. }
  202. if stats.AvgLatency != 2*time.Second {
  203. t.Errorf("Expected avg latency 2s, got %v", stats.AvgLatency)
  204. }
  205. if stats.CPUUtilization != 0.85 {
  206. t.Errorf("Expected CPU utilization 0.85, got %f", stats.CPUUtilization)
  207. }
  208. if stats.CurrentBatchSize != 1000 {
  209. t.Errorf("Expected current batch size 1000, got %d", stats.CurrentBatchSize)
  210. }
  211. }
  212. func TestAdaptiveOptimizer_StartStop(t *testing.T) {
  213. ao := createTestAdaptiveOptimizer(4)
  214. // Test start
  215. err := ao.Start()
  216. if err != nil {
  217. t.Fatalf("Failed to start adaptive optimizer: %v", err)
  218. }
  219. // Verify running state
  220. if atomic.LoadInt32(&ao.running) != 1 {
  221. t.Error("Expected adaptive optimizer to be running")
  222. }
  223. // Test starting again (should fail)
  224. err = ao.Start()
  225. if err == nil {
  226. t.Error("Expected error when starting already running optimizer")
  227. }
  228. // Small delay to let goroutines start
  229. time.Sleep(100 * time.Millisecond)
  230. // Test stop
  231. ao.Stop()
  232. // Verify stopped state
  233. if atomic.LoadInt32(&ao.running) != 0 {
  234. t.Error("Expected adaptive optimizer to be stopped")
  235. }
  236. }
  237. func TestAdaptiveOptimizer_WorkerAdjustmentLimits(t *testing.T) {
  238. // Test maximum worker limit
  239. ao := createTestAdaptiveOptimizer(16) // Start with high count
  240. ao.cpuMonitor.maxWorkers = 20
  241. var actualNewCount int
  242. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  243. actualNewCount = newCount
  244. })
  245. // Try to increase beyond max
  246. ao.suggestWorkerIncrease(0.2, 0.8) // Very low CPU, should want to increase
  247. if actualNewCount > ao.cpuMonitor.maxWorkers {
  248. t.Errorf("New worker count %d exceeds maximum %d", actualNewCount, ao.cpuMonitor.maxWorkers)
  249. }
  250. // Test minimum worker limit
  251. ao2 := createTestAdaptiveOptimizer(3)
  252. ao2.cpuMonitor.minWorkers = 2
  253. ao2.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  254. actualNewCount = newCount
  255. })
  256. // Try to decrease below min
  257. ao2.suggestWorkerDecrease(0.98, 0.8) // Very high CPU, should want to decrease
  258. if actualNewCount < ao2.cpuMonitor.minWorkers {
  259. t.Errorf("New worker count %d below minimum %d", actualNewCount, ao2.cpuMonitor.minWorkers)
  260. }
  261. }
  262. func TestAdaptiveOptimizer_ConcurrentAccess(t *testing.T) {
  263. ao := createTestAdaptiveOptimizer(4)
  264. var wg sync.WaitGroup
  265. var adjustmentCount int32
  266. ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
  267. atomic.AddInt32(&adjustmentCount, 1)
  268. })
  269. // Simulate concurrent CPU measurements and adjustments
  270. for i := 0; i < 10; i++ {
  271. wg.Add(1)
  272. go func() {
  273. defer wg.Done()
  274. // Simulate alternating high and low CPU
  275. if i%2 == 0 {
  276. ao.suggestWorkerIncrease(0.4, 0.8)
  277. } else {
  278. ao.suggestWorkerDecrease(0.95, 0.8)
  279. }
  280. }()
  281. }
  282. wg.Wait()
  283. // Verify that some adjustments were made
  284. finalCount := atomic.LoadInt32(&adjustmentCount)
  285. if finalCount == 0 {
  286. t.Error("Expected some worker adjustments to be made")
  287. }
  288. // Verify final state is valid
  289. if ao.config.WorkerCount < ao.cpuMonitor.minWorkers || ao.config.WorkerCount > ao.cpuMonitor.maxWorkers {
  290. t.Errorf("Final worker count %d outside valid range [%d, %d]",
  291. ao.config.WorkerCount, ao.cpuMonitor.minWorkers, ao.cpuMonitor.maxWorkers)
  292. }
  293. }