dynamic_shard_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package indexer
  2. import (
  3. "context"
  4. "runtime"
  5. "testing"
  6. "time"
  7. "github.com/blevesearch/bleve/v2"
  8. )
  9. // TestDynamicShardAwareness tests the dynamic shard awareness system
  10. func TestDynamicShardAwareness(t *testing.T) {
  11. config := DefaultIndexerConfig()
  12. // Create dynamic awareness
  13. dsa := NewDynamicShardAwareness(config)
  14. // Test environment factor analysis
  15. factors := dsa.analyzeEnvironmentFactors()
  16. t.Logf("Environment Analysis:")
  17. t.Logf(" CPU Cores: %d", factors.CPUCores)
  18. t.Logf(" Memory GB: %.2f", factors.MemoryGB)
  19. t.Logf(" Expected Load: %s", factors.ExpectedLoad)
  20. t.Logf(" Data Volume: %s", factors.DataVolume)
  21. t.Logf(" Query Patterns: %s", factors.QueryPatterns)
  22. // Test shard manager selection
  23. shouldUseDynamic := dsa.shouldUseDynamicShards(factors)
  24. t.Logf("Should use dynamic shards: %v", shouldUseDynamic)
  25. // Test manager setup
  26. manager, err := dsa.DetectAndSetupShardManager()
  27. if err != nil {
  28. t.Fatalf("Failed to setup shard manager: %v", err)
  29. }
  30. isDynamic := dsa.IsDynamic()
  31. t.Logf("Dynamic shard management active: %v", isDynamic)
  32. // Verify manager type
  33. if isDynamic {
  34. if _, ok := manager.(*EnhancedDynamicShardManager); !ok {
  35. t.Errorf("Expected EnhancedDynamicShardManager, got %T", manager)
  36. }
  37. t.Logf("✅ Dynamic shard manager successfully created")
  38. } else {
  39. if _, ok := manager.(*DefaultShardManager); !ok {
  40. t.Errorf("Expected DefaultShardManager, got %T", manager)
  41. }
  42. t.Logf("✅ Static shard manager successfully created")
  43. }
  44. }
  45. // TestEnhancedDynamicShardManager tests the enhanced shard manager functionality
  46. func TestEnhancedDynamicShardManager(t *testing.T) {
  47. config := DefaultIndexerConfig()
  48. config.IndexPath = t.TempDir()
  49. // Create enhanced dynamic shard manager
  50. dsm := NewEnhancedDynamicShardManager(config)
  51. // Initialize
  52. if err := dsm.Initialize(); err != nil {
  53. t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
  54. }
  55. // Check initial shard count
  56. initialCount := dsm.GetShardCount()
  57. t.Logf("Initial shard count: %d", initialCount)
  58. if initialCount != config.ShardCount {
  59. t.Errorf("Expected initial shard count %d, got %d", config.ShardCount, initialCount)
  60. }
  61. // Test scaling up
  62. targetCount := initialCount + 2
  63. t.Logf("Testing scale up to %d shards", targetCount)
  64. if err := dsm.ScaleShards(targetCount); err != nil {
  65. t.Errorf("Failed to scale up shards: %v", err)
  66. } else {
  67. newCount := dsm.GetShardCount()
  68. t.Logf("After scaling up: %d shards", newCount)
  69. if newCount != targetCount {
  70. t.Errorf("Expected %d shards after scaling up, got %d", targetCount, newCount)
  71. } else {
  72. t.Logf("✅ Scale up successful: %d → %d shards", initialCount, newCount)
  73. }
  74. }
  75. // Test scaling down
  76. targetCount = initialCount
  77. t.Logf("Testing scale down to %d shards", targetCount)
  78. if err := dsm.ScaleShards(targetCount); err != nil {
  79. t.Errorf("Failed to scale down shards: %v", err)
  80. } else {
  81. newCount := dsm.GetShardCount()
  82. t.Logf("After scaling down: %d shards", newCount)
  83. if newCount != targetCount {
  84. t.Errorf("Expected %d shards after scaling down, got %d", targetCount, newCount)
  85. } else {
  86. t.Logf("✅ Scale down successful: %d → %d shards", initialCount+2, newCount)
  87. }
  88. }
  89. // Cleanup
  90. if err := dsm.Close(); err != nil {
  91. t.Errorf("Failed to close shard manager: %v", err)
  92. }
  93. }
  94. // TestParallelIndexerWithDynamicShards tests ParallelIndexer with dynamic shard awareness
  95. func TestParallelIndexerWithDynamicShards(t *testing.T) {
  96. config := DefaultIndexerConfig()
  97. config.IndexPath = t.TempDir()
  98. config.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Ensure high worker count for dynamic detection
  99. // Create indexer with nil shard manager to trigger dynamic detection
  100. indexer := NewParallelIndexer(config, nil)
  101. // Check if dynamic awareness is working
  102. if indexer.dynamicAwareness == nil {
  103. t.Fatal("Dynamic awareness should be initialized")
  104. }
  105. isDynamic := indexer.dynamicAwareness.IsDynamic()
  106. t.Logf("Dynamic shard management detected: %v", isDynamic)
  107. currentManager := indexer.dynamicAwareness.GetCurrentShardManager()
  108. t.Logf("Current shard manager type: %T", currentManager)
  109. // For M2 Pro with 12 cores, 24 workers, should detect dynamic management
  110. if runtime.GOMAXPROCS(0) >= 8 {
  111. if !isDynamic {
  112. t.Errorf("Expected dynamic shard management on high-core system (Procs: %d)", runtime.GOMAXPROCS(0))
  113. } else {
  114. t.Logf("✅ Dynamic shard management correctly detected on high-core system")
  115. }
  116. }
  117. // Test starting and stopping
  118. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  119. defer cancel()
  120. if err := indexer.Start(ctx); err != nil {
  121. t.Fatalf("Failed to start indexer: %v", err)
  122. }
  123. // Let it run briefly
  124. time.Sleep(1 * time.Second)
  125. if err := indexer.Stop(); err != nil {
  126. t.Errorf("Failed to stop indexer: %v", err)
  127. }
  128. t.Logf("✅ ParallelIndexer with dynamic shard awareness started and stopped successfully")
  129. }
// TestDataMigrationDuringScaleDown tests that data is properly migrated during shard scale-down.
// It seeds documents across 4 shards, scales down to 2, and then verifies that the total
// document count is preserved and every original document is still retrievable/searchable.
func TestDataMigrationDuringScaleDown(t *testing.T) {
	config := DefaultIndexerConfig()
	config.IndexPath = t.TempDir()
	config.ShardCount = 4 // Start with 4 shards
	// Create enhanced dynamic shard manager
	dsm := NewEnhancedDynamicShardManager(config)
	// Initialize
	if err := dsm.Initialize(); err != nil {
		t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
	}
	defer dsm.Close()
	t.Logf("✅ Initialized shard manager with %d shards", dsm.GetShardCount())
	// Add test documents to different shards
	testDocs := []struct {
		id   string
		data map[string]interface{}
	}{
		{"doc1", map[string]interface{}{"content": "test document 1", "type": "log"}},
		{"doc2", map[string]interface{}{"content": "test document 2", "type": "log"}},
		{"doc3", map[string]interface{}{"content": "test document 3", "type": "log"}},
		{"doc4", map[string]interface{}{"content": "test document 4", "type": "log"}},
		{"doc5", map[string]interface{}{"content": "test document 5", "type": "log"}},
		{"doc6", map[string]interface{}{"content": "test document 6", "type": "log"}},
	}
	// Index documents across shards, routing each one through the manager's
	// own hash function so placement matches production routing.
	var totalDocs int64
	shardDocCounts := make(map[int]int64) // per-shard count, for logging the pre-scale distribution
	for _, testDoc := range testDocs {
		// Determine which shard this document belongs to
		// (uses the embedded DefaultShardManager's hashFunc — presumably the
		// same routing ScaleShards relies on; defined elsewhere in the package).
		shardID := dsm.DefaultShardManager.hashFunc(testDoc.id, config.ShardCount)
		shard, err := dsm.GetShardByID(shardID)
		if err != nil {
			t.Fatalf("Failed to get shard %d: %v", shardID, err)
		}
		// Index the document
		if err := shard.Index(testDoc.id, testDoc.data); err != nil {
			t.Fatalf("Failed to index document %s in shard %d: %v", testDoc.id, shardID, err)
		}
		shardDocCounts[shardID]++
		totalDocs++
	}
	t.Logf("✅ Indexed %d documents across shards", totalDocs)
	// Log distribution before scaling
	for shardID, count := range shardDocCounts {
		t.Logf("Shard %d: %d documents", shardID, count)
	}
	// Count total documents before scaling (ground truth for the no-data-loss check).
	var beforeCount uint64
	for i := 0; i < config.ShardCount; i++ {
		shard, err := dsm.GetShardByID(i)
		if err != nil {
			continue // skip shards the manager cannot return; best-effort count
		}
		count, _ := shard.DocCount()
		beforeCount += count
	}
	t.Logf("Total documents before scaling: %d", beforeCount)
	// Scale down from 4 to 2 shards (should migrate data from shards 2 and 3)
	targetShards := 2
	t.Logf("🔄 Scaling down from %d to %d shards", config.ShardCount, targetShards)
	err := dsm.ScaleShards(targetShards)
	if err != nil {
		t.Fatalf("Failed to scale down shards: %v", err)
	}
	// Verify final shard count
	finalShardCount := dsm.GetShardCount()
	if finalShardCount != targetShards {
		t.Fatalf("Expected %d shards after scaling, got %d", targetShards, finalShardCount)
	}
	// Count total documents after scaling — only the surviving shards remain.
	var afterCount uint64
	for i := 0; i < targetShards; i++ {
		shard, err := dsm.GetShardByID(i)
		if err != nil {
			t.Errorf("Failed to get shard %d after scaling: %v", i, err)
			continue
		}
		count, _ := shard.DocCount()
		afterCount += count
		t.Logf("Shard %d after scaling: %d documents", i, count)
	}
	t.Logf("Total documents after scaling: %d", afterCount)
	// Verify no data loss: totals before and after the migration must match.
	if afterCount != beforeCount {
		t.Errorf("Data loss detected! Before: %d documents, After: %d documents", beforeCount, afterCount)
	} else {
		t.Logf("✅ No data loss: %d documents preserved", afterCount)
	}
	// Verify all original documents are still searchable: each document must be
	// found by ID in at least one of the surviving shards.
	for _, testDoc := range testDocs {
		found := false
		for i := 0; i < targetShards; i++ {
			shard, err := dsm.GetShardByID(i)
			if err != nil {
				continue
			}
			// Try to find the document
			doc, err := shard.Document(testDoc.id)
			if err == nil && doc != nil {
				found = true
				t.Logf("✅ Document %s found in shard %d after migration", testDoc.id, i)
				break
			}
		}
		if !found {
			t.Errorf("❌ Document %s not found after migration", testDoc.id)
		}
	}
	// Test searching across all remaining shards with a match-all query to
	// confirm each surviving index is still queryable end-to-end.
	for i := 0; i < targetShards; i++ {
		shard, err := dsm.GetShardByID(i)
		if err != nil {
			continue
		}
		// Search for all documents
		query := bleve.NewMatchAllQuery()
		searchReq := bleve.NewSearchRequest(query)
		searchReq.Size = 100 // large enough to cover all seeded documents
		results, err := shard.Search(searchReq)
		if err != nil {
			t.Errorf("Search failed in shard %d: %v", i, err)
			continue
		}
		t.Logf("Shard %d search results: %d hits", i, len(results.Hits))
	}
	t.Logf("✅ Data migration during scale-down completed successfully")
}
// TestDataMigrationBasicValidation tests the core data migration logic by calling
// migrateShardData directly (bypassing ScaleShards): it seeds documents into shard 2,
// migrates them into shards 0-1, and verifies the source is empty and the targets
// hold and can retrieve every document.
func TestDataMigrationBasicValidation(t *testing.T) {
	config := DefaultIndexerConfig()
	config.IndexPath = t.TempDir()
	config.ShardCount = 3 // Start with 3 shards
	// Create enhanced dynamic shard manager
	dsm := NewEnhancedDynamicShardManager(config)
	// Initialize
	if err := dsm.Initialize(); err != nil {
		t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
	}
	defer dsm.Close()
	t.Logf("✅ Initialized with %d shards", dsm.GetShardCount())
	// Add test documents to shard 2 (which we'll migrate)
	testDocs := []struct {
		id   string
		data map[string]interface{}
	}{
		{"test1", map[string]interface{}{"content": "migration test 1", "type": "test"}},
		{"test2", map[string]interface{}{"content": "migration test 2", "type": "test"}},
	}
	// Index documents directly to shard 2 (skipping hash routing so we know
	// exactly where the data starts).
	shard2, err := dsm.GetShardByID(2)
	if err != nil {
		t.Fatalf("Failed to get shard 2: %v", err)
	}
	for _, doc := range testDocs {
		if err := shard2.Index(doc.id, doc.data); err != nil {
			t.Fatalf("Failed to index %s: %v", doc.id, err)
		}
	}
	// Verify documents are in shard 2
	count, err := shard2.DocCount()
	if err != nil {
		t.Fatalf("Failed to get shard 2 doc count: %v", err)
	}
	t.Logf("Shard 2 has %d documents before migration", count)
	// Test direct data migration function (bypass ScaleShards to avoid lock issues).
	// Arguments appear to be (sourceShardID, newShardCount) — migrate shard 2's
	// data into the surviving shards 0-1; confirm against migrateShardData's definition.
	migratedCount, err := dsm.migrateShardData(2, 2) // Migrate shard 2 to shards 0-1
	if err != nil {
		t.Fatalf("Data migration failed: %v", err)
	}
	t.Logf("✅ Successfully migrated %d documents from shard 2", migratedCount)
	// Verify source shard is now empty
	count, err = shard2.DocCount()
	if err != nil {
		t.Fatalf("Failed to get shard 2 doc count after migration: %v", err)
	}
	if count != 0 {
		t.Errorf("Expected shard 2 to be empty, but has %d documents", count)
	} else {
		t.Logf("✅ Source shard 2 is now empty")
	}
	// Verify target shards received the documents (sum counts across shards 0-1).
	var totalFound uint64
	for i := 0; i < 2; i++ {
		shard, err := dsm.GetShardByID(i)
		if err != nil {
			continue
		}
		count, _ := shard.DocCount()
		totalFound += count
		if count > 0 {
			t.Logf("Shard %d now has %d documents", i, count)
		}
	}
	if totalFound < uint64(len(testDocs)) {
		t.Errorf("Expected at least %d documents in target shards, found %d", len(testDocs), totalFound)
	} else {
		t.Logf("✅ All %d documents successfully migrated to target shards", totalFound)
	}
	// Verify documents are searchable in target shards; track unique IDs so a
	// document duplicated across shards is only counted once.
	foundDocs := make(map[string]bool)
	for i := 0; i < 2; i++ {
		shard, err := dsm.GetShardByID(i)
		if err != nil {
			continue
		}
		for _, testDoc := range testDocs {
			_, err := shard.Document(testDoc.id)
			if err == nil && !foundDocs[testDoc.id] {
				foundDocs[testDoc.id] = true
				t.Logf("✅ Document %s found in shard %d", testDoc.id, i)
			}
		}
	}
	if len(foundDocs) != len(testDocs) {
		t.Errorf("Expected to find %d unique documents, found %d", len(testDocs), len(foundDocs))
	} else {
		t.Logf("✅ All %d documents are searchable after migration", len(foundDocs))
	}
}