parallel_indexer_worker_test.go

package indexer

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/blevesearch/bleve/v2"
)

// mockShardManagerForWorkerTest is a no-op mock shard manager for the
// parallel indexer worker tests.
type mockShardManagerForWorkerTest struct{}

func (m *mockShardManagerForWorkerTest) GetShard(key string) (bleve.Index, int, error) {
	return nil, 0, nil
}

func (m *mockShardManagerForWorkerTest) GetShardByID(id int) (bleve.Index, error) {
	return nil, nil
}

func (m *mockShardManagerForWorkerTest) GetAllShards() []bleve.Index {
	return []bleve.Index{}
}

func (m *mockShardManagerForWorkerTest) GetShardCount() int {
	return 1
}

func (m *mockShardManagerForWorkerTest) Initialize() error {
	return nil
}

func (m *mockShardManagerForWorkerTest) GetShardStats() []*ShardInfo {
	return []*ShardInfo{}
}

func (m *mockShardManagerForWorkerTest) CreateShard(id int, path string) error {
	return nil
}

func (m *mockShardManagerForWorkerTest) Close() error {
	return nil
}

func (m *mockShardManagerForWorkerTest) CloseShard(id int) error {
	return nil
}

func (m *mockShardManagerForWorkerTest) HealthCheck() error {
	return nil
}

func (m *mockShardManagerForWorkerTest) OptimizeShard(id int) error {
	return nil
}

// createTestParallelIndexer is a test helper that builds a ParallelIndexer
// with the given worker count for the worker tests.
func createTestParallelIndexer(workerCount int) *ParallelIndexer {
	config := &Config{
		WorkerCount:  workerCount,
		BatchSize:    100,
		MaxQueueSize: 1000,
	}
	shardManager := &mockShardManagerForWorkerTest{}
	return NewParallelIndexer(config, shardManager)
}

func TestParallelIndexer_handleWorkerCountChange_Increase(t *testing.T) {
	pi := createTestParallelIndexer(4)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialWorkerCount := len(pi.workers)
	if initialWorkerCount != 4 {
		t.Fatalf("Expected 4 initial workers, got %d", initialWorkerCount)
	}

	// Test increasing worker count
	pi.handleWorkerCountChange(4, 6)

	// Verify worker count increased
	newWorkerCount := len(pi.workers)
	if newWorkerCount != 6 {
		t.Errorf("Expected 6 workers after increase, got %d", newWorkerCount)
	}

	// Verify config was updated
	if pi.config.WorkerCount != 6 {
		t.Errorf("Expected config worker count to be 6, got %d", pi.config.WorkerCount)
	}

	// Verify stats were updated
	if len(pi.stats.WorkerStats) != 6 {
		t.Errorf("Expected 6 worker stats, got %d", len(pi.stats.WorkerStats))
	}
}
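
// The fixed 100ms sleeps in these tests give the started goroutines time to
// spin up, but fixed sleeps can be flaky on slow CI machines. A polling helper
// along these lines is a possible replacement (a sketch only; like the
// assertions in these tests, it reads pi.workers without synchronization):
func waitForWorkerCount(t *testing.T, pi *ParallelIndexer, want int) {
	t.Helper()
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if len(pi.workers) == want {
			return
		}
		time.Sleep(5 * time.Millisecond)
	}
	t.Fatalf("timed out waiting for %d workers, have %d", want, len(pi.workers))
}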

func TestParallelIndexer_handleWorkerCountChange_Decrease(t *testing.T) {
	pi := createTestParallelIndexer(6)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialWorkerCount := len(pi.workers)
	if initialWorkerCount != 6 {
		t.Fatalf("Expected 6 initial workers, got %d", initialWorkerCount)
	}

	// Test decreasing worker count
	pi.handleWorkerCountChange(6, 4)

	// Verify worker count decreased
	newWorkerCount := len(pi.workers)
	if newWorkerCount != 4 {
		t.Errorf("Expected 4 workers after decrease, got %d", newWorkerCount)
	}

	// Verify config was updated
	if pi.config.WorkerCount != 4 {
		t.Errorf("Expected config worker count to be 4, got %d", pi.config.WorkerCount)
	}

	// Verify stats were updated
	if len(pi.stats.WorkerStats) != 4 {
		t.Errorf("Expected 4 worker stats, got %d", len(pi.stats.WorkerStats))
	}
}

func TestParallelIndexer_handleWorkerCountChange_NoChange(t *testing.T) {
	pi := createTestParallelIndexer(4)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialWorkerCount := len(pi.workers)

	// Test no-change scenario
	pi.handleWorkerCountChange(4, 4)

	// Verify worker count didn't change
	newWorkerCount := len(pi.workers)
	if newWorkerCount != initialWorkerCount {
		t.Errorf("Expected worker count to remain %d, got %d", initialWorkerCount, newWorkerCount)
	}
}

func TestParallelIndexer_handleWorkerCountChange_NotRunning(t *testing.T) {
	pi := createTestParallelIndexer(4)

	// Don't start the indexer - it should be in the stopped state
	initialWorkerCount := len(pi.workers)

	// Test worker count change when not running
	pi.handleWorkerCountChange(4, 6)

	// Verify no change occurred
	newWorkerCount := len(pi.workers)
	if newWorkerCount != initialWorkerCount {
		t.Errorf("Expected no worker change when not running, initial: %d, new: %d",
			initialWorkerCount, newWorkerCount)
	}

	// Verify config wasn't updated
	if pi.config.WorkerCount != 4 {
		t.Errorf("Expected config worker count to remain 4, got %d", pi.config.WorkerCount)
	}
}

func TestParallelIndexer_addWorkers(t *testing.T) {
	pi := createTestParallelIndexer(2)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialCount := len(pi.workers)
	if initialCount != 2 {
		t.Fatalf("Expected 2 initial workers, got %d", initialCount)
	}

	// Add 3 workers
	pi.addWorkers(3)

	// Verify workers were added
	newCount := len(pi.workers)
	if newCount != 5 {
		t.Errorf("Expected 5 workers after adding 3, got %d", newCount)
	}

	// Verify worker IDs are sequential
	for i, worker := range pi.workers {
		if worker.id != i {
			t.Errorf("Expected worker %d to have ID %d, got %d", i, i, worker.id)
		}
	}

	// Verify stats were updated
	if len(pi.stats.WorkerStats) != 5 {
		t.Errorf("Expected 5 worker stats, got %d", len(pi.stats.WorkerStats))
	}
}

func TestParallelIndexer_removeWorkers(t *testing.T) {
	pi := createTestParallelIndexer(5)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialCount := len(pi.workers)
	if initialCount != 5 {
		t.Fatalf("Expected 5 initial workers, got %d", initialCount)
	}

	// Remove 2 workers
	pi.removeWorkers(2)

	// Verify workers were removed
	newCount := len(pi.workers)
	if newCount != 3 {
		t.Errorf("Expected 3 workers after removing 2, got %d", newCount)
	}

	// Verify stats were updated
	if len(pi.stats.WorkerStats) != 3 {
		t.Errorf("Expected 3 worker stats, got %d", len(pi.stats.WorkerStats))
	}
}

func TestParallelIndexer_removeWorkers_KeepMinimum(t *testing.T) {
	pi := createTestParallelIndexer(2)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	initialCount := len(pi.workers)
	if initialCount != 2 {
		t.Fatalf("Expected 2 initial workers, got %d", initialCount)
	}

	// Try to remove all workers (should keep at least one)
	pi.removeWorkers(2)

	// Verify at least one worker remains
	newCount := len(pi.workers)
	if newCount != 1 {
		t.Errorf("Expected 1 worker to remain after trying to remove all, got %d", newCount)
	}

	// Verify stats were updated
	if len(pi.stats.WorkerStats) != 1 {
		t.Errorf("Expected 1 worker stat, got %d", len(pi.stats.WorkerStats))
	}
}

func TestParallelIndexer_AdaptiveOptimizerIntegration(t *testing.T) {
	pi := createTestParallelIndexer(4)

	// Enable optimization
	pi.optimizationEnabled = true
	pi.adaptiveOptimizer = NewAdaptiveOptimizer(pi.config)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	// Verify adaptive optimizer callback was set
	if pi.adaptiveOptimizer.onWorkerCountChange == nil {
		t.Error("Expected adaptive optimizer callback to be set")
	}

	// Simulate a worker count change from the adaptive optimizer
	initialWorkerCount := len(pi.workers)

	// Trigger the callback (simulates an adaptive optimizer decision)
	if pi.adaptiveOptimizer.onWorkerCountChange != nil {
		pi.adaptiveOptimizer.onWorkerCountChange(4, 6)
	}

	// Verify worker count changed
	newWorkerCount := len(pi.workers)
	if newWorkerCount == initialWorkerCount {
		t.Error("Expected worker count to change from adaptive optimizer callback")
	}
}

func TestParallelIndexer_ConcurrentWorkerAdjustments(t *testing.T) {
	pi := createTestParallelIndexer(4)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	var wg sync.WaitGroup

	// Simulate concurrent worker adjustments
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(iteration int) {
			defer wg.Done()
			// Alternate between increasing and decreasing
			if iteration%2 == 0 {
				pi.handleWorkerCountChange(pi.config.WorkerCount, pi.config.WorkerCount+1)
			} else if pi.config.WorkerCount > 2 {
				pi.handleWorkerCountChange(pi.config.WorkerCount, pi.config.WorkerCount-1)
			}
		}(i)
	}
	wg.Wait()

	// Verify final state is consistent
	workerCount := len(pi.workers)
	configCount := pi.config.WorkerCount
	statsCount := len(pi.stats.WorkerStats)
	if workerCount != configCount {
		t.Errorf("Worker count (%d) doesn't match config count (%d)", workerCount, configCount)
	}
	if workerCount != statsCount {
		t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
	}

	// Verify worker IDs are sequential and unique
	workerIDs := make(map[int]bool)
	for i, worker := range pi.workers {
		if worker.id != i {
			t.Errorf("Expected worker at index %d to have ID %d, got %d", i, i, worker.id)
		}
		if workerIDs[worker.id] {
			t.Errorf("Duplicate worker ID found: %d", worker.id)
		}
		workerIDs[worker.id] = true
	}
}
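
// The concurrent-adjustment test above exercises shared indexer state from
// multiple goroutines, so it is most useful under Go's race detector, e.g.
// run from the module root:
//
//	go test -race -run 'TestParallelIndexer_' ./...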

func TestParallelIndexer_WorkerStatsConsistency(t *testing.T) {
	pi := createTestParallelIndexer(3)

	// Start the indexer
	ctx := context.Background()
	err := pi.Start(ctx)
	if err != nil {
		t.Fatalf("Failed to start parallel indexer: %v", err)
	}
	defer pi.Stop()

	// Allow time for initialization
	time.Sleep(100 * time.Millisecond)

	// Test adding workers
	pi.addWorkers(2)

	// Verify stats consistency
	workerCount := len(pi.workers)
	statsCount := len(pi.stats.WorkerStats)
	if workerCount != statsCount {
		t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
	}

	// Verify each worker has corresponding stats
	for i, worker := range pi.workers {
		if pi.stats.WorkerStats[i].ID != worker.id {
			t.Errorf("Worker %d ID (%d) doesn't match stats ID (%d)",
				i, worker.id, pi.stats.WorkerStats[i].ID)
		}
		if worker.stats != pi.stats.WorkerStats[i] {
			t.Errorf("Worker %d stats pointer doesn't match global stats", i)
		}
	}

	// Test removing workers
	pi.removeWorkers(1)

	// Verify stats consistency after removal
	workerCount = len(pi.workers)
	statsCount = len(pi.stats.WorkerStats)
	if workerCount != statsCount {
		t.Errorf("After removal, worker count (%d) doesn't match stats count (%d)",
			workerCount, statsCount)
	}
}