123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- package indexer
- import (
- "context"
- "runtime"
- "testing"
- "time"
-
- "github.com/blevesearch/bleve/v2"
- )
- // TestDynamicShardAwareness tests the dynamic shard awareness system
- func TestDynamicShardAwareness(t *testing.T) {
- config := DefaultIndexerConfig()
-
- // Create dynamic awareness
- dsa := NewDynamicShardAwareness(config)
-
- // Test environment factor analysis
- factors := dsa.analyzeEnvironmentFactors()
-
- t.Logf("Environment Analysis:")
- t.Logf(" CPU Cores: %d", factors.CPUCores)
- t.Logf(" Memory GB: %.2f", factors.MemoryGB)
- t.Logf(" Expected Load: %s", factors.ExpectedLoad)
- t.Logf(" Data Volume: %s", factors.DataVolume)
- t.Logf(" Query Patterns: %s", factors.QueryPatterns)
-
- // Test shard manager selection
- shouldUseDynamic := dsa.shouldUseDynamicShards(factors)
- t.Logf("Should use dynamic shards: %v", shouldUseDynamic)
-
- // Test manager setup
- manager, err := dsa.DetectAndSetupShardManager()
- if err != nil {
- t.Fatalf("Failed to setup shard manager: %v", err)
- }
-
- isDynamic := dsa.IsDynamic()
- t.Logf("Dynamic shard management active: %v", isDynamic)
-
- // Verify manager type
- if isDynamic {
- if _, ok := manager.(*EnhancedDynamicShardManager); !ok {
- t.Errorf("Expected EnhancedDynamicShardManager, got %T", manager)
- }
- t.Logf("✅ Dynamic shard manager successfully created")
- } else {
- if _, ok := manager.(*DefaultShardManager); !ok {
- t.Errorf("Expected DefaultShardManager, got %T", manager)
- }
- t.Logf("✅ Static shard manager successfully created")
- }
- }
- // TestEnhancedDynamicShardManager tests the enhanced shard manager functionality
- func TestEnhancedDynamicShardManager(t *testing.T) {
- config := DefaultIndexerConfig()
- config.IndexPath = t.TempDir()
-
- // Create enhanced dynamic shard manager
- dsm := NewEnhancedDynamicShardManager(config)
-
- // Initialize
- if err := dsm.Initialize(); err != nil {
- t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
- }
-
- // Check initial shard count
- initialCount := dsm.GetShardCount()
- t.Logf("Initial shard count: %d", initialCount)
-
- if initialCount != config.ShardCount {
- t.Errorf("Expected initial shard count %d, got %d", config.ShardCount, initialCount)
- }
-
- // Test scaling up
- targetCount := initialCount + 2
- t.Logf("Testing scale up to %d shards", targetCount)
-
- if err := dsm.ScaleShards(targetCount); err != nil {
- t.Errorf("Failed to scale up shards: %v", err)
- } else {
- newCount := dsm.GetShardCount()
- t.Logf("After scaling up: %d shards", newCount)
-
- if newCount != targetCount {
- t.Errorf("Expected %d shards after scaling up, got %d", targetCount, newCount)
- } else {
- t.Logf("✅ Scale up successful: %d → %d shards", initialCount, newCount)
- }
- }
-
- // Test scaling down
- targetCount = initialCount
- t.Logf("Testing scale down to %d shards", targetCount)
-
- if err := dsm.ScaleShards(targetCount); err != nil {
- t.Errorf("Failed to scale down shards: %v", err)
- } else {
- newCount := dsm.GetShardCount()
- t.Logf("After scaling down: %d shards", newCount)
-
- if newCount != targetCount {
- t.Errorf("Expected %d shards after scaling down, got %d", targetCount, newCount)
- } else {
- t.Logf("✅ Scale down successful: %d → %d shards", initialCount+2, newCount)
- }
- }
-
- // Cleanup
- if err := dsm.Close(); err != nil {
- t.Errorf("Failed to close shard manager: %v", err)
- }
- }
- // TestParallelIndexerWithDynamicShards tests ParallelIndexer with dynamic shard awareness
- func TestParallelIndexerWithDynamicShards(t *testing.T) {
- config := DefaultIndexerConfig()
- config.IndexPath = t.TempDir()
- config.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Ensure high worker count for dynamic detection
-
- // Create indexer with nil shard manager to trigger dynamic detection
- indexer := NewParallelIndexer(config, nil)
-
- // Check if dynamic awareness is working
- if indexer.dynamicAwareness == nil {
- t.Fatal("Dynamic awareness should be initialized")
- }
-
- isDynamic := indexer.dynamicAwareness.IsDynamic()
- t.Logf("Dynamic shard management detected: %v", isDynamic)
-
- currentManager := indexer.dynamicAwareness.GetCurrentShardManager()
- t.Logf("Current shard manager type: %T", currentManager)
-
- // For M2 Pro with 12 cores, 24 workers, should detect dynamic management
- if runtime.GOMAXPROCS(0) >= 8 {
- if !isDynamic {
- t.Errorf("Expected dynamic shard management on high-core system (Procs: %d)", runtime.GOMAXPROCS(0))
- } else {
- t.Logf("✅ Dynamic shard management correctly detected on high-core system")
- }
- }
-
- // Test starting and stopping
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- if err := indexer.Start(ctx); err != nil {
- t.Fatalf("Failed to start indexer: %v", err)
- }
-
- // Let it run briefly
- time.Sleep(1 * time.Second)
-
- if err := indexer.Stop(); err != nil {
- t.Errorf("Failed to stop indexer: %v", err)
- }
-
- t.Logf("✅ ParallelIndexer with dynamic shard awareness started and stopped successfully")
- }
- // TestDataMigrationDuringScaleDown tests that data is properly migrated during shard scale-down
- func TestDataMigrationDuringScaleDown(t *testing.T) {
- config := DefaultIndexerConfig()
- config.IndexPath = t.TempDir()
- config.ShardCount = 4 // Start with 4 shards
-
- // Create enhanced dynamic shard manager
- dsm := NewEnhancedDynamicShardManager(config)
-
- // Initialize
- if err := dsm.Initialize(); err != nil {
- t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
- }
- defer dsm.Close()
-
- t.Logf("✅ Initialized shard manager with %d shards", dsm.GetShardCount())
-
- // Add test documents to different shards
- testDocs := []struct {
- id string
- data map[string]interface{}
- }{
- {"doc1", map[string]interface{}{"content": "test document 1", "type": "log"}},
- {"doc2", map[string]interface{}{"content": "test document 2", "type": "log"}},
- {"doc3", map[string]interface{}{"content": "test document 3", "type": "log"}},
- {"doc4", map[string]interface{}{"content": "test document 4", "type": "log"}},
- {"doc5", map[string]interface{}{"content": "test document 5", "type": "log"}},
- {"doc6", map[string]interface{}{"content": "test document 6", "type": "log"}},
- }
-
- // Index documents across shards
- var totalDocs int64
- shardDocCounts := make(map[int]int64)
-
- for _, testDoc := range testDocs {
- // Determine which shard this document belongs to
- shardID := dsm.DefaultShardManager.hashFunc(testDoc.id, config.ShardCount)
- shard, err := dsm.GetShardByID(shardID)
- if err != nil {
- t.Fatalf("Failed to get shard %d: %v", shardID, err)
- }
-
- // Index the document
- if err := shard.Index(testDoc.id, testDoc.data); err != nil {
- t.Fatalf("Failed to index document %s in shard %d: %v", testDoc.id, shardID, err)
- }
-
- shardDocCounts[shardID]++
- totalDocs++
- }
-
- t.Logf("✅ Indexed %d documents across shards", totalDocs)
-
- // Log distribution before scaling
- for shardID, count := range shardDocCounts {
- t.Logf("Shard %d: %d documents", shardID, count)
- }
-
- // Count total documents before scaling
- var beforeCount uint64
- for i := 0; i < config.ShardCount; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- continue
- }
- count, _ := shard.DocCount()
- beforeCount += count
- }
- t.Logf("Total documents before scaling: %d", beforeCount)
-
- // Scale down from 4 to 2 shards (should migrate data from shards 2 and 3)
- targetShards := 2
- t.Logf("🔄 Scaling down from %d to %d shards", config.ShardCount, targetShards)
-
- err := dsm.ScaleShards(targetShards)
- if err != nil {
- t.Fatalf("Failed to scale down shards: %v", err)
- }
-
- // Verify final shard count
- finalShardCount := dsm.GetShardCount()
- if finalShardCount != targetShards {
- t.Fatalf("Expected %d shards after scaling, got %d", targetShards, finalShardCount)
- }
-
- // Count total documents after scaling
- var afterCount uint64
- for i := 0; i < targetShards; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- t.Errorf("Failed to get shard %d after scaling: %v", i, err)
- continue
- }
- count, _ := shard.DocCount()
- afterCount += count
- t.Logf("Shard %d after scaling: %d documents", i, count)
- }
-
- t.Logf("Total documents after scaling: %d", afterCount)
-
- // Verify no data loss
- if afterCount != beforeCount {
- t.Errorf("Data loss detected! Before: %d documents, After: %d documents", beforeCount, afterCount)
- } else {
- t.Logf("✅ No data loss: %d documents preserved", afterCount)
- }
-
- // Verify all original documents are still searchable
- for _, testDoc := range testDocs {
- found := false
- for i := 0; i < targetShards; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- continue
- }
-
- // Try to find the document
- doc, err := shard.Document(testDoc.id)
- if err == nil && doc != nil {
- found = true
- t.Logf("✅ Document %s found in shard %d after migration", testDoc.id, i)
- break
- }
- }
-
- if !found {
- t.Errorf("❌ Document %s not found after migration", testDoc.id)
- }
- }
-
- // Test searching across all remaining shards
- for i := 0; i < targetShards; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- continue
- }
-
- // Search for all documents
- query := bleve.NewMatchAllQuery()
- searchReq := bleve.NewSearchRequest(query)
- searchReq.Size = 100
-
- results, err := shard.Search(searchReq)
- if err != nil {
- t.Errorf("Search failed in shard %d: %v", i, err)
- continue
- }
-
- t.Logf("Shard %d search results: %d hits", i, len(results.Hits))
- }
-
- t.Logf("✅ Data migration during scale-down completed successfully")
- }
- // TestDataMigrationBasicValidation tests the core data migration logic
- func TestDataMigrationBasicValidation(t *testing.T) {
- config := DefaultIndexerConfig()
- config.IndexPath = t.TempDir()
- config.ShardCount = 3 // Start with 3 shards
-
- // Create enhanced dynamic shard manager
- dsm := NewEnhancedDynamicShardManager(config)
-
- // Initialize
- if err := dsm.Initialize(); err != nil {
- t.Fatalf("Failed to initialize enhanced shard manager: %v", err)
- }
- defer dsm.Close()
-
- t.Logf("✅ Initialized with %d shards", dsm.GetShardCount())
-
- // Add test documents to shard 2 (which we'll migrate)
- testDocs := []struct {
- id string
- data map[string]interface{}
- }{
- {"test1", map[string]interface{}{"content": "migration test 1", "type": "test"}},
- {"test2", map[string]interface{}{"content": "migration test 2", "type": "test"}},
- }
-
- // Index documents directly to shard 2
- shard2, err := dsm.GetShardByID(2)
- if err != nil {
- t.Fatalf("Failed to get shard 2: %v", err)
- }
-
- for _, doc := range testDocs {
- if err := shard2.Index(doc.id, doc.data); err != nil {
- t.Fatalf("Failed to index %s: %v", doc.id, err)
- }
- }
-
- // Verify documents are in shard 2
- count, err := shard2.DocCount()
- if err != nil {
- t.Fatalf("Failed to get shard 2 doc count: %v", err)
- }
- t.Logf("Shard 2 has %d documents before migration", count)
-
- // Test direct data migration function (bypass ScaleShards to avoid lock issues)
- migratedCount, err := dsm.migrateShardData(2, 2) // Migrate shard 2 to shards 0-1
- if err != nil {
- t.Fatalf("Data migration failed: %v", err)
- }
-
- t.Logf("✅ Successfully migrated %d documents from shard 2", migratedCount)
-
- // Verify source shard is now empty
- count, err = shard2.DocCount()
- if err != nil {
- t.Fatalf("Failed to get shard 2 doc count after migration: %v", err)
- }
- if count != 0 {
- t.Errorf("Expected shard 2 to be empty, but has %d documents", count)
- } else {
- t.Logf("✅ Source shard 2 is now empty")
- }
-
- // Verify target shards received the documents
- var totalFound uint64
- for i := 0; i < 2; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- continue
- }
- count, _ := shard.DocCount()
- totalFound += count
- if count > 0 {
- t.Logf("Shard %d now has %d documents", i, count)
- }
- }
-
- if totalFound < uint64(len(testDocs)) {
- t.Errorf("Expected at least %d documents in target shards, found %d", len(testDocs), totalFound)
- } else {
- t.Logf("✅ All %d documents successfully migrated to target shards", totalFound)
- }
-
- // Verify documents are searchable in target shards
- foundDocs := make(map[string]bool)
- for i := 0; i < 2; i++ {
- shard, err := dsm.GetShardByID(i)
- if err != nil {
- continue
- }
-
- for _, testDoc := range testDocs {
- _, err := shard.Document(testDoc.id)
- if err == nil && !foundDocs[testDoc.id] {
- foundDocs[testDoc.id] = true
- t.Logf("✅ Document %s found in shard %d", testDoc.id, i)
- }
- }
- }
-
- if len(foundDocs) != len(testDocs) {
- t.Errorf("Expected to find %d unique documents, found %d", len(testDocs), len(foundDocs))
- } else {
- t.Logf("✅ All %d documents are searchable after migration", len(foundDocs))
- }
- }
|