package indexer

import (
    "context"
    "sync"
    "testing"
    "time"

    "github.com/blevesearch/bleve/v2"
)
// mockShardManagerForWorkerTest is a no-op shard manager for the worker
// management tests below. These tests only exercise worker lifecycle, so no
// method ever touches a real bleve index and every method returns zero values.
type mockShardManagerForWorkerTest struct{}

func (m *mockShardManagerForWorkerTest) GetShard(key string) (bleve.Index, int, error) {
    return nil, 0, nil
}

func (m *mockShardManagerForWorkerTest) GetShardForDocument(mainLogPath string, key string) (bleve.Index, int, error) {
    return m.GetShard(key)
}

func (m *mockShardManagerForWorkerTest) GetShardByID(id int) (bleve.Index, error) {
    return nil, nil
}

func (m *mockShardManagerForWorkerTest) GetAllShards() []bleve.Index {
    return []bleve.Index{}
}

func (m *mockShardManagerForWorkerTest) GetShardCount() int {
    return 1
}

func (m *mockShardManagerForWorkerTest) Initialize() error {
    return nil
}

func (m *mockShardManagerForWorkerTest) GetShardStats() []*ShardInfo {
    return []*ShardInfo{}
}

func (m *mockShardManagerForWorkerTest) CreateShard(id int, path string) error {
    return nil
}

func (m *mockShardManagerForWorkerTest) Close() error {
    return nil
}

func (m *mockShardManagerForWorkerTest) CloseShard(id int) error {
    return nil
}

func (m *mockShardManagerForWorkerTest) HealthCheck() error {
    return nil
}

func (m *mockShardManagerForWorkerTest) OptimizeShard(id int) error {
    return nil
}
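
// Compile-time check that the mock satisfies the shard manager interface
// consumed by NewParallelIndexer. The interface name ShardManager is an
// assumption here; adjust it if the package uses a different name.
var _ ShardManager = (*mockShardManagerForWorkerTest)(nil)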
// Test helper to create a parallel indexer for worker tests
func createTestParallelIndexer(workerCount int) *ParallelIndexer {
    config := &Config{
        WorkerCount:  workerCount,
        BatchSize:    100,
        MaxQueueSize: 1000,
    }
    shardManager := &mockShardManagerForWorkerTest{}
    return NewParallelIndexer(config, shardManager)
}
func TestParallelIndexer_handleWorkerCountChange_Increase(t *testing.T) {
    pi := createTestParallelIndexer(4)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialWorkerCount := len(pi.workers)
    if initialWorkerCount != 4 {
        t.Fatalf("Expected 4 initial workers, got %d", initialWorkerCount)
    }

    // Test increasing worker count
    pi.handleWorkerCountChange(4, 6)

    // Verify worker count increased
    newWorkerCount := len(pi.workers)
    if newWorkerCount != 6 {
        t.Errorf("Expected 6 workers after increase, got %d", newWorkerCount)
    }

    // Verify config was updated
    if pi.config.WorkerCount != 6 {
        t.Errorf("Expected config worker count to be 6, got %d", pi.config.WorkerCount)
    }

    // Verify stats were updated
    if len(pi.stats.WorkerStats) != 6 {
        t.Errorf("Expected 6 worker stats, got %d", len(pi.stats.WorkerStats))
    }
}
func TestParallelIndexer_handleWorkerCountChange_Decrease(t *testing.T) {
    pi := createTestParallelIndexer(6)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialWorkerCount := len(pi.workers)
    if initialWorkerCount != 6 {
        t.Fatalf("Expected 6 initial workers, got %d", initialWorkerCount)
    }

    // Test decreasing worker count
    pi.handleWorkerCountChange(6, 4)

    // Verify worker count decreased
    newWorkerCount := len(pi.workers)
    if newWorkerCount != 4 {
        t.Errorf("Expected 4 workers after decrease, got %d", newWorkerCount)
    }

    // Verify config was updated
    if pi.config.WorkerCount != 4 {
        t.Errorf("Expected config worker count to be 4, got %d", pi.config.WorkerCount)
    }

    // Verify stats were updated
    if len(pi.stats.WorkerStats) != 4 {
        t.Errorf("Expected 4 worker stats, got %d", len(pi.stats.WorkerStats))
    }
}
func TestParallelIndexer_handleWorkerCountChange_NoChange(t *testing.T) {
    pi := createTestParallelIndexer(4)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialWorkerCount := len(pi.workers)

    // Test no-change scenario
    pi.handleWorkerCountChange(4, 4)

    // Verify worker count didn't change
    newWorkerCount := len(pi.workers)
    if newWorkerCount != initialWorkerCount {
        t.Errorf("Expected worker count to remain %d, got %d", initialWorkerCount, newWorkerCount)
    }
}
func TestParallelIndexer_handleWorkerCountChange_NotRunning(t *testing.T) {
    pi := createTestParallelIndexer(4)

    // Don't start the indexer - it should be in the stopped state
    initialWorkerCount := len(pi.workers)

    // Test worker count change when not running
    pi.handleWorkerCountChange(4, 6)

    // Verify no change occurred
    newWorkerCount := len(pi.workers)
    if newWorkerCount != initialWorkerCount {
        t.Errorf("Expected no worker change when not running, initial: %d, new: %d",
            initialWorkerCount, newWorkerCount)
    }

    // Verify config wasn't updated
    if pi.config.WorkerCount != 4 {
        t.Errorf("Expected config worker count to remain 4, got %d", pi.config.WorkerCount)
    }
}
func TestParallelIndexer_addWorkers(t *testing.T) {
    pi := createTestParallelIndexer(2)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialCount := len(pi.workers)
    if initialCount != 2 {
        t.Fatalf("Expected 2 initial workers, got %d", initialCount)
    }

    // Add 3 workers
    pi.addWorkers(3)

    // Verify workers were added
    newCount := len(pi.workers)
    if newCount != 5 {
        t.Errorf("Expected 5 workers after adding 3, got %d", newCount)
    }

    // Verify worker IDs are sequential
    for i, worker := range pi.workers {
        if worker.id != i {
            t.Errorf("Expected worker %d to have ID %d, got %d", i, i, worker.id)
        }
    }

    // Verify stats were updated
    if len(pi.stats.WorkerStats) != 5 {
        t.Errorf("Expected 5 worker stats, got %d", len(pi.stats.WorkerStats))
    }
}
func TestParallelIndexer_removeWorkers(t *testing.T) {
    pi := createTestParallelIndexer(5)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialCount := len(pi.workers)
    if initialCount != 5 {
        t.Fatalf("Expected 5 initial workers, got %d", initialCount)
    }

    // Remove 2 workers
    pi.removeWorkers(2)

    // Verify workers were removed
    newCount := len(pi.workers)
    if newCount != 3 {
        t.Errorf("Expected 3 workers after removing 2, got %d", newCount)
    }

    // Verify stats were updated
    if len(pi.stats.WorkerStats) != 3 {
        t.Errorf("Expected 3 worker stats, got %d", len(pi.stats.WorkerStats))
    }
}
func TestParallelIndexer_removeWorkers_KeepMinimum(t *testing.T) {
    pi := createTestParallelIndexer(2)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    initialCount := len(pi.workers)
    if initialCount != 2 {
        t.Fatalf("Expected 2 initial workers, got %d", initialCount)
    }

    // Try to remove all workers (should keep at least one)
    pi.removeWorkers(2)

    // Verify at least one worker remains
    newCount := len(pi.workers)
    if newCount != 1 {
        t.Errorf("Expected 1 worker to remain after trying to remove all, got %d", newCount)
    }

    // Verify stats were updated
    if len(pi.stats.WorkerStats) != 1 {
        t.Errorf("Expected 1 worker stat, got %d", len(pi.stats.WorkerStats))
    }
}
func TestParallelIndexer_AdaptiveOptimizerIntegration(t *testing.T) {
    pi := createTestParallelIndexer(4)

    // Enable optimization before Start so the indexer wires the
    // worker-count-change callback into the adaptive optimizer
    pi.optimizationEnabled = true
    pi.adaptiveOptimizer = NewAdaptiveOptimizer(pi.config)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    // Verify the adaptive optimizer callback was set
    if pi.adaptiveOptimizer.onWorkerCountChange == nil {
        t.Error("Expected adaptive optimizer callback to be set")
    }

    initialWorkerCount := len(pi.workers)

    // Trigger the callback directly, simulating an adaptive optimizer decision
    if pi.adaptiveOptimizer.onWorkerCountChange != nil {
        pi.adaptiveOptimizer.onWorkerCountChange(4, 6)
    }

    // Verify the worker count changed
    newWorkerCount := len(pi.workers)
    if newWorkerCount == initialWorkerCount {
        t.Error("Expected worker count to change from adaptive optimizer callback")
    }
}
func TestParallelIndexer_ConcurrentWorkerAdjustments(t *testing.T) {
    pi := createTestParallelIndexer(4)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    var wg sync.WaitGroup

    // Fire concurrent adjustments with fixed from/to arguments rather than
    // reading pi.config.WorkerCount inside the goroutines: that field is
    // mutated by handleWorkerCountChange itself, so an unguarded read here
    // would be a data race under `go test -race`. The consistency checks
    // below do not depend on which individual calls win.
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(iteration int) {
            defer wg.Done()
            // Alternate between increasing and decreasing
            if iteration%2 == 0 {
                pi.handleWorkerCountChange(4, 5)
            } else {
                pi.handleWorkerCountChange(5, 4)
            }
        }(i)
    }
    wg.Wait()

    // Verify the final state is consistent
    workerCount := len(pi.workers)
    configCount := pi.config.WorkerCount
    statsCount := len(pi.stats.WorkerStats)

    if workerCount != configCount {
        t.Errorf("Worker count (%d) doesn't match config count (%d)", workerCount, configCount)
    }
    if workerCount != statsCount {
        t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
    }

    // Verify worker IDs are sequential and unique
    workerIDs := make(map[int]bool)
    for i, worker := range pi.workers {
        if worker.id != i {
            t.Errorf("Expected worker at index %d to have ID %d, got %d", i, i, worker.id)
        }
        if workerIDs[worker.id] {
            t.Errorf("Duplicate worker ID found: %d", worker.id)
        }
        workerIDs[worker.id] = true
    }
}
func TestParallelIndexer_WorkerStatsConsistency(t *testing.T) {
    pi := createTestParallelIndexer(3)

    // Start the indexer
    ctx := context.Background()
    err := pi.Start(ctx)
    if err != nil {
        t.Fatalf("Failed to start parallel indexer: %v", err)
    }
    defer pi.Stop()

    // Allow time for initialization
    time.Sleep(100 * time.Millisecond)

    // Test adding workers
    pi.addWorkers(2)

    // Verify stats consistency
    workerCount := len(pi.workers)
    statsCount := len(pi.stats.WorkerStats)
    if workerCount != statsCount {
        t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
    }

    // Verify each worker has corresponding stats
    for i, worker := range pi.workers {
        if pi.stats.WorkerStats[i].ID != worker.id {
            t.Errorf("Worker %d ID (%d) doesn't match stats ID (%d)",
                i, worker.id, pi.stats.WorkerStats[i].ID)
        }
        if worker.stats != pi.stats.WorkerStats[i] {
            t.Errorf("Worker %d stats pointer doesn't match global stats", i)
        }
    }

    // Test removing workers
    pi.removeWorkers(1)

    // Verify stats consistency after removal
    workerCount = len(pi.workers)
    statsCount = len(pi.stats.WorkerStats)
    if workerCount != statsCount {
        t.Errorf("After removal, worker count (%d) doesn't match stats count (%d)",
            workerCount, statsCount)
    }
}
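
// The three handleWorkerCountChange scenarios above could also be expressed
// as a single table-driven test. A minimal sketch, assuming the same helper
// and the semantics demonstrated by the tests above:
func TestParallelIndexer_handleWorkerCountChange_Table(t *testing.T) {
    cases := []struct {
        name     string
        initial  int
        from, to int
        want     int
    }{
        {"increase", 4, 4, 6, 6},
        {"decrease", 6, 6, 4, 4},
        {"no change", 4, 4, 4, 4},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            pi := createTestParallelIndexer(tc.initial)
            if err := pi.Start(context.Background()); err != nil {
                t.Fatalf("Failed to start parallel indexer: %v", err)
            }
            defer pi.Stop()

            // Allow time for initialization, as in the tests above
            time.Sleep(100 * time.Millisecond)

            pi.handleWorkerCountChange(tc.from, tc.to)

            if got := len(pi.workers); got != tc.want {
                t.Errorf("Expected %d workers, got %d", tc.want, got)
            }
            if pi.config.WorkerCount != tc.want {
                t.Errorf("Expected config worker count %d, got %d", tc.want, pi.config.WorkerCount)
            }
        })
    }
}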