
feat(nginx_log): adaptive optimization and worker count adjustment

0xJacky 1 month ago
parent
commit
e3ce9a9023

+ 1 - 1
app/package.json

@@ -18,7 +18,7 @@
     "@fingerprintjs/fingerprintjs": "^4.6.2",
     "@formkit/auto-animate": "^0.8.4",
     "@simplewebauthn/browser": "^13.1.2",
-    "@uozi-admin/curd": "^4.15.4",
+    "@uozi-admin/curd": "^4.15.5",
     "@uozi-admin/request": "^2.8.4",
     "@vue/reactivity": "^3.5.21",
     "@vue/shared": "^3.5.21",

+ 13 - 13
app/pnpm-lock.yaml

@@ -24,8 +24,8 @@ importers:
         specifier: ^13.1.2
         version: 13.1.2
       '@uozi-admin/curd':
-        specifier: ^4.15.4
-        version: 4.15.4(@ant-design/icons-vue@7.0.1(vue@3.5.21(typescript@5.9.2)))(@vueuse/core@13.9.0(vue@3.5.21(typescript@5.9.2)))(@vueuse/router@13.9.0(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2)))(ant-design-vue@4.2.6(vue@3.5.21(typescript@5.9.2)))(dayjs@1.11.18)(lodash-es@4.17.21)(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2))
+        specifier: ^4.15.5
+        version: 4.15.5(@ant-design/icons-vue@7.0.1(vue@3.5.21(typescript@5.9.2)))(@vueuse/core@13.9.0(vue@3.5.21(typescript@5.9.2)))(@vueuse/router@13.9.0(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2)))(ant-design-vue@4.2.6(vue@3.5.21(typescript@5.9.2)))(dayjs@1.11.18)(lodash-es@4.17.21)(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2))
       '@uozi-admin/request':
         specifier: ^2.8.4
         version: 2.8.4(lodash-es@4.17.21)
@@ -1684,8 +1684,8 @@ packages:
     peerDependencies:
       vite: ^2.9.0 || ^3.0.0-0 || ^4.0.0 || ^5.0.0-0 || ^6.0.0-0 || ^7.0.0-0
 
-  '@uozi-admin/curd@4.15.4':
-    resolution: {integrity: sha512-f+DL9nzlyHkzlhNycmDCIIex8hUylEjLpgzmggdkj4lN+f0J7dsLA1z5YYjTseEkvEoAYkvGqWzeYVkkBAn0Gg==}
+  '@uozi-admin/curd@4.15.5':
+    resolution: {integrity: sha512-FR7hrZlI/8tQQAQPqBZhb7xOyJsq/WpuVuyv2/tl3+lQFsnmPoQ4foMUsmFf94JdsKT5CBvHItCfRsKJ2nSZ8g==}
     hasBin: true
     peerDependencies:
       '@ant-design/icons-vue': '>=7.0.1'
@@ -1723,8 +1723,8 @@ packages:
       vite: ^5.0.0 || ^6.0.0 || ^7.0.0
       vue: ^3.2.25
 
-  '@vitest/eslint-plugin@1.3.8':
-    resolution: {integrity: sha512-+M0eRDo/UiIF4xZZbZBBAR2Resx0ihdLRNpYevkrDJ6F3xHuEXSAAJGt6Ahabd0eJC4mQKvLA1JY1qBM058Cag==}
+  '@vitest/eslint-plugin@1.3.9':
+    resolution: {integrity: sha512-wsNe7xy44ovm/h9ISDkDNcv0aOnUsaOYDqan2y6qCFAUQ0odFr6df/+FdGKHZN+mCM+SvIDWoXuvm5T5V3Kh6w==}
     peerDependencies:
       eslint: '>= 8.57.0'
       typescript: '>= 5.0.0'
@@ -3461,8 +3461,8 @@ packages:
     resolution: {integrity: sha512-A32kRGjXtwQ+uSa3GrXiCl8HVFY0Jy6IiKFO7UjagAKSaOOrruxB2Qf/w7TP5QtNfB3uOiHTu3cjhp8k/C0PCg==}
     engines: {node: '>=16', pnpm: '>=8'}
 
-  node-releases@2.0.19:
-    resolution: {integrity: sha512-xxOWJsBKtzAq7DY0J+DTzuz58K8e7sJbdgwkbMWQe8UYB6ekmsQ45q0M/tJDsGaZmbC+l7n57UV8Hl5tHxO9uw==}
+  node-releases@2.0.20:
+    resolution: {integrity: sha512-7gK6zSXEH6neM212JgfYFXe+GmZQM+fia5SsusuBIUgnPheLFBmIPhtFoAQRj8/7wASYQnbDlHPVwY0BefoFgA==}
 
   normalize-path@3.0.0:
     resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==}
@@ -4400,7 +4400,7 @@ snapshots:
       '@stylistic/eslint-plugin': 5.3.1(eslint@9.35.0(jiti@2.5.1))
       '@typescript-eslint/eslint-plugin': 8.42.0(@typescript-eslint/parser@8.42.0(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2))(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)
       '@typescript-eslint/parser': 8.42.0(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)
-      '@vitest/eslint-plugin': 1.3.8(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)
+      '@vitest/eslint-plugin': 1.3.9(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)
       ansis: 4.1.0
       cac: 6.7.14
       eslint: 9.35.0(jiti@2.5.1)
@@ -5936,7 +5936,7 @@ snapshots:
       unplugin-utils: 0.3.0
       vite: 7.1.4(@types/node@24.3.1)(jiti@2.5.1)(less@4.4.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.19.2)(yaml@2.8.1)
 
-  '@uozi-admin/curd@4.15.4(@ant-design/icons-vue@7.0.1(vue@3.5.21(typescript@5.9.2)))(@vueuse/core@13.9.0(vue@3.5.21(typescript@5.9.2)))(@vueuse/router@13.9.0(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2)))(ant-design-vue@4.2.6(vue@3.5.21(typescript@5.9.2)))(dayjs@1.11.18)(lodash-es@4.17.21)(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2))':
+  '@uozi-admin/curd@4.15.5(@ant-design/icons-vue@7.0.1(vue@3.5.21(typescript@5.9.2)))(@vueuse/core@13.9.0(vue@3.5.21(typescript@5.9.2)))(@vueuse/router@13.9.0(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2)))(ant-design-vue@4.2.6(vue@3.5.21(typescript@5.9.2)))(dayjs@1.11.18)(lodash-es@4.17.21)(vue-router@4.5.1(vue@3.5.21(typescript@5.9.2)))(vue@3.5.21(typescript@5.9.2))':
     dependencies:
       '@ant-design/icons-vue': 7.0.1(vue@3.5.21(typescript@5.9.2))
       '@vueuse/core': 13.9.0(vue@3.5.21(typescript@5.9.2))
@@ -5995,7 +5995,7 @@ snapshots:
       vite: 7.1.4(@types/node@24.3.1)(jiti@2.5.1)(less@4.4.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.19.2)(yaml@2.8.1)
       vue: 3.5.21(typescript@5.9.2)
 
-  '@vitest/eslint-plugin@1.3.8(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)':
+  '@vitest/eslint-plugin@1.3.9(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)':
     dependencies:
       '@typescript-eslint/scope-manager': 8.42.0
       '@typescript-eslint/utils': 8.42.0(eslint@9.35.0(jiti@2.5.1))(typescript@5.9.2)
@@ -6373,7 +6373,7 @@ snapshots:
     dependencies:
       caniuse-lite: 1.0.30001741
       electron-to-chromium: 1.5.214
-      node-releases: 2.0.19
+      node-releases: 2.0.20
       update-browserslist-db: 1.1.3(browserslist@4.25.4)
 
   buffer-crc32@0.2.13: {}
@@ -7986,7 +7986,7 @@ snapshots:
 
   node-object-hash@3.1.1: {}
 
-  node-releases@2.0.19: {}
+  node-releases@2.0.20: {}
 
   normalize-path@3.0.0: {}
 

+ 2 - 3
app/src/views/preference/tabs/ExternalNotify.vue

@@ -1,7 +1,6 @@
 <script setup lang="ts">
 import type { ExternalNotify } from '@/api/external_notify'
 import { StdCurd } from '@uozi-admin/curd'
-import { Button } from 'ant-design-vue'
 import externalNotify, { testMessage } from '@/api/external_notify'
 import columns from '../components/ExternalNotify/columns'
 
@@ -49,14 +48,14 @@ async function handleTestSingleMessage(record: ExternalNotify) {
     disable-search
   >
     <template #beforeActions="{ record }">
-      <Button
+      <AButton
         type="link"
         size="small"
         :loading="loadingStates[record.id] || false"
         @click="handleTestSingleMessage(record)"
       >
         {{ $gettext('Test') }}
-      </Button>
+      </AButton>
     </template>
   </StdCurd>
 </template>

+ 1 - 0
go.mod

@@ -294,6 +294,7 @@ require (
 	github.com/scaleway/scaleway-sdk-go v1.0.0-beta.34 // indirect
 	github.com/selectel/domains-go v1.1.0 // indirect
 	github.com/selectel/go-selvpcclient/v4 v4.1.0 // indirect
+	github.com/shirou/gopsutil/v3 v3.24.5 // indirect
 	github.com/shopspring/decimal v1.4.0 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9 // indirect

+ 2 - 0
go.sum

@@ -1874,6 +1874,8 @@ github.com/selectel/domains-go v1.1.0 h1:futG50J43ALLKQAnZk9H9yOtLGnSUh7c5hSvuC5
 github.com/selectel/domains-go v1.1.0/go.mod h1:SugRKfq4sTpnOHquslCpzda72wV8u0cMBHx0C0l+bzA=
 github.com/selectel/go-selvpcclient/v4 v4.1.0 h1:22lBp+rzg9g2MP4iiGhpVAcCt0kMv7I7uV1W3taLSvQ=
 github.com/selectel/go-selvpcclient/v4 v4.1.0/go.mod h1:eFhL1KUW159KOJVeGO7k/Uxl0TYd/sBkWXjuF5WxmYk=
+github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
+github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
 github.com/shirou/gopsutil/v4 v4.25.7 h1:bNb2JuqKuAu3tRlPv5piSmBZyMfecwQ+t/ILq+1JqVM=
 github.com/shirou/gopsutil/v4 v4.25.7/go.mod h1:XV/egmwJtd3ZQjBpJVY5kndsiOO4IRqy9TQnmm6VP7U=
 github.com/shirou/gopsutil/v4 v4.25.8 h1:NnAsw9lN7587WHxjJA9ryDnqhJpFH6A+wagYWTOH970=

+ 12 - 12
internal/cron/incremental_indexing.go

@@ -83,8 +83,8 @@ func performIncrementalIndexing() {
 // needsIncrementalIndexing checks if a log file needs incremental indexing
 func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex) bool {
 	// Skip if already indexing or queued
-	if log.IndexStatus == string(indexer.IndexStatusIndexing) || 
-	   log.IndexStatus == string(indexer.IndexStatusQueued) {
+	if log.IndexStatus == string(indexer.IndexStatusIndexing) ||
+		log.IndexStatus == string(indexer.IndexStatusQueued) {
 		return false
 	}
 
@@ -106,14 +106,14 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex) bool {
 
 	// File was modified after last index and size increased
 	if fileModTime.After(lastModified) && fileSize > log.LastSize {
-		logger.Debugf("File %s needs incremental indexing: mod_time=%s, size=%d", 
+		logger.Debugf("File %s needs incremental indexing: mod_time=%s, size=%d",
 			log.Path, fileModTime.Format("2006-01-02 15:04:05"), fileSize)
 		return true
 	}
 
 	// File size decreased - might be file rotation
 	if fileSize < log.LastSize {
-		logger.Debugf("File %s needs full re-indexing due to size decrease: old_size=%d, new_size=%d", 
+		logger.Debugf("File %s needs full re-indexing due to size decrease: old_size=%d, new_size=%d",
 			log.Path, log.LastSize, fileSize)
 		return true
 	}
@@ -131,7 +131,7 @@ func queueIncrementalIndexing(logPath string, modernIndexer interface{}, logFile
 	// Queue the indexing job asynchronously
 	go func() {
 		logger.Infof("Starting incremental indexing for file: %s", logPath)
-		
+
 		// Set status to indexing
 		if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexing), logFileManager); err != nil {
 			logger.Errorf("Failed to set indexing status for %s: %v", logPath, err)
@@ -140,8 +140,8 @@ func queueIncrementalIndexing(logPath string, modernIndexer interface{}, logFile
 
 		// Perform incremental indexing
 		startTime := time.Now()
-		docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(logPath, nil)
-		
+		docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil)
+
 		if err != nil {
 			logger.Errorf("Failed incremental indexing for %s: %v", logPath, err)
 			// Set error status
@@ -184,18 +184,18 @@ func setFileIndexStatus(logPath, status string, logFileManager interface{}) erro
 	if logFileManager == nil {
 		return fmt.Errorf("log file manager not available")
 	}
-	
+
 	// Get persistence manager
 	lfm, ok := logFileManager.(*indexer.LogFileManager)
 	if !ok {
 		return fmt.Errorf("invalid log file manager type")
 	}
-	
+
 	persistence := lfm.GetPersistence()
 	if persistence == nil {
 		return fmt.Errorf("persistence manager not available")
 	}
-	
+
 	// Use enhanced SetIndexStatus method with queue position for queued status
 	queuePosition := 0
 	if status == string(indexer.IndexStatusQueued) {
@@ -203,6 +203,6 @@ func setFileIndexStatus(logPath, status string, logFileManager interface{}) erro
 		// They will be processed as they come
 		queuePosition = int(time.Now().Unix() % 1000) // Simple ordering by time
 	}
-	
+
 	return persistence.SetIndexStatus(logPath, status, queuePosition, "")
-}
+}
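
The cron changes above keep the decision rule in `needsIncrementalIndexing`: a file that grew and has a newer modification time gets an incremental pass, while a file whose size decreased is treated as rotated and re-indexed in full. A minimal standalone sketch of that rule, using a hypothetical `logIndexState` type rather than the real `nginx_log.NginxLogWithIndex`:

```go
package main

import (
	"fmt"
	"time"
)

// logIndexState is a hypothetical stand-in for the persisted index metadata.
type logIndexState struct {
	LastModified time.Time
	LastSize     int64
}

// needsReindex mirrors the rule above: grown and newer => incremental;
// shrunk => likely rotated, so a full re-index is needed.
func needsReindex(st logIndexState, fileModTime time.Time, fileSize int64) (bool, string) {
	if fileModTime.After(st.LastModified) && fileSize > st.LastSize {
		return true, "incremental"
	}
	if fileSize < st.LastSize {
		return true, "full (size decreased, possible rotation)"
	}
	return false, "up to date"
}

func main() {
	st := logIndexState{LastModified: time.Now().Add(-time.Hour), LastSize: 1 << 20}
	ok, reason := needsReindex(st, time.Now(), 2<<20)
	fmt.Println(ok, reason) // true incremental
}
```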

+ 3 - 3
internal/nginx_log/indexer/README.md

@@ -367,13 +367,13 @@ Based on comprehensive benchmarking on Apple M2 Pro:
 1. **Worker Count Configuration**
 ```go
 // CPU-bound workloads
-config.WorkerCount = runtime.NumCPU()
+config.WorkerCount = runtime.GOMAXPROCS(0)
 
 // I/O-bound workloads  
-config.WorkerCount = runtime.NumCPU() * 2
+config.WorkerCount = runtime.GOMAXPROCS(0) * 2
 
 // Memory-constrained environments
-config.WorkerCount = max(2, runtime.NumCPU()/2)
+config.WorkerCount = max(2, runtime.GOMAXPROCS(0)/2)
 ```
 
 2. **Shard Count Optimization**

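The README now sizes worker pools from `runtime.GOMAXPROCS(0)`, which matches the bounds the new adaptive optimizer uses elsewhere in this commit (minimum `max(1, GOMAXPROCS/8)`, maximum `GOMAXPROCS * 3`). A small illustrative sketch of computing those bounds; `maxInt` is a local helper for this example, not part of the project API:

```go
package main

import (
	"fmt"
	"runtime"
)

func workerBounds() (minWorkers, maxWorkers int) {
	procs := runtime.GOMAXPROCS(0) // passing 0 only reads the current limit
	minWorkers = maxInt(1, procs/8) // allow aggressive scale-down, never below 1
	maxWorkers = procs * 3          // ceiling suited to I/O-heavy indexing
	return
}

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	lo, hi := workerBounds()
	fmt.Printf("workers may scale between %d and %d\n", lo, hi)
}
```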
+ 192 - 0
internal/nginx_log/indexer/adaptive_min_worker_test.go

@@ -0,0 +1,192 @@
+package indexer
+
+import (
+	"runtime"
+	"testing"
+)
+
+// TestMinWorkerCalculation tests the minimum worker calculation for different CPU configurations
+func TestMinWorkerCalculation(t *testing.T) {
+	testCases := []struct {
+		name            string
+		maxProcs        int
+		expectedMin     int
+		description     string
+	}{
+		{
+			name:        "Single core system",
+			maxProcs:    1,
+			expectedMin: 1,
+			description: "max(1, 1/8) = max(1, 0) = 1",
+		},
+		{
+			name:        "Dual core system",
+			maxProcs:    2,
+			expectedMin: 1,
+			description: "max(1, 2/8) = max(1, 0) = 1",
+		},
+		{
+			name:        "Quad core system",
+			maxProcs:    4,
+			expectedMin: 1,
+			description: "max(1, 4/8) = max(1, 0) = 1",
+		},
+		{
+			name:        "8-core system",
+			maxProcs:    8,
+			expectedMin: 1,
+			description: "max(1, 8/8) = max(1, 1) = 1",
+		},
+		{
+			name:        "16-core system",
+			maxProcs:    16,
+			expectedMin: 2,
+			description: "max(1, 16/8) = max(1, 2) = 2",
+		},
+		{
+			name:        "24-core system",
+			maxProcs:    24,
+			expectedMin: 3,
+			description: "max(1, 24/8) = max(1, 3) = 3",
+		},
+	}
+	
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Simulate the min worker calculation logic
+			calculatedMin := max(1, tc.maxProcs/8)
+			
+			if calculatedMin != tc.expectedMin {
+				t.Errorf("Expected min workers %d, got %d for %d cores", 
+					tc.expectedMin, calculatedMin, tc.maxProcs)
+			}
+			
+			t.Logf("✅ %s: %s -> min workers = %d", 
+				tc.name, tc.description, calculatedMin)
+		})
+	}
+}
+
+// TestCPUOptimizationScenario simulates the CPU over-utilization scenario
+func TestCPUOptimizationScenario(t *testing.T) {
+	// Simulate a 2-core system (common in containers/VMs)
+	simulatedGOMAXPROCS := 2
+	
+	// Create a mock config similar to production
+	config := &Config{
+		WorkerCount: 2, // Starting with 2 workers
+	}
+	
+	// Create adaptive optimizer with simulated CPU configuration
+	ao := &AdaptiveOptimizer{
+		config: config,
+		cpuMonitor: &CPUMonitor{
+			targetUtilization:   0.75, // Target 75% CPU utilization
+			measurementInterval: 5,
+			adjustmentThreshold: 0.10, // 10% threshold
+			maxWorkers:         simulatedGOMAXPROCS * 3,
+			minWorkers:         max(1, simulatedGOMAXPROCS/8), // New formula
+			measurements:       make([]float64, 0, 12),
+		},
+		performanceHistory: &PerformanceHistory{
+			samples: make([]PerformanceSample, 0, 120),
+		},
+	}
+	
+	// Mock worker count change callback
+	workerAdjustments := []int{}
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		workerAdjustments = append(workerAdjustments, newCount)
+		config.WorkerCount = newCount // Update the config
+		t.Logf("🔧 Worker count adjusted from %d to %d", oldCount, newCount)
+	})
+	
+	// Simulate CPU over-utilization scenario
+	currentCPU := 0.95 // 95% CPU usage
+	targetCPU := 0.75  // 75% target
+	
+	t.Logf("📊 Initial state: workers=%d, minWorkers=%d, CPU=%.1f%%, target=%.1f%%", 
+		config.WorkerCount, ao.cpuMonitor.minWorkers, currentCPU*100, targetCPU*100)
+	
+	// Test the worker decrease logic
+	ao.suggestWorkerDecrease(currentCPU, targetCPU)
+	
+	// Verify that adjustment happened
+	if len(workerAdjustments) == 0 {
+		t.Errorf("Expected worker count adjustment, but none occurred")
+	} else {
+		originalWorkers := 2 // We started with 2 workers
+		newCount := workerAdjustments[0]
+		if newCount >= originalWorkers {
+			t.Errorf("Expected worker count to decrease from %d, but got %d", 
+				originalWorkers, newCount)
+		} else {
+			t.Logf("✅ Successfully reduced workers from %d to %d", originalWorkers, newCount)
+		}
+		
+		// Verify minimum constraint is respected
+		if newCount < ao.cpuMonitor.minWorkers {
+			t.Errorf("Worker count %d is below minimum %d", newCount, ao.cpuMonitor.minWorkers)
+		}
+	}
+	
+	// Test repeated optimization calls (simulating the loop issue)
+	t.Logf("🔄 Testing repeated optimization calls...")
+	
+	initialWorkerCount := config.WorkerCount
+	for i := 0; i < 5; i++ {
+		ao.suggestWorkerDecrease(currentCPU, targetCPU)
+		t.Logf("Iteration %d: workers=%d", i+1, config.WorkerCount)
+		
+		// If worker count reached minimum, it should stop decreasing
+		if config.WorkerCount == ao.cpuMonitor.minWorkers {
+			t.Logf("✅ Reached minimum worker count %d", config.WorkerCount)
+			break
+		}
+	}
+	
+	// Verify we didn't get stuck in infinite loop
+	if config.WorkerCount < initialWorkerCount {
+		t.Logf("✅ Worker count successfully reduced from %d to %d", 
+			initialWorkerCount, config.WorkerCount)
+	}
+}
+
+// TestCurrentSystemConfiguration tests with actual system GOMAXPROCS
+func TestCurrentSystemConfiguration(t *testing.T) {
+	currentCores := runtime.GOMAXPROCS(0)
+	minWorkers := max(1, currentCores/8)
+	
+	t.Logf("🖥️ Current system configuration:")
+	t.Logf("   GOMAXPROCS(0): %d", currentCores)
+	t.Logf("   Calculated min workers: %d", minWorkers)
+	t.Logf("   Max workers: %d", currentCores*3)
+	
+	// Verify that we can always scale down to minimum
+	if minWorkers >= 2 && currentCores <= 16 {
+		t.Errorf("Min workers %d seems too high for %d cores - may prevent scaling down", 
+			minWorkers, currentCores)
+	}
+	
+	// Test scaling scenarios
+	scenarios := []struct {
+		startWorkers int
+		canScaleDown bool
+	}{
+		{startWorkers: 8, canScaleDown: true},
+		{startWorkers: 4, canScaleDown: true}, 
+		{startWorkers: 2, canScaleDown: minWorkers < 2},
+		{startWorkers: 1, canScaleDown: false},
+	}
+	
+	for _, scenario := range scenarios {
+		actualCanScale := scenario.startWorkers > minWorkers
+		if actualCanScale != scenario.canScaleDown {
+			t.Logf("⚠️ Scenario mismatch: starting with %d workers, min=%d, expected canScale=%v, actual=%v", 
+				scenario.startWorkers, minWorkers, scenario.canScaleDown, actualCanScale)
+		} else {
+			t.Logf("✅ Scaling scenario: %d workers -> min %d (can scale down: %v)", 
+				scenario.startWorkers, minWorkers, actualCanScale)
+		}
+	}
+}

+ 240 - 124
internal/nginx_log/indexer/adaptive_optimization.go

@@ -8,109 +8,121 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/shirou/gopsutil/v4/cpu"
 	"github.com/uozi-tech/cosy/logger"
 )
 
+// IndexerActivityPoller defines an interface to check if the indexer is busy.
+type IndexerActivityPoller interface {
+	IsBusy() bool
+}
+
 // AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoring
 type AdaptiveOptimizer struct {
-	config                *Config
-	cpuMonitor           *CPUMonitor
-	batchSizeController  *BatchSizeController
-	performanceHistory   *PerformanceHistory
-	
+	config              *Config
+	cpuMonitor          *CPUMonitor
+	batchSizeController *BatchSizeController
+	performanceHistory  *PerformanceHistory
+
 	// State
-	running              int32
-	ctx                  context.Context
-	cancel              context.CancelFunc
-	wg                  sync.WaitGroup
-	
+	running int32
+	ctx     context.Context
+	cancel  context.CancelFunc
+	wg      sync.WaitGroup
+
 	// Metrics
-	optimizationsMade   int64
-	avgThroughput      float64
-	avgLatency         time.Duration
-	metricsMutex       sync.RWMutex
+	optimizationsMade int64
+	avgThroughput     float64
+	avgLatency        time.Duration
+	metricsMutex      sync.RWMutex
+
+	// Callbacks
+	onWorkerCountChange func(oldCount, newCount int)
+
+	// Activity Poller
+	activityPoller IndexerActivityPoller
 }
 
 // CPUMonitor monitors CPU utilization and suggests worker adjustments
 type CPUMonitor struct {
-	targetUtilization    float64
-	measurementInterval  time.Duration
-	adjustmentThreshold  float64
+	targetUtilization   float64
+	measurementInterval time.Duration
+	adjustmentThreshold float64
 	maxWorkers          int
 	minWorkers          int
-	
-	currentUtilization  float64
-	measurements        []float64
-	measurementsMutex   sync.RWMutex
+
+	currentUtilization float64
+	measurements       []float64
+	measurementsMutex  sync.RWMutex
 }
 
 // BatchSizeController dynamically adjusts batch sizes based on performance metrics
 type BatchSizeController struct {
-	baseBatchSize       int
-	minBatchSize        int
-	maxBatchSize        int
-	adjustmentFactor    float64
-	
-	currentBatchSize    int32
-	latencyThreshold    time.Duration
-	throughputTarget    float64
-	
-	adjustmentHistory   []BatchAdjustment
-	historyMutex       sync.RWMutex
+	baseBatchSize    int
+	minBatchSize     int
+	maxBatchSize     int
+	adjustmentFactor float64
+
+	currentBatchSize int32
+	latencyThreshold time.Duration
+	throughputTarget float64
+
+	adjustmentHistory []BatchAdjustment
+	historyMutex      sync.RWMutex
 }
 
 // PerformanceHistory tracks performance metrics for optimization decisions
 type PerformanceHistory struct {
-	samples            []PerformanceSample
-	maxSamples         int
-	mutex              sync.RWMutex
-	
-	movingAvgWindow    int
-	avgThroughput      float64
-	avgLatency         time.Duration
+	samples    []PerformanceSample
+	maxSamples int
+	mutex      sync.RWMutex
+
+	movingAvgWindow int
+	avgThroughput   float64
+	avgLatency      time.Duration
 }
 
 // PerformanceSample represents a single performance measurement
 type PerformanceSample struct {
-	Timestamp    time.Time     `json:"timestamp"`
-	Throughput   float64       `json:"throughput"`
-	Latency      time.Duration `json:"latency"`
-	CPUUsage     float64       `json:"cpu_usage"`
-	BatchSize    int           `json:"batch_size"`
-	WorkerCount  int           `json:"worker_count"`
+	Timestamp   time.Time     `json:"timestamp"`
+	Throughput  float64       `json:"throughput"`
+	Latency     time.Duration `json:"latency"`
+	CPUUsage    float64       `json:"cpu_usage"`
+	BatchSize   int           `json:"batch_size"`
+	WorkerCount int           `json:"worker_count"`
 }
 
 // BatchAdjustment represents a batch size adjustment decision
 type BatchAdjustment struct {
-	Timestamp     time.Time     `json:"timestamp"`
-	OldBatchSize  int           `json:"old_batch_size"`
-	NewBatchSize  int           `json:"new_batch_size"`
-	Reason        string        `json:"reason"`
-	ThroughputImpact float64    `json:"throughput_impact"`
+	Timestamp        time.Time `json:"timestamp"`
+	OldBatchSize     int       `json:"old_batch_size"`
+	NewBatchSize     int       `json:"new_batch_size"`
+	Reason           string    `json:"reason"`
+	ThroughputImpact float64   `json:"throughput_impact"`
 }
 
 // NewAdaptiveOptimizer creates a new adaptive optimizer
 func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
 	ctx, cancel := context.WithCancel(context.Background())
-	
-	return &AdaptiveOptimizer{
+
+	ao := &AdaptiveOptimizer{
 		config: config,
 		cpuMonitor: &CPUMonitor{
-			targetUtilization:   0.80, // Target 80% CPU utilization
+			targetUtilization:   0.75, // Target 75% CPU utilization (more conservative)
 			measurementInterval: 5 * time.Second,
-			adjustmentThreshold: 0.15, // Adjust if 15% deviation from target
-			maxWorkers:         runtime.NumCPU() * 3,
-			minWorkers:         max(2, runtime.NumCPU()/2),
-			measurements:       make([]float64, 0, 12), // 1 minute history at 5s intervals
+			adjustmentThreshold: 0.10, // Adjust if 10% deviation from target (more sensitive)
+			maxWorkers:          runtime.GOMAXPROCS(0) * 3,
+			minWorkers:          max(1, runtime.GOMAXPROCS(0)/8), // Allow aggressive scaling down: 1/8 of available processors or 1, whichever is higher
+			measurements:        make([]float64, 0, 12),          // 1 minute history at 5s intervals
 		},
 		batchSizeController: &BatchSizeController{
-			baseBatchSize:     config.BatchSize,
-			minBatchSize:      max(100, config.BatchSize/4),
-			maxBatchSize:      config.BatchSize * 3,
-			adjustmentFactor:  0.2, // 20% adjustment steps
-			currentBatchSize:  int32(config.BatchSize),
-			latencyThreshold:  5 * time.Second,
-			throughputTarget:  25.0, // Target 25 MB/s
+			baseBatchSize:    config.BatchSize,
+			minBatchSize:     max(100, config.BatchSize/4),
+			maxBatchSize:     config.BatchSize * 3,
+			adjustmentFactor: 0.2, // 20% adjustment steps
+			currentBatchSize: int32(config.BatchSize),
+			latencyThreshold: 5 * time.Second,
+			throughputTarget: 25.0, // Target 25 MB/s
 		},
 		performanceHistory: &PerformanceHistory{
 			samples:         make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples
@@ -120,6 +132,13 @@ func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
 		ctx:    ctx,
 		cancel: cancel,
 	}
+
+	// Log initialization parameters for debugging
+	logger.Infof("Adaptive optimizer initialized: workers=[%d, %d, %d] (min, current, max), target_cpu=%.1f%%, threshold=%.1f%%",
+		ao.cpuMonitor.minWorkers, config.WorkerCount, ao.cpuMonitor.maxWorkers,
+		ao.cpuMonitor.targetUtilization*100, ao.cpuMonitor.adjustmentThreshold*100)
+
+	return ao
 }
 
 // Start begins the adaptive optimization process
@@ -128,19 +147,19 @@ func (ao *AdaptiveOptimizer) Start() error {
 		logger.Error("Adaptive optimizer already running")
 		return fmt.Errorf("adaptive optimizer already running")
 	}
-	
+
 	// Start CPU monitoring
 	ao.wg.Add(1)
 	go ao.cpuMonitoringLoop()
-	
+
 	// Start batch size optimization
 	ao.wg.Add(1)
 	go ao.batchOptimizationLoop()
-	
+
 	// Start performance tracking
 	ao.wg.Add(1)
 	go ao.performanceTrackingLoop()
-	
+
 	logger.Info("Adaptive optimizer started")
 	return nil
 }
@@ -150,20 +169,20 @@ func (ao *AdaptiveOptimizer) Stop() {
 	if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {
 		return
 	}
-	
+
 	ao.cancel()
 	ao.wg.Wait()
-	
+
 	logger.Info("Adaptive optimizer stopped")
 }
 
 // cpuMonitoringLoop continuously monitors CPU utilization
 func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
 	defer ao.wg.Done()
-	
+
 	ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)
 	defer ticker.Stop()
-	
+
 	for {
 		select {
 		case <-ticker.C:
@@ -178,7 +197,7 @@ func (ao *AdaptiveOptimizer) cpuMonitoringLoop() {
 func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
 	// Get current CPU utilization
 	cpuUsage := ao.getCurrentCPUUtilization()
-	
+
 	ao.cpuMonitor.measurementsMutex.Lock()
 	ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)
 	if len(ao.cpuMonitor.measurements) > cap(ao.cpuMonitor.measurements) {
@@ -187,19 +206,19 @@ func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
 	}
 	ao.cpuMonitor.currentUtilization = cpuUsage
 	ao.cpuMonitor.measurementsMutex.Unlock()
-	
+
 	// Calculate average CPU utilization
 	ao.cpuMonitor.measurementsMutex.RLock()
 	avgCPU := ao.calculateAverageCPU()
 	ao.cpuMonitor.measurementsMutex.RUnlock()
-	
+
 	// Determine if adjustment is needed
 	targetCPU := ao.cpuMonitor.targetUtilization
 	if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {
 		// CPU underutilized - suggest increasing workers
 		ao.suggestWorkerIncrease(avgCPU, targetCPU)
 	} else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {
-		// CPU over-utilized - suggest decreasing workers  
+		// CPU over-utilized - suggest decreasing workers
 		ao.suggestWorkerDecrease(avgCPU, targetCPU)
 	}
 }
@@ -207,10 +226,10 @@ func (ao *AdaptiveOptimizer) measureAndAdjustCPU() {
 // batchOptimizationLoop continuously optimizes batch sizes
 func (ao *AdaptiveOptimizer) batchOptimizationLoop() {
 	defer ao.wg.Done()
-	
+
 	ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds
 	defer ticker.Stop()
-	
+
 	for {
 		select {
 		case <-ticker.C:
@@ -228,15 +247,15 @@ func (ao *AdaptiveOptimizer) optimizeBatchSize() {
 		ao.performanceHistory.mutex.RUnlock()
 		return // Not enough data
 	}
-	
+
 	recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]
 	avgThroughput := ao.calculateAverageThroughput(recentSamples)
 	avgLatency := ao.calculateAverageLatency(recentSamples)
 	ao.performanceHistory.mutex.RUnlock()
-	
+
 	currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
 	newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)
-	
+
 	if newBatchSize != currentBatchSize {
 		ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)
 		atomic.AddInt64(&ao.optimizationsMade, 1)
@@ -246,21 +265,21 @@ func (ao *AdaptiveOptimizer) optimizeBatchSize() {
 // calculateOptimalBatchSize determines the optimal batch size based on current performance
 func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {
 	controller := ao.batchSizeController
-	
+
 	// If latency is too high, reduce batch size
 	if latency > controller.latencyThreshold {
 		reduction := int(float64(currentBatch) * controller.adjustmentFactor)
 		newSize := currentBatch - max(50, reduction)
 		return max(controller.minBatchSize, newSize)
 	}
-	
+
 	// If throughput is below target and latency is acceptable, increase batch size
 	if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {
 		increase := int(float64(currentBatch) * controller.adjustmentFactor)
 		newSize := currentBatch + max(100, increase)
 		return min(controller.maxBatchSize, newSize)
 	}
-	
+
 	// Current batch size seems optimal
 	return currentBatch
 }
@@ -268,23 +287,23 @@ func (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, laten
 // adjustBatchSize applies the batch size adjustment
 func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {
 	atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))
-	
+
 	var reason string
 	if newSize > oldSize {
 		reason = "Increasing batch size to improve throughput"
 	} else {
 		reason = "Reducing batch size to improve latency"
 	}
-	
+
 	// Record adjustment
 	adjustment := BatchAdjustment{
 		Timestamp:        time.Now(),
 		OldBatchSize:     oldSize,
 		NewBatchSize:     newSize,
-		Reason:          reason,
+		Reason:           reason,
 		ThroughputImpact: throughput,
 	}
-	
+
 	ao.batchSizeController.historyMutex.Lock()
 	ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)
 	if len(ao.batchSizeController.adjustmentHistory) > 50 {
@@ -292,17 +311,17 @@ func (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput fl
 		ao.batchSizeController.adjustmentHistory = ao.batchSizeController.adjustmentHistory[1:]
 	}
 	ao.batchSizeController.historyMutex.Unlock()
-	
+
 	logger.Infof("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)
 }
 
 // performanceTrackingLoop continuously tracks performance metrics
 func (ao *AdaptiveOptimizer) performanceTrackingLoop() {
 	defer ao.wg.Done()
-	
+
 	ticker := time.NewTicker(1 * time.Second) // Sample every second
 	defer ticker.Stop()
-	
+
 	for {
 		select {
 		case <-ticker.C:
@@ -323,19 +342,29 @@ func (ao *AdaptiveOptimizer) recordPerformanceSample() {
 		BatchSize:   int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
 		WorkerCount: ao.config.WorkerCount,
 	}
-	
+
 	ao.performanceHistory.mutex.Lock()
 	ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)
 	if len(ao.performanceHistory.samples) > ao.performanceHistory.maxSamples {
 		// Remove oldest sample
 		ao.performanceHistory.samples = ao.performanceHistory.samples[1:]
 	}
-	
+
 	// Update moving averages
 	ao.updateMovingAverages()
 	ao.performanceHistory.mutex.Unlock()
 }
 
+// SetWorkerCountChangeCallback sets the callback function for worker count changes
+func (ao *AdaptiveOptimizer) SetWorkerCountChangeCallback(callback func(oldCount, newCount int)) {
+	ao.onWorkerCountChange = callback
+}
+
+// SetActivityPoller sets the poller to check for indexer activity.
+func (ao *AdaptiveOptimizer) SetActivityPoller(poller IndexerActivityPoller) {
+	ao.activityPoller = poller
+}
+
 // GetOptimalBatchSize returns the current optimal batch size
 func (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {
 	return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))
@@ -352,13 +381,13 @@ func (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {
 func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
 	ao.metricsMutex.RLock()
 	defer ao.metricsMutex.RUnlock()
-	
+
 	return AdaptiveOptimizationStats{
 		OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),
 		CurrentBatchSize:  int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),
-		AvgThroughput:    ao.avgThroughput,
-		AvgLatency:       ao.avgLatency,
-		CPUUtilization:   ao.cpuMonitor.currentUtilization,
+		AvgThroughput:     ao.avgThroughput,
+		AvgLatency:        ao.avgLatency,
+		CPUUtilization:    ao.cpuMonitor.currentUtilization,
 	}
 }
 
@@ -366,29 +395,34 @@ func (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {
 type AdaptiveOptimizationStats struct {
 	OptimizationsMade int64         `json:"optimizations_made"`
 	CurrentBatchSize  int           `json:"current_batch_size"`
-	AvgThroughput    float64       `json:"avg_throughput"`
-	AvgLatency       time.Duration `json:"avg_latency"`
-	CPUUtilization   float64       `json:"cpu_utilization"`
+	AvgThroughput     float64       `json:"avg_throughput"`
+	AvgLatency        time.Duration `json:"avg_latency"`
+	CPUUtilization    float64       `json:"cpu_utilization"`
 }
 
 // Helper functions
 func (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {
-	// This is a simplified implementation
-	// In production, you'd use a proper CPU monitoring library
-	runtime.GC()
-	var m runtime.MemStats
-	runtime.ReadMemStats(&m)
-	
-	// Approximate CPU usage based on GC activity and goroutines
-	numGoroutines := float64(runtime.NumGoroutine())
-	numCPU := float64(runtime.NumCPU())
-	
-	// Simple heuristic: more goroutines = higher CPU usage
-	utilization := numGoroutines / (numCPU * 10)
-	if utilization > 0.95 {
-		utilization = 0.95
-	}
-	return utilization
+	// Get CPU utilization since the last call.
+	// Interval 0 means non-blocking and compares to the last measurement.
+	// The first call will return 0.
+	percentages, err := cpu.Percent(0, false)
+	if err != nil || len(percentages) == 0 {
+		logger.Warnf("Failed to get real CPU utilization, falling back to goroutine heuristic: %v", err)
+		// Fallback to the old, less accurate method
+		numGoroutines := float64(runtime.NumGoroutine())
+		maxProcs := float64(runtime.GOMAXPROCS(0))
+
+		// Simple heuristic: more goroutines = higher CPU usage
+		utilization := numGoroutines / (maxProcs * 10)
+		if utilization > 0.95 {
+			utilization = 0.95
+		}
+		return utilization
+	}
+
+	// gopsutil returns a slice, for overall usage (percpu=false), it's the first element.
+	// The value is a percentage (e.g., 8.3), so we convert it to a 0.0-1.0 scale for our calculations.
+	return percentages[0] / 100.0
 }
 
 func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {
@@ -409,7 +443,7 @@ func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {
 	if len(ao.cpuMonitor.measurements) == 0 {
 		return 0
 	}
-	
+
 	sum := 0.0
 	for _, cpu := range ao.cpuMonitor.measurements {
 		sum += cpu
@@ -421,7 +455,7 @@ func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSam
 	if len(samples) == 0 {
 		return 0
 	}
-	
+
 	sum := 0.0
 	for _, sample := range samples {
 		sum += sample.Throughput
@@ -433,7 +467,7 @@ func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample
 	if len(samples) == 0 {
 		return 0
 	}
-	
+
 	var sum time.Duration
 	for _, sample := range samples {
 		sum += sample.Latency
@@ -445,22 +479,104 @@ func (ao *AdaptiveOptimizer) updateMovingAverages() {
 	if len(ao.performanceHistory.samples) == 0 {
 		return
 	}
-	
+
 	windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))
 	recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]
-	
+
 	ao.avgThroughput = ao.calculateAverageThroughput(recentSamples)
 	ao.avgLatency = ao.calculateAverageLatency(recentSamples)
 }
 
 func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {
-	logger.Debug("CPU underutilized, consider increasing workers", 
+	// If already at max workers, do nothing.
+	if ao.config.WorkerCount >= ao.cpuMonitor.maxWorkers {
+		return
+	}
+
+	// If the indexer is not busy, don't scale up workers even if CPU is low.
+	if ao.activityPoller != nil && !ao.activityPoller.IsBusy() {
+		return
+	}
+
+	logger.Debug("CPU underutilized, adjusting workers upward",
 		"current_cpu", currentCPU, "target_cpu", targetCPU)
+
+	// Calculate suggested increase (conservative approach)
+	cpuUtilizationGap := targetCPU - currentCPU
+	increaseRatio := cpuUtilizationGap / targetCPU
+
+	// Limit increase to maximum 25% at a time and at least 1 worker
+	maxIncrease := max(1, int(float64(ao.config.WorkerCount)*0.25))
+	suggestedIncrease := max(1, int(float64(ao.config.WorkerCount)*increaseRatio))
+	actualIncrease := min(maxIncrease, suggestedIncrease)
+
+	newWorkerCount := min(ao.cpuMonitor.maxWorkers, ao.config.WorkerCount+actualIncrease)
+
+	if newWorkerCount > ao.config.WorkerCount {
+		ao.adjustWorkerCount(newWorkerCount)
+		logger.Infof("Increased workers from %d to %d due to CPU underutilization",
+			ao.config.WorkerCount, newWorkerCount)
+	}
 }
 
 func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {
-	logger.Debug("CPU over-utilized, consider decreasing workers", 
+	// If already at min workers, do nothing.
+	if ao.config.WorkerCount <= ao.cpuMonitor.minWorkers {
+		logger.Debugf("Worker count is already at its minimum (%d), skipping decrease.", ao.config.WorkerCount)
+		return
+	}
+
+	logger.Debug("CPU over-utilized, adjusting workers downward",
 		"current_cpu", currentCPU, "target_cpu", targetCPU)
+
+	// Calculate suggested decrease (conservative approach)
+	cpuOverUtilization := currentCPU - targetCPU
+	decreaseRatio := cpuOverUtilization / targetCPU // Use target CPU as base for more accurate calculation
+
+	// Limit decrease to maximum 25% at a time and at least 1 worker
+	maxDecrease := max(1, int(float64(ao.config.WorkerCount)*0.25))
+	suggestedDecrease := max(1, int(float64(ao.config.WorkerCount)*decreaseRatio*0.5)) // More conservative decrease
+	actualDecrease := min(maxDecrease, suggestedDecrease)
+
+	newWorkerCount := max(ao.cpuMonitor.minWorkers, ao.config.WorkerCount-actualDecrease)
+
+	logger.Debugf("Worker decrease calculation: current=%d, suggested=%d, min=%d, new=%d",
+		ao.config.WorkerCount, suggestedDecrease, ao.cpuMonitor.minWorkers, newWorkerCount)
+
+	if newWorkerCount < ao.config.WorkerCount {
+		logger.Debugf("About to adjust worker count from %d to %d", ao.config.WorkerCount, newWorkerCount)
+		ao.adjustWorkerCount(newWorkerCount)
+		logger.Infof("Decreased workers from %d to %d due to CPU over-utilization",
+			ao.config.WorkerCount, newWorkerCount)
+	} else {
+		logger.Debugf("Worker count adjustment skipped: new=%d not less than current=%d", newWorkerCount, ao.config.WorkerCount)
+	}
+}
+
+// adjustWorkerCount dynamically adjusts the worker count at runtime
+func (ao *AdaptiveOptimizer) adjustWorkerCount(newCount int) {
+	if newCount <= 0 || newCount == ao.config.WorkerCount {
+		logger.Debugf("Skipping worker adjustment: newCount=%d, currentCount=%d", newCount, ao.config.WorkerCount)
+		return
+	}
+
+	logger.Infof("Adjusting worker count from %d to %d", ao.config.WorkerCount, newCount)
+
+	// Update configuration
+	oldCount := ao.config.WorkerCount
+	ao.config.WorkerCount = newCount
+
+	// Notify the indexer about worker count change
+	// This would typically trigger a worker pool resize in the parallel indexer
+	if ao.onWorkerCountChange != nil {
+		logger.Debugf("Calling worker count change callback: %d -> %d", oldCount, newCount)
+		ao.onWorkerCountChange(oldCount, newCount)
+	} else {
+		logger.Warnf("Worker count change callback is nil - worker adjustment will not take effect")
+	}
+
+	// Log the adjustment for monitoring
+	atomic.AddInt64(&ao.optimizationsMade, 1)
 }
 
 // Utility functions
@@ -476,4 +592,4 @@ func min(a, b int) int {
 		return a
 	}
 	return b
-}
+}
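
`getCurrentCPUUtilization` now samples real CPU usage through gopsutil instead of the goroutine-count heuristic. A minimal sketch of that sampling pattern on a ticker (an interval of 0 is non-blocking and reports usage since the previous call, so the first reading is 0):

```go
package main

import (
	"fmt"
	"time"

	"github.com/shirou/gopsutil/v4/cpu"
)

func main() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		<-ticker.C
		// interval 0 = non-blocking: usage since the previous call.
		percentages, err := cpu.Percent(0, false)
		if err != nil || len(percentages) == 0 {
			fmt.Println("cpu sample unavailable:", err)
			continue
		}
		util := percentages[0] / 100.0 // convert percentage to the 0.0-1.0 scale
		fmt.Printf("cpu utilization: %.2f\n", util)
	}
}
```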

+ 395 - 0
internal/nginx_log/indexer/adaptive_optimization_test.go

@@ -0,0 +1,395 @@
+package indexer
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// Mock config for testing
+type mockConfigForAdaptive struct {
+	workerCount int
+	batchSize   int
+}
+
+func (m *mockConfigForAdaptive) GetWorkerCount() int {
+	return m.workerCount
+}
+
+func (m *mockConfigForAdaptive) SetWorkerCount(count int) {
+	m.workerCount = count
+}
+
+// Test helper to create adaptive optimizer with mock config
+func createTestAdaptiveOptimizer(workerCount int) *AdaptiveOptimizer {
+	config := &Config{
+		WorkerCount: workerCount,
+		BatchSize:   1000,
+	}
+	
+	return NewAdaptiveOptimizer(config)
+}
+
+func TestAdaptiveOptimizer_NewAdaptiveOptimizer(t *testing.T) {
+	config := &Config{
+		WorkerCount: 8,
+		BatchSize:   1000,
+	}
+	
+	ao := NewAdaptiveOptimizer(config)
+	
+	if ao == nil {
+		t.Fatal("NewAdaptiveOptimizer returned nil")
+	}
+	
+	if ao.config.WorkerCount != 8 {
+		t.Errorf("Expected worker count 8, got %d", ao.config.WorkerCount)
+	}
+	
+	if ao.cpuMonitor.targetUtilization != 0.75 {
+		t.Errorf("Expected target CPU utilization 0.75, got %f", ao.cpuMonitor.targetUtilization)
+	}
+	
+	if ao.batchSizeController.baseBatchSize != 1000 {
+		t.Errorf("Expected base batch size 1000, got %d", ao.batchSizeController.baseBatchSize)
+	}
+}
+
+func TestAdaptiveOptimizer_SetWorkerCountChangeCallback(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var callbackOldCount, callbackNewCount int
+	callbackCalled := false
+	
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		callbackOldCount = oldCount
+		callbackNewCount = newCount
+		callbackCalled = true
+	})
+	
+	// Trigger a callback
+	if ao.onWorkerCountChange != nil {
+		ao.onWorkerCountChange(4, 6)
+	}
+	
+	if !callbackCalled {
+		t.Error("Expected callback to be called")
+	}
+	
+	if callbackOldCount != 4 {
+		t.Errorf("Expected old count 4, got %d", callbackOldCount)
+	}
+	
+	if callbackNewCount != 6 {
+		t.Errorf("Expected new count 6, got %d", callbackNewCount)
+	}
+}
+
+func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var actualOldCount, actualNewCount int
+	var callbackCalled bool
+	
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		actualOldCount = oldCount
+		actualNewCount = newCount
+		callbackCalled = true
+	})
+	
+	// Test CPU underutilization scenario
+	currentCPU := 0.5  // 50% utilization
+	targetCPU := 0.8   // 80% target
+	
+	ao.suggestWorkerIncrease(currentCPU, targetCPU)
+	
+	if !callbackCalled {
+		t.Error("Expected worker count change callback to be called")
+	}
+	
+	if actualOldCount != 4 {
+		t.Errorf("Expected old worker count 4, got %d", actualOldCount)
+	}
+	
+	// Should increase workers, but not more than max allowed
+	if actualNewCount <= 4 {
+		t.Errorf("Expected new worker count to be greater than 4, got %d", actualNewCount)
+	}
+	
+	// Verify config was updated
+	if ao.config.WorkerCount != actualNewCount {
+		t.Errorf("Expected config worker count to be updated to %d, got %d", actualNewCount, ao.config.WorkerCount)
+	}
+}
+
+func TestAdaptiveOptimizer_suggestWorkerDecrease(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(8)
+	
+	var actualOldCount, actualNewCount int
+	var callbackCalled bool
+	
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		actualOldCount = oldCount
+		actualNewCount = newCount
+		callbackCalled = true
+	})
+	
+	// Test CPU over-utilization scenario
+	currentCPU := 0.95  // 95% utilization
+	targetCPU := 0.8    // 80% target
+	
+	ao.suggestWorkerDecrease(currentCPU, targetCPU)
+	
+	if !callbackCalled {
+		t.Error("Expected worker count change callback to be called")
+	}
+	
+	if actualOldCount != 8 {
+		t.Errorf("Expected old worker count 8, got %d", actualOldCount)
+	}
+	
+	// Should decrease workers, but not below minimum
+	if actualNewCount >= 8 {
+		t.Errorf("Expected new worker count to be less than 8, got %d", actualNewCount)
+	}
+	
+	// Should not go below minimum
+	if actualNewCount < ao.cpuMonitor.minWorkers {
+		t.Errorf("New worker count %d should not be below minimum %d", actualNewCount, ao.cpuMonitor.minWorkers)
+	}
+	
+	// Verify config was updated
+	if ao.config.WorkerCount != actualNewCount {
+		t.Errorf("Expected config worker count to be updated to %d, got %d", actualNewCount, ao.config.WorkerCount)
+	}
+}
+
+func TestAdaptiveOptimizer_adjustWorkerCount_NoChange(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var callbackCalled bool
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		callbackCalled = true
+	})
+	
+	// Test no change scenario
+	ao.adjustWorkerCount(4) // Same as current
+	
+	if callbackCalled {
+		t.Error("Expected no callback when worker count doesn't change")
+	}
+	
+	if ao.config.WorkerCount != 4 {
+		t.Errorf("Expected worker count to remain 4, got %d", ao.config.WorkerCount)
+	}
+}
+
+func TestAdaptiveOptimizer_adjustWorkerCount_InvalidCount(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var callbackCalled bool
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		callbackCalled = true
+	})
+	
+	// Test invalid count (0 or negative)
+	ao.adjustWorkerCount(0)
+	ao.adjustWorkerCount(-1)
+	
+	if callbackCalled {
+		t.Error("Expected no callback for invalid worker counts")
+	}
+	
+	if ao.config.WorkerCount != 4 {
+		t.Errorf("Expected worker count to remain 4, got %d", ao.config.WorkerCount)
+	}
+}
+
+func TestAdaptiveOptimizer_GetOptimalBatchSize(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	// Initial batch size should be from config
+	batchSize := ao.GetOptimalBatchSize()
+	expectedInitial := int32(1000)
+	if batchSize != int(expectedInitial) {
+		t.Errorf("Expected initial batch size %d, got %d", expectedInitial, batchSize)
+	}
+	
+	// Test updating batch size
+	newBatchSize := int32(1500)
+	atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, newBatchSize)
+	
+	batchSize = ao.GetOptimalBatchSize()
+	if batchSize != int(newBatchSize) {
+		t.Errorf("Expected updated batch size %d, got %d", newBatchSize, batchSize)
+	}
+}
+
+func TestAdaptiveOptimizer_measureAndAdjustCPU_WithinThreshold(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var callbackCalled bool
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		callbackCalled = true
+	})
+	
+	// Mock CPU measurements within threshold
+	ao.cpuMonitor.measurements = []float64{0.78, 0.79, 0.81, 0.82} // Around 0.8 target
+	
+	ao.measureAndAdjustCPU()
+	
+	// Should not trigger worker adjustment if within threshold
+	if callbackCalled {
+		t.Error("Expected no worker adjustment when CPU is within threshold")
+	}
+}
+
+func TestAdaptiveOptimizer_GetCPUUtilization(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	// Set current utilization
+	ao.cpuMonitor.currentUtilization = 0.75
+	
+	utilization := ao.GetCPUUtilization()
+	if utilization != 0.75 {
+		t.Errorf("Expected CPU utilization 0.75, got %f", utilization)
+	}
+}
+
+func TestAdaptiveOptimizer_GetOptimizationStats(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	// Set some test values
+	atomic.StoreInt64(&ao.optimizationsMade, 5)
+	ao.avgThroughput = 25.5
+	ao.avgLatency = 2 * time.Second
+	ao.cpuMonitor.currentUtilization = 0.85
+	
+	stats := ao.GetOptimizationStats()
+	
+	if stats.OptimizationsMade != 5 {
+		t.Errorf("Expected 5 optimizations made, got %d", stats.OptimizationsMade)
+	}
+	
+	if stats.AvgThroughput != 25.5 {
+		t.Errorf("Expected avg throughput 25.5, got %f", stats.AvgThroughput)
+	}
+	
+	if stats.AvgLatency != 2*time.Second {
+		t.Errorf("Expected avg latency 2s, got %v", stats.AvgLatency)
+	}
+	
+	if stats.CPUUtilization != 0.85 {
+		t.Errorf("Expected CPU utilization 0.85, got %f", stats.CPUUtilization)
+	}
+	
+	if stats.CurrentBatchSize != 1000 {
+		t.Errorf("Expected current batch size 1000, got %d", stats.CurrentBatchSize)
+	}
+}
+
+func TestAdaptiveOptimizer_StartStop(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	// Test start
+	err := ao.Start()
+	if err != nil {
+		t.Fatalf("Failed to start adaptive optimizer: %v", err)
+	}
+	
+	// Verify running state
+	if atomic.LoadInt32(&ao.running) != 1 {
+		t.Error("Expected adaptive optimizer to be running")
+	}
+	
+	// Test starting again (should fail)
+	err = ao.Start()
+	if err == nil {
+		t.Error("Expected error when starting already running optimizer")
+	}
+	
+	// Small delay to let goroutines start
+	time.Sleep(100 * time.Millisecond)
+	
+	// Test stop
+	ao.Stop()
+	
+	// Verify stopped state
+	if atomic.LoadInt32(&ao.running) != 0 {
+		t.Error("Expected adaptive optimizer to be stopped")
+	}
+}
+
+func TestAdaptiveOptimizer_WorkerAdjustmentLimits(t *testing.T) {
+	// Test maximum worker limit
+	ao := createTestAdaptiveOptimizer(16) // Start with high count
+	ao.cpuMonitor.maxWorkers = 20
+	
+	var actualNewCount int
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		actualNewCount = newCount
+	})
+	
+	// Try to increase beyond max
+	ao.suggestWorkerIncrease(0.2, 0.8) // Very low CPU, should want to increase
+	
+	if actualNewCount > ao.cpuMonitor.maxWorkers {
+		t.Errorf("New worker count %d exceeds maximum %d", actualNewCount, ao.cpuMonitor.maxWorkers)
+	}
+	
+	// Test minimum worker limit
+	ao2 := createTestAdaptiveOptimizer(3)
+	ao2.cpuMonitor.minWorkers = 2
+	
+	ao2.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		actualNewCount = newCount
+	})
+	
+	// Try to decrease below min
+	ao2.suggestWorkerDecrease(0.98, 0.8) // Very high CPU, should want to decrease
+	
+	if actualNewCount < ao2.cpuMonitor.minWorkers {
+		t.Errorf("New worker count %d below minimum %d", actualNewCount, ao2.cpuMonitor.minWorkers)
+	}
+}
+
+func TestAdaptiveOptimizer_ConcurrentAccess(t *testing.T) {
+	ao := createTestAdaptiveOptimizer(4)
+	
+	var wg sync.WaitGroup
+	var adjustmentCount int32
+	
+	ao.SetWorkerCountChangeCallback(func(oldCount, newCount int) {
+		atomic.AddInt32(&adjustmentCount, 1)
+	})
+	
+	// Simulate concurrent CPU measurements and adjustments
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			
+			// Simulate alternating high and low CPU
+			if i%2 == 0 {
+				ao.suggestWorkerIncrease(0.4, 0.8)
+			} else {
+				ao.suggestWorkerDecrease(0.95, 0.8)
+			}
+		}()
+	}
+	
+	wg.Wait()
+	
+	// Verify that some adjustments were made
+	finalCount := atomic.LoadInt32(&adjustmentCount)
+	if finalCount == 0 {
+		t.Error("Expected some worker adjustments to be made")
+	}
+	
+	// Verify final state is valid
+	if ao.config.WorkerCount < ao.cpuMonitor.minWorkers || ao.config.WorkerCount > ao.cpuMonitor.maxWorkers {
+		t.Errorf("Final worker count %d outside valid range [%d, %d]", 
+			ao.config.WorkerCount, ao.cpuMonitor.minWorkers, ao.cpuMonitor.maxWorkers)
+	}
+}

+ 3 - 3
internal/nginx_log/indexer/cpu_optimization_test.go

@@ -18,9 +18,9 @@ func BenchmarkCPUUtilization(b *testing.B) {
 		queueSize   int
 	}{
 		{"Current_8W_1000B", 8, 1000, 10000},
-		{"CPU_Match", runtime.NumCPU(), 1000, 10000},
-		{"CPU_Double", runtime.NumCPU() * 2, 1000, 10000},
-		{"CPU_Triple", runtime.NumCPU() * 3, 1000, 10000},
+		{"CPU_Match", runtime.GOMAXPROCS(0), 1000, 10000},
+		{"CPU_Double", runtime.GOMAXPROCS(0) * 2, 1000, 10000},
+		{"CPU_Triple", runtime.GOMAXPROCS(0) * 3, 1000, 10000},
 		{"HighBatch_8W_2000B", 8, 2000, 10000},
 		{"HighBatch_12W_2000B", 12, 2000, 20000},
 		{"LowLatency_16W_500B", 16, 500, 20000},

+ 3 - 3
internal/nginx_log/indexer/dynamic_shard_awareness.go

@@ -99,7 +99,7 @@ type EnvironmentFactors struct {
 // analyzeEnvironmentFactors analyzes the current environment
 func (dsa *DynamicShardAwareness) analyzeEnvironmentFactors() EnvironmentFactors {
 	factors := EnvironmentFactors{
-		CPUCores: runtime.NumCPU(),
+		CPUCores: runtime.GOMAXPROCS(0), // Use GOMAXPROCS for container-aware processor count
 	}
 	
 	// Get memory info (simplified)
@@ -178,8 +178,8 @@ func (dsa *DynamicShardAwareness) analyzeExpectedLoad() string {
 		return "high"
 	}
 	
-	// Variable load if workers are significantly higher than CPU cores
-	if workerCount > runtime.NumCPU()*2 {
+	// Variable load if workers are significantly higher than available processors
+	if workerCount > runtime.GOMAXPROCS(0)*2 {
 		return "variable"
 	}
 	

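This file, like the benchmark and test files above, swaps `runtime.NumCPU()` for `runtime.GOMAXPROCS(0)`. The difference matters under CPU limits: `NumCPU` reports the host's logical cores, while `GOMAXPROCS(0)` returns the scheduler's current parallelism limit, which may be lowered (for example via the `GOMAXPROCS` environment variable or tools like uber-go/automaxprocs in containers). A tiny illustration of the distinction:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Logical CPUs visible to the process (host cores).
	fmt.Println("NumCPU:    ", runtime.NumCPU())

	// Current scheduler parallelism limit; passing 0 only reads the value.
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))

	// Example: restrict parallelism to 2, as a container CPU quota might.
	prev := runtime.GOMAXPROCS(2)
	fmt.Println("previous limit:", prev, "current:", runtime.GOMAXPROCS(0))
}
```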
+ 3 - 3
internal/nginx_log/indexer/dynamic_shard_test.go

@@ -118,7 +118,7 @@ func TestEnhancedDynamicShardManager(t *testing.T) {
 func TestParallelIndexerWithDynamicShards(t *testing.T) {
 	config := DefaultIndexerConfig()
 	config.IndexPath = t.TempDir()
-	config.WorkerCount = runtime.NumCPU() * 2 // Ensure high worker count for dynamic detection
+	config.WorkerCount = runtime.GOMAXPROCS(0) * 2 // Ensure high worker count for dynamic detection
 	
 	// Create indexer with nil shard manager to trigger dynamic detection
 	indexer := NewParallelIndexer(config, nil)
@@ -135,9 +135,9 @@ func TestParallelIndexerWithDynamicShards(t *testing.T) {
 	t.Logf("Current shard manager type: %T", currentManager)
 	
 	// For M2 Pro with 12 cores, 24 workers, should detect dynamic management
-	if runtime.NumCPU() >= 8 {
+	if runtime.GOMAXPROCS(0) >= 8 {
 		if !isDynamic {
-			t.Errorf("Expected dynamic shard management on high-core system (CPU: %d)", runtime.NumCPU())
+			t.Errorf("Expected dynamic shard management on high-core system (Procs: %d)", runtime.GOMAXPROCS(0))
 		} else {
 			t.Logf("✅ Dynamic shard management correctly detected on high-core system")
 		}

+ 212 - 51
internal/nginx_log/indexer/parallel_indexer.go

@@ -14,9 +14,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
 	"github.com/blevesearch/bleve/v2"
 	"github.com/uozi-tech/cosy/logger"
-	"github.com/0xJacky/Nginx-UI/internal/nginx_log/utils"
 )
 
 // ParallelIndexer provides high-performance parallel indexing with sharding
@@ -35,9 +35,9 @@ type ParallelIndexer struct {
 	cancel  context.CancelFunc
 	wg      sync.WaitGroup
 	running int32
-	
+
 	// Cleanup control
-	stopOnce sync.Once
+	stopOnce       sync.Once
 	channelsClosed int32
 
 	// Statistics
@@ -50,9 +50,9 @@ type ParallelIndexer struct {
 	adaptiveOptimizer   *AdaptiveOptimizer
 	zeroAllocProcessor  *ZeroAllocBatchProcessor
 	optimizationEnabled bool
-	
+
 	// Dynamic shard awareness
-	dynamicAwareness    *DynamicShardAwareness
+	dynamicAwareness *DynamicShardAwareness
 }
 
 // indexWorker represents a single indexing worker
@@ -73,7 +73,7 @@ func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelInde
 
 	// Initialize dynamic shard awareness
 	dynamicAwareness := NewDynamicShardAwareness(config)
-	
+
 	// If no shard manager provided, use dynamic awareness to detect optimal type
 	var actualShardManager ShardManager
 	if shardManager == nil {
@@ -83,7 +83,7 @@ func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelInde
 			detected = NewDefaultShardManager(config)
 			detected.(*DefaultShardManager).Initialize()
 		}
-		
+
 		// Type assertion to ShardManager interface
 		if sm, ok := detected.(ShardManager); ok {
 			actualShardManager = sm
@@ -96,9 +96,11 @@ func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelInde
 		actualShardManager = shardManager
 	}
 
+	ao := NewAdaptiveOptimizer(config)
+
 	indexer := &ParallelIndexer{
-		config:              config,
-		shardManager:        actualShardManager,
+		config:             config,
+		shardManager:       actualShardManager,
 		metrics:            NewDefaultMetricsCollector(),
 		jobQueue:           make(chan *IndexJob, config.MaxQueueSize),
 		resultQueue:        make(chan *IndexResult, config.WorkerCount),
@@ -107,12 +109,17 @@ func NewParallelIndexer(config *Config, shardManager ShardManager) *ParallelInde
 		stats: &IndexStats{
 			WorkerStats: make([]*WorkerStats, config.WorkerCount),
 		},
-		adaptiveOptimizer:   NewAdaptiveOptimizer(config),
-		zeroAllocProcessor:  NewZeroAllocBatchProcessor(config),
+		adaptiveOptimizer:   ao,
+		zeroAllocProcessor: NewZeroAllocBatchProcessor(config),
 		optimizationEnabled: true, // Enable optimizations by default
 		dynamicAwareness:    dynamicAwareness,
 	}
 
+	// Set up the activity poller for the adaptive optimizer
+	if indexer.adaptiveOptimizer != nil {
+		indexer.adaptiveOptimizer.SetActivityPoller(indexer)
+	}
+
 	// Initialize workers
 	indexer.workers = make([]*indexWorker, config.WorkerCount)
 	for i := 0; i < config.WorkerCount; i++ {
@@ -166,15 +173,21 @@ func (pi *ParallelIndexer) Start(ctx context.Context) error {
 
 	// Start adaptive optimizer if enabled
 	if pi.optimizationEnabled && pi.adaptiveOptimizer != nil {
+		// Set worker count change callback
+		logger.Debugf("Setting up adaptive optimizer callback for worker count changes")
+		pi.adaptiveOptimizer.SetWorkerCountChangeCallback(pi.handleWorkerCountChange)
+
 		if err := pi.adaptiveOptimizer.Start(); err != nil {
 			logger.Warnf("Failed to start adaptive optimizer: %v", err)
+		} else {
+			logger.Debugf("Adaptive optimizer started successfully")
 		}
 	}
-	
+
 	// Start dynamic shard awareness monitoring if enabled
 	if pi.dynamicAwareness != nil {
 		pi.dynamicAwareness.StartMonitoring(ctx)
-		
+
 		if pi.dynamicAwareness.IsDynamic() {
 			logger.Info("Dynamic shard management is active with automatic scaling")
 		} else {
@@ -185,10 +198,90 @@ func (pi *ParallelIndexer) Start(ctx context.Context) error {
 	return nil
 }
 
+// handleWorkerCountChange handles dynamic worker count adjustments from adaptive optimizer
+func (pi *ParallelIndexer) handleWorkerCountChange(oldCount, newCount int) {
+	logger.Infof("Handling worker count change from %d to %d", oldCount, newCount)
+
+	// Check if indexer is running
+	if atomic.LoadInt32(&pi.running) != 1 {
+		logger.Warn("Cannot adjust worker count: indexer not running")
+		return
+	}
+
+	// Prevent concurrent worker adjustments
+	pi.statsMutex.Lock()
+	defer pi.statsMutex.Unlock()
+
+	currentWorkerCount := len(pi.workers)
+	if currentWorkerCount == newCount {
+		return // Already at desired count
+	}
+
+	if newCount > currentWorkerCount {
+		// Add more workers
+		pi.addWorkers(newCount - currentWorkerCount)
+	} else {
+		// Remove workers
+		pi.removeWorkers(currentWorkerCount - newCount)
+	}
+
+	// Update config to reflect the change
+	pi.config.WorkerCount = newCount
+
+	logger.Infof("Successfully adjusted worker count to %d", newCount)
+}
+
+// addWorkers adds new workers to the pool
+func (pi *ParallelIndexer) addWorkers(count int) {
+	for i := 0; i < count; i++ {
+		workerID := len(pi.workers)
+		worker := &indexWorker{
+			id:      workerID,
+			indexer: pi,
+			stats: &WorkerStats{
+				ID:     workerID,
+				Status: WorkerStatusIdle,
+			},
+		}
+
+		pi.workers = append(pi.workers, worker)
+		pi.stats.WorkerStats = append(pi.stats.WorkerStats, worker.stats)
+
+		// Start the new worker
+		pi.wg.Add(1)
+		go worker.run()
+
+		logger.Debugf("Added worker %d", workerID)
+	}
+}
+
+// removeWorkers gracefully removes workers from the pool
+func (pi *ParallelIndexer) removeWorkers(count int) {
+	if count >= len(pi.workers) {
+		logger.Warn("Cannot remove all workers, keeping at least one")
+		count = len(pi.workers) - 1
+	}
+
+	// Remove workers from the end of the slice
+	workersToRemove := pi.workers[len(pi.workers)-count:]
+	pi.workers = pi.workers[:len(pi.workers)-count]
+	pi.stats.WorkerStats = pi.stats.WorkerStats[:len(pi.stats.WorkerStats)-count]
+
+	// Note: In a full implementation, you would need to:
+	// 1. Signal workers to stop gracefully after finishing current jobs
+	// 2. Wait for them to complete
+	// 3. Clean up their resources
+	// For now, we just remove them from tracking (a sketch of the graceful path follows this function)
+
+	for _, worker := range workersToRemove {
+		logger.Debugf("Removed worker %d", worker.id)
+	}
+}
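The note above lists the teardown steps a complete implementation would need. A minimal sketch of graceful removal, assuming each indexWorker also carried stop and done channels (neither exists in this commit), might look like:

// Sketch only: assumes indexWorker gains stop and done channels, which this
// commit does not add.
func (pi *ParallelIndexer) removeWorkersGracefully(count int) {
	if count >= len(pi.workers) {
		count = len(pi.workers) - 1 // always keep at least one worker
	}
	victims := pi.workers[len(pi.workers)-count:]
	pi.workers = pi.workers[:len(pi.workers)-count]
	pi.stats.WorkerStats = pi.stats.WorkerStats[:len(pi.stats.WorkerStats)-count]

	for _, w := range victims {
		close(w.stop) // ask the worker to exit after finishing its current job
	}
	for _, w := range victims {
		<-w.done // wait for the run loop to drain and release its resources
	}
}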
+
 // Stop gracefully stops the indexer
 func (pi *ParallelIndexer) Stop() error {
 	var stopErr error
-	
+
 	pi.stopOnce.Do(func() {
 		// Set running to 0
 		if !atomic.CompareAndSwapInt32(&pi.running, 1, 0) {
@@ -359,59 +452,59 @@ func (pi *ParallelIndexer) EnableOptimizations(enabled bool) {
 func (pi *ParallelIndexer) GetDynamicShardInfo() *DynamicShardInfo {
 	if pi.dynamicAwareness == nil {
 		return &DynamicShardInfo{
-			IsEnabled:    false,
-			IsActive:     false,
-			ShardCount:   pi.config.ShardCount,
-			ShardType:    "static",
+			IsEnabled:  false,
+			IsActive:   false,
+			ShardCount: pi.config.ShardCount,
+			ShardType:  "static",
 		}
 	}
-	
+
 	isDynamic := pi.dynamicAwareness.IsDynamic()
 	shardManager := pi.dynamicAwareness.GetCurrentShardManager()
-	
+
 	info := &DynamicShardInfo{
-		IsEnabled:    true,
-		IsActive:     isDynamic,
-		ShardCount:   pi.config.ShardCount,
-		ShardType:    "static",
+		IsEnabled:  true,
+		IsActive:   isDynamic,
+		ShardCount: pi.config.ShardCount,
+		ShardType:  "static",
 	}
-	
+
 	if isDynamic {
 		info.ShardType = "dynamic"
-		
+
 		if enhancedManager, ok := shardManager.(*EnhancedDynamicShardManager); ok {
 			info.TargetShardCount = enhancedManager.GetTargetShardCount()
 			info.IsScaling = enhancedManager.IsScalingInProgress()
 			info.AutoScaleEnabled = enhancedManager.IsAutoScaleEnabled()
-			
+
 			// Get scaling recommendation
 			recommendation := enhancedManager.GetScalingRecommendations()
 			info.Recommendation = recommendation
-			
+
 			// Get shard health
 			info.ShardHealth = enhancedManager.GetShardHealth()
 		}
 	}
-	
+
 	// Get performance analysis
 	analysis := pi.dynamicAwareness.GetPerformanceAnalysis()
 	info.PerformanceAnalysis = &analysis
-	
+
 	return info
 }
 
 // DynamicShardInfo contains information about dynamic shard management status
 type DynamicShardInfo struct {
-	IsEnabled           bool                             `json:"is_enabled"`
-	IsActive            bool                             `json:"is_active"`
-	ShardType           string                           `json:"shard_type"`        // "static" or "dynamic"
-	ShardCount          int                              `json:"shard_count"`
-	TargetShardCount    int                              `json:"target_shard_count,omitempty"`
-	IsScaling           bool                             `json:"is_scaling,omitempty"`
-	AutoScaleEnabled    bool                             `json:"auto_scale_enabled,omitempty"`
-	Recommendation      *ScalingRecommendation           `json:"recommendation,omitempty"`
-	ShardHealth         map[int]*ShardHealthStatus       `json:"shard_health,omitempty"`
-	PerformanceAnalysis *PerformanceAnalysis             `json:"performance_analysis,omitempty"`
+	IsEnabled           bool                       `json:"is_enabled"`
+	IsActive            bool                       `json:"is_active"`
+	ShardType           string                     `json:"shard_type"` // "static" or "dynamic"
+	ShardCount          int                        `json:"shard_count"`
+	TargetShardCount    int                        `json:"target_shard_count,omitempty"`
+	IsScaling           bool                       `json:"is_scaling,omitempty"`
+	AutoScaleEnabled    bool                       `json:"auto_scale_enabled,omitempty"`
+	Recommendation      *ScalingRecommendation     `json:"recommendation,omitempty"`
+	ShardHealth         map[int]*ShardHealthStatus `json:"shard_health,omitempty"`
+	PerformanceAnalysis *PerformanceAnalysis       `json:"performance_analysis,omitempty"`
 }
 
 // FlushAll flushes all pending operations
@@ -535,7 +628,7 @@ func (pi *ParallelIndexer) IndexLogFile(filePath string) error {
 		docIDBuf = append(docIDBuf, filePath...)
 		docIDBuf = append(docIDBuf, '-')
 		docIDBuf = utils.AppendInt(docIDBuf, int(docCount))
-		
+
 		doc := &Document{
 			ID:     utils.BytesToStringUnsafe(docIDBuf),
 			Fields: logDoc,
@@ -596,6 +689,28 @@ func (pi *ParallelIndexer) IsRunning() bool {
 	return atomic.LoadInt32(&pi.running) != 0
 }
 
+// IsBusy checks if the indexer has pending jobs or any active workers.
+func (pi *ParallelIndexer) IsBusy() bool {
+	if len(pi.jobQueue) > 0 {
+		return true
+	}
+
+	// This RLock keeps the pi.workers slice from being modified (e.g. by scaling) while we iterate over it
+	pi.statsMutex.RLock()
+	defer pi.statsMutex.RUnlock()
+
+	for _, worker := range pi.workers {
+		worker.statsMutex.RLock()
+		isBusy := worker.stats.Status == WorkerStatusBusy
+		worker.statsMutex.RUnlock()
+		if isBusy {
+			return true
+		}
+	}
+
+	return false
+}
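IsBusy is the method the constructor registers via SetActivityPoller, so the adaptive optimizer can avoid shrinking the pool mid-burst. The optimizer's side of that contract is not shown in this diff; one plausible shape, where the poller field, the currentWorkers counter, and maybeScaleDown are illustrative assumptions, is:

// Assumed contract between the optimizer and the indexer (illustrative names).
type ActivityPoller interface {
	IsBusy() bool
}

// maybeScaleDown sketches how the optimizer could use the poller: only hand a
// smaller worker count to the callback once the indexer is idle.
func (ao *AdaptiveOptimizer) maybeScaleDown(target int) {
	if ao.poller != nil && ao.poller.IsBusy() {
		return // queue is non-empty or a worker is busy; try again later
	}
	if ao.onWorkerCountChange != nil {
		ao.onWorkerCountChange(ao.currentWorkers, target)
	}
}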
+
 // GetShardInfo returns information about a specific shard
 func (pi *ParallelIndexer) GetShardInfo(shardID int) (*ShardInfo, error) {
 	shardStats := pi.shardManager.GetShardStats()
@@ -653,43 +768,43 @@ func (pi *ParallelIndexer) DeleteIndexByLogGroup(basePath string, logFileManager
 	// Delete documents from all shards for these files
 	shards := pi.shardManager.GetAllShards()
 	var deleteErrors []error
-	
+
 	for _, shard := range shards {
 		// Search for documents with matching file_path
 		for _, filePath := range filesToDelete {
 			query := bleve.NewTermQuery(filePath)
 			query.SetField("file_path")
-			
+
 			searchRequest := bleve.NewSearchRequest(query)
 			searchRequest.Size = 1000 // Process in batches
 			searchRequest.Fields = []string{"file_path"}
-			
+
 			for {
 				searchResult, err := shard.Search(searchRequest)
 				if err != nil {
 					deleteErrors = append(deleteErrors, fmt.Errorf("failed to search for documents in file %s: %w", filePath, err))
 					break
 				}
-				
+
 				if len(searchResult.Hits) == 0 {
 					break // No more documents to delete
 				}
-				
+
 				// Delete documents in batch
 				batch := shard.NewBatch()
 				for _, hit := range searchResult.Hits {
 					batch.Delete(hit.ID)
 				}
-				
+
 				if err := shard.Batch(batch); err != nil {
 					deleteErrors = append(deleteErrors, fmt.Errorf("failed to delete batch for file %s: %w", filePath, err))
 				}
-				
+
 				// If we got fewer results than requested, we're done
 				if len(searchResult.Hits) < searchRequest.Size {
 					break
 				}
-				
+
 				// Continue from where we left off
 				searchRequest.From += searchRequest.Size
 			}
@@ -704,13 +819,12 @@ func (pi *ParallelIndexer) DeleteIndexByLogGroup(basePath string, logFileManager
 	return nil
 }
 
-
 // DestroyAllIndexes closes and deletes all index data from disk.
 func (pi *ParallelIndexer) DestroyAllIndexes(parentCtx context.Context) error {
 	// Stop all background routines before deleting files
 	pi.cancel()
 	pi.wg.Wait()
-	
+
 	// Safely close channels if they haven't been closed yet
 	if atomic.CompareAndSwapInt32(&pi.channelsClosed, 0, 1) {
 		close(pi.jobQueue)
@@ -803,6 +917,53 @@ func (pi *ParallelIndexer) IndexLogGroup(basePath string) (map[string]uint64, *t
 	return docsCountMap, overallMinTime, overallMaxTime, nil
 }
 
+// IndexSingleFileIncrementally is a more efficient version for incremental updates.
+// It indexes only the specified single file instead of the entire log group.
+func (pi *ParallelIndexer) IndexSingleFileIncrementally(filePath string, progressConfig *ProgressConfig) (map[string]uint64, *time.Time, *time.Time, error) {
+	if !pi.IsHealthy() {
+		return nil, nil, nil, fmt.Errorf("indexer not healthy")
+	}
+
+	// Create progress tracker if config is provided
+	var progressTracker *ProgressTracker
+	if progressConfig != nil {
+		progressTracker = NewProgressTracker(filePath, progressConfig)
+		// Setup file for tracking
+		isCompressed := IsCompressedFile(filePath)
+		progressTracker.AddFile(filePath, isCompressed)
+		if stat, err := os.Stat(filePath); err == nil {
+			progressTracker.SetFileSize(filePath, stat.Size())
+			if estimatedLines, err := EstimateFileLines(context.Background(), filePath, stat.Size(), isCompressed); err == nil {
+				progressTracker.SetFileEstimate(filePath, estimatedLines)
+			}
+		}
+	}
+
+	docsCountMap := make(map[string]uint64)
+
+	if progressTracker != nil {
+		progressTracker.StartFile(filePath)
+	}
+
+	docsIndexed, minTime, maxTime, err := pi.indexSingleFileWithProgress(filePath, progressTracker)
+	if err != nil {
+		logger.Warnf("Failed to incrementally index file '%s', skipping: %v", filePath, err)
+		if progressTracker != nil {
+			progressTracker.FailFile(filePath, err.Error())
+		}
+		// Return empty results and the error
+		return docsCountMap, nil, nil, err
+	}
+
+	docsCountMap[filePath] = docsIndexed
+
+	if progressTracker != nil {
+		progressTracker.CompleteFile(filePath, int64(docsIndexed))
+	}
+
+	return docsCountMap, minTime, maxTime, nil
+}
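A minimal usage sketch of the new method, assuming indexer is a started *ParallelIndexer (the log path is illustrative; passing nil for the ProgressConfig skips progress tracking):

counts, minTime, maxTime, err := indexer.IndexSingleFileIncrementally("/var/log/nginx/access.log", nil)
if err != nil {
	logger.Warnf("incremental indexing failed: %v", err)
	return
}
logger.Infof("indexed %d documents between %v and %v",
	counts["/var/log/nginx/access.log"], minTime, maxTime)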
+
 // indexSingleFile contains the logic to process one physical log file.
 // It returns the number of documents indexed from the file, and the min/max timestamps.
 func (pi *ParallelIndexer) indexSingleFile(filePath string) (uint64, *time.Time, *time.Time, error) {

+ 455 - 0
internal/nginx_log/indexer/parallel_indexer_worker_test.go

@@ -0,0 +1,455 @@
+package indexer
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/blevesearch/bleve/v2"
+)
+
+// Mock shard manager for parallel indexer tests
+type mockShardManagerForWorkerTest struct{}
+
+func (m *mockShardManagerForWorkerTest) GetShard(key string) (bleve.Index, int, error) {
+	return nil, 0, nil
+}
+
+func (m *mockShardManagerForWorkerTest) GetShardByID(id int) (bleve.Index, error) {
+	return nil, nil
+}
+
+func (m *mockShardManagerForWorkerTest) GetAllShards() []bleve.Index {
+	return []bleve.Index{}
+}
+
+func (m *mockShardManagerForWorkerTest) GetShardCount() int {
+	return 1
+}
+
+func (m *mockShardManagerForWorkerTest) Initialize() error {
+	return nil
+}
+
+func (m *mockShardManagerForWorkerTest) GetShardStats() []*ShardInfo {
+	return []*ShardInfo{}
+}
+
+func (m *mockShardManagerForWorkerTest) CreateShard(id int, path string) error {
+	return nil
+}
+
+func (m *mockShardManagerForWorkerTest) Close() error {
+	return nil
+}
+
+func (m *mockShardManagerForWorkerTest) CloseShard(id int) error {
+	return nil
+}
+
+func (m *mockShardManagerForWorkerTest) HealthCheck() error {
+	return nil
+}
+
+func (m *mockShardManagerForWorkerTest) OptimizeShard(id int) error {
+	return nil
+}
+
+// Test helper to create parallel indexer for worker tests
+func createTestParallelIndexer(workerCount int) *ParallelIndexer {
+	config := &Config{
+		WorkerCount:  workerCount,
+		BatchSize:    100,
+		MaxQueueSize: 1000,
+	}
+	
+	shardManager := &mockShardManagerForWorkerTest{}
+	return NewParallelIndexer(config, shardManager)
+}
+
+func TestParallelIndexer_handleWorkerCountChange_Increase(t *testing.T) {
+	pi := createTestParallelIndexer(4)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialWorkerCount := len(pi.workers)
+	if initialWorkerCount != 4 {
+		t.Fatalf("Expected 4 initial workers, got %d", initialWorkerCount)
+	}
+	
+	// Test increasing worker count
+	pi.handleWorkerCountChange(4, 6)
+	
+	// Verify worker count increased
+	newWorkerCount := len(pi.workers)
+	if newWorkerCount != 6 {
+		t.Errorf("Expected 6 workers after increase, got %d", newWorkerCount)
+	}
+	
+	// Verify config was updated
+	if pi.config.WorkerCount != 6 {
+		t.Errorf("Expected config worker count to be 6, got %d", pi.config.WorkerCount)
+	}
+	
+	// Verify stats were updated
+	if len(pi.stats.WorkerStats) != 6 {
+		t.Errorf("Expected 6 worker stats, got %d", len(pi.stats.WorkerStats))
+	}
+}
+
+func TestParallelIndexer_handleWorkerCountChange_Decrease(t *testing.T) {
+	pi := createTestParallelIndexer(6)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialWorkerCount := len(pi.workers)
+	if initialWorkerCount != 6 {
+		t.Fatalf("Expected 6 initial workers, got %d", initialWorkerCount)
+	}
+	
+	// Test decreasing worker count
+	pi.handleWorkerCountChange(6, 4)
+	
+	// Verify worker count decreased
+	newWorkerCount := len(pi.workers)
+	if newWorkerCount != 4 {
+		t.Errorf("Expected 4 workers after decrease, got %d", newWorkerCount)
+	}
+	
+	// Verify config was updated
+	if pi.config.WorkerCount != 4 {
+		t.Errorf("Expected config worker count to be 4, got %d", pi.config.WorkerCount)
+	}
+	
+	// Verify stats were updated
+	if len(pi.stats.WorkerStats) != 4 {
+		t.Errorf("Expected 4 worker stats, got %d", len(pi.stats.WorkerStats))
+	}
+}
+
+func TestParallelIndexer_handleWorkerCountChange_NoChange(t *testing.T) {
+	pi := createTestParallelIndexer(4)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialWorkerCount := len(pi.workers)
+	
+	// Test no change scenario
+	pi.handleWorkerCountChange(4, 4)
+	
+	// Verify worker count didn't change
+	newWorkerCount := len(pi.workers)
+	if newWorkerCount != initialWorkerCount {
+		t.Errorf("Expected worker count to remain %d, got %d", initialWorkerCount, newWorkerCount)
+	}
+}
+
+func TestParallelIndexer_handleWorkerCountChange_NotRunning(t *testing.T) {
+	pi := createTestParallelIndexer(4)
+	
+	// Don't start the indexer - it should be in stopped state
+	
+	initialWorkerCount := len(pi.workers)
+	
+	// Test worker count change when not running
+	pi.handleWorkerCountChange(4, 6)
+	
+	// Verify no change occurred
+	newWorkerCount := len(pi.workers)
+	if newWorkerCount != initialWorkerCount {
+		t.Errorf("Expected no worker change when not running, initial: %d, new: %d", 
+			initialWorkerCount, newWorkerCount)
+	}
+	
+	// Verify config wasn't updated
+	if pi.config.WorkerCount != 4 {
+		t.Errorf("Expected config worker count to remain 4, got %d", pi.config.WorkerCount)
+	}
+}
+
+func TestParallelIndexer_addWorkers(t *testing.T) {
+	pi := createTestParallelIndexer(2)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialCount := len(pi.workers)
+	if initialCount != 2 {
+		t.Fatalf("Expected 2 initial workers, got %d", initialCount)
+	}
+	
+	// Add 3 workers
+	pi.addWorkers(3)
+	
+	// Verify workers were added
+	newCount := len(pi.workers)
+	if newCount != 5 {
+		t.Errorf("Expected 5 workers after adding 3, got %d", newCount)
+	}
+	
+	// Verify worker IDs are sequential
+	for i, worker := range pi.workers {
+		if worker.id != i {
+			t.Errorf("Expected worker %d to have ID %d, got %d", i, i, worker.id)
+		}
+	}
+	
+	// Verify stats were updated
+	if len(pi.stats.WorkerStats) != 5 {
+		t.Errorf("Expected 5 worker stats, got %d", len(pi.stats.WorkerStats))
+	}
+}
+
+func TestParallelIndexer_removeWorkers(t *testing.T) {
+	pi := createTestParallelIndexer(5)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialCount := len(pi.workers)
+	if initialCount != 5 {
+		t.Fatalf("Expected 5 initial workers, got %d", initialCount)
+	}
+	
+	// Remove 2 workers
+	pi.removeWorkers(2)
+	
+	// Verify workers were removed
+	newCount := len(pi.workers)
+	if newCount != 3 {
+		t.Errorf("Expected 3 workers after removing 2, got %d", newCount)
+	}
+	
+	// Verify stats were updated
+	if len(pi.stats.WorkerStats) != 3 {
+		t.Errorf("Expected 3 worker stats, got %d", len(pi.stats.WorkerStats))
+	}
+}
+
+func TestParallelIndexer_removeWorkers_KeepMinimum(t *testing.T) {
+	pi := createTestParallelIndexer(2)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	initialCount := len(pi.workers)
+	if initialCount != 2 {
+		t.Fatalf("Expected 2 initial workers, got %d", initialCount)
+	}
+	
+	// Try to remove all workers (should keep at least one)
+	pi.removeWorkers(2)
+	
+	// Verify at least one worker remains
+	newCount := len(pi.workers)
+	if newCount != 1 {
+		t.Errorf("Expected 1 worker to remain after trying to remove all, got %d", newCount)
+	}
+	
+	// Verify stats were updated
+	if len(pi.stats.WorkerStats) != 1 {
+		t.Errorf("Expected 1 worker stat, got %d", len(pi.stats.WorkerStats))
+	}
+}
+
+func TestParallelIndexer_AdaptiveOptimizerIntegration(t *testing.T) {
+	pi := createTestParallelIndexer(4)
+	
+	// Enable optimization
+	pi.optimizationEnabled = true
+	pi.adaptiveOptimizer = NewAdaptiveOptimizer(pi.config)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	// Verify adaptive optimizer callback was set
+	if pi.adaptiveOptimizer.onWorkerCountChange == nil {
+		t.Error("Expected adaptive optimizer callback to be set")
+	}
+	
+	// Simulate worker count change from adaptive optimizer
+	initialWorkerCount := len(pi.workers)
+	
+	// Trigger callback (simulate adaptive optimizer decision)
+	if pi.adaptiveOptimizer.onWorkerCountChange != nil {
+		pi.adaptiveOptimizer.onWorkerCountChange(4, 6)
+	}
+	
+	// Verify worker count changed
+	newWorkerCount := len(pi.workers)
+	if newWorkerCount == initialWorkerCount {
+		t.Error("Expected worker count to change from adaptive optimizer callback")
+	}
+}
+
+func TestParallelIndexer_ConcurrentWorkerAdjustments(t *testing.T) {
+	pi := createTestParallelIndexer(4)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	var wg sync.WaitGroup
+	
+	// Simulate concurrent worker adjustments
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(iteration int) {
+			defer wg.Done()
+			
+			// Alternate between increasing and decreasing
+			if iteration%2 == 0 {
+				pi.handleWorkerCountChange(pi.config.WorkerCount, pi.config.WorkerCount+1)
+			} else {
+				if pi.config.WorkerCount > 2 {
+					pi.handleWorkerCountChange(pi.config.WorkerCount, pi.config.WorkerCount-1)
+				}
+			}
+		}(i)
+	}
+	
+	wg.Wait()
+	
+	// Verify final state is consistent
+	workerCount := len(pi.workers)
+	configCount := pi.config.WorkerCount
+	statsCount := len(pi.stats.WorkerStats)
+	
+	if workerCount != configCount {
+		t.Errorf("Worker count (%d) doesn't match config count (%d)", workerCount, configCount)
+	}
+	
+	if workerCount != statsCount {
+		t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
+	}
+	
+	// Verify worker IDs are sequential and unique
+	workerIDs := make(map[int]bool)
+	for i, worker := range pi.workers {
+		if worker.id != i {
+			t.Errorf("Expected worker at index %d to have ID %d, got %d", i, i, worker.id)
+		}
+		
+		if workerIDs[worker.id] {
+			t.Errorf("Duplicate worker ID found: %d", worker.id)
+		}
+		workerIDs[worker.id] = true
+	}
+}
+
+func TestParallelIndexer_WorkerStatsConsistency(t *testing.T) {
+	pi := createTestParallelIndexer(3)
+	
+	// Start the indexer
+	ctx := context.Background()
+	err := pi.Start(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start parallel indexer: %v", err)
+	}
+	defer pi.Stop()
+	
+	// Allow time for initialization
+	time.Sleep(100 * time.Millisecond)
+	
+	// Test adding workers
+	pi.addWorkers(2)
+	
+	// Verify stats consistency
+	workerCount := len(pi.workers)
+	statsCount := len(pi.stats.WorkerStats)
+	
+	if workerCount != statsCount {
+		t.Errorf("Worker count (%d) doesn't match stats count (%d)", workerCount, statsCount)
+	}
+	
+	// Verify each worker has corresponding stats
+	for i, worker := range pi.workers {
+		if pi.stats.WorkerStats[i].ID != worker.id {
+			t.Errorf("Worker %d ID (%d) doesn't match stats ID (%d)", 
+				i, worker.id, pi.stats.WorkerStats[i].ID)
+		}
+		
+		if worker.stats != pi.stats.WorkerStats[i] {
+			t.Errorf("Worker %d stats pointer doesn't match global stats", i)
+		}
+	}
+	
+	// Test removing workers
+	pi.removeWorkers(1)
+	
+	// Verify stats consistency after removal
+	workerCount = len(pi.workers)
+	statsCount = len(pi.stats.WorkerStats)
+	
+	if workerCount != statsCount {
+		t.Errorf("After removal, worker count (%d) doesn't match stats count (%d)", 
+			workerCount, statsCount)
+	}
+}

+ 297 - 15
internal/nginx_log/indexer/rebuild.go

@@ -1,13 +1,19 @@
 package indexer
 
 import (
+	"bufio"
+	"compress/gzip"
 	"context"
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
+	
+	"github.com/uozi-tech/cosy/logger"
 )
 
 // RebuildManager handles index rebuilding operations
@@ -228,7 +234,7 @@ func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath stri
 		}
 	}
 
-	// Process each file
+	// Process each file with smart change detection
 	for _, file := range files {
 		// Check context
 		if ctx.Err() != nil {
@@ -236,6 +242,15 @@ func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath stri
 			return ctx.Err()
 		}
 
+		// Skip unchanged files (especially compressed archives)
+		shouldProcess, skipReason := rm.shouldProcessFile(file)
+		if !shouldProcess {
+			logger.Infof("Skipping file %s: %s", file.Path, skipReason)
+			// Mark as completed without processing
+			tracker.CompleteFile(file.Path, 0)
+			continue
+		}
+
 		// Create file-specific context with timeout
 		fileCtx, cancel := context.WithTimeout(ctx, rm.config.TimeoutPerFile)
 
@@ -266,10 +281,72 @@ func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath stri
 	return nil
 }
 
+// shouldProcessFile determines if a file needs to be processed based on change detection
+func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
+	// Get file information
+	fileInfo, err := os.Stat(file.Path)
+	if err != nil {
+		return true, fmt.Sprintf("cannot stat file (will process): %v", err)
+	}
+	
+	// For compressed files (.gz), check if we've already processed them and they haven't changed
+	if file.IsCompressed {
+		// Check if we have persistence information for this file
+		if rm.persistence != nil {
+			if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
+				// Check if file hasn't changed since last indexing
+				currentModTime := fileInfo.ModTime().Unix()
+				currentSize := fileInfo.Size()
+				
+				if info.LastModified == currentModTime && 
+				   info.LastSize == currentSize && 
+				   info.LastPosition == currentSize {
+					return false, "compressed file already fully indexed and unchanged"
+				}
+			}
+		}
+	}
+	
+	// For active log files (non-compressed), use change detection to decide, resuming from a checkpoint when possible
+	if !file.IsCompressed {
+		// Check if file has grown or changed
+		if rm.persistence != nil {
+			if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
+				currentModTime := fileInfo.ModTime().Unix()
+				currentSize := fileInfo.Size()
+				
+				// File hasn't changed at all
+				if info.LastModified == currentModTime && 
+				   info.LastSize == currentSize && 
+				   info.LastPosition == currentSize {
+					return false, "active file unchanged since last indexing"
+				}
+				
+				// File has shrunk (possible log rotation)
+				if currentSize < info.LastSize {
+					return true, "active file appears to have been rotated (size decreased)"
+				}
+				
+				// File has grown or been modified
+				if currentSize > info.LastSize || currentModTime > info.LastModified {
+					return true, "active file has new content"
+				}
+			}
+		}
+		
+		// No persistence info available, process the file
+		return true, "no previous indexing record found for active file"
+	}
+	
+	// Default: process compressed files if no persistence info
+	return true, "no previous indexing record found for compressed file"
+}
+
 // LogGroupFile represents a file in a log group
 type LogGroupFile struct {
 	Path           string
 	Size           int64
+	ModTime        int64  // Unix timestamp of file modification time
 	IsCompressed   bool
 	EstimatedLines int64
 	ProcessedLines int64
@@ -302,6 +379,7 @@ func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGrou
 			file := &LogGroupFile{
 				Path:         path,
 				Size:         info.Size(),
+				ModTime:      info.ModTime().Unix(),
 				IsCompressed: IsCompressedFile(path),
 			}
 
@@ -324,31 +402,235 @@ func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGrou
 	return files, nil
 }
 
-// indexFile indexes a single file
+// indexFile indexes a single file with checkpoint/resume support
 func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
 	// Create a batch writer
 	batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
 	defer batch.Flush()
 
-	// Open and process the file
-	// This is simplified - in real implementation, you would:
-	// 1. Open the file (handling compression)
-	// 2. Parse log lines
-	// 3. Create documents
-	// 4. Add to batch
-	// 5. Update progress
+	// Get checkpoint information from persistence layer
+	var startPosition int64 = 0
+	var resuming bool = false
+	
+	if rm.persistence != nil {
+		if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
+			// Get current file modification time
+			fileInfo, err := os.Stat(file.Path)
+			if err != nil {
+				return fmt.Errorf("failed to stat file %s: %w", file.Path, err)
+			}
+			
+			currentModTime := fileInfo.ModTime().Unix()
+			currentSize := fileInfo.Size()
+			
+			// Check if file hasn't changed since last indexing
+			if info.LastIndexed > 0 && 
+			   info.LastModified == currentModTime && 
+			   info.LastSize == currentSize && 
+			   info.LastPosition == currentSize {
+				// File hasn't changed and was fully indexed
+				logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)", 
+					file.Path, time.Unix(info.LastIndexed, 0))
+				file.ProcessedLines = 0 // No new lines processed
+				file.DocumentCount = 0  // No new documents added
+				file.LastPosition = currentSize
+				return nil
+			}
+			
+			// Check if we should resume from a previous position
+			if info.LastPosition > 0 && info.LastPosition < currentSize {
+				// File has grown since last indexing
+				startPosition = info.LastPosition
+				resuming = true
+				logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)", 
+					startPosition, file.Path, info.LastSize, currentSize)
+			} else if currentSize < info.LastSize {
+				// File has been truncated or rotated, start from beginning
+				startPosition = 0
+				logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start", 
+					file.Path, info.LastSize, currentSize)
+			} else if info.LastPosition >= currentSize && currentSize > 0 {
+				// File size hasn't changed and we've already processed it completely
+				if info.LastModified == currentModTime {
+					logger.Infof("File %s already fully indexed and unchanged, skipping", file.Path)
+					file.ProcessedLines = 0
+					file.DocumentCount = 0
+					file.LastPosition = currentSize
+					return nil
+				}
+				// File has same size but different modification time, reindex from start
+				startPosition = 0
+				logger.Infof("File %s has same size but different mod time, reindexing from start", file.Path)
+			}
+		}
+	}
 
-	// For now, return a placeholder implementation
-	file.ProcessedLines = file.EstimatedLines
-	file.DocumentCount = uint64(file.EstimatedLines)
-	file.LastPosition = file.Size
+	// Open file with resume support
+	reader, err := rm.openFileFromPosition(file.Path, startPosition)
+	if err != nil {
+		return fmt.Errorf("failed to open file %s from position %d: %w", file.Path, startPosition, err)
+	}
+	defer reader.Close()
+
+	// Process file line by line with checkpointing
+	var processedLines int64 = 0
+	var currentPosition int64 = startPosition
+	var documentCount uint64 = 0
+	checkpointInterval := int64(1000) // Save checkpoint every 1000 lines
+	
+	scanner := bufio.NewScanner(reader)
+	for scanner.Scan() {
+		// Check context for cancellation
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		
+		line := scanner.Text()
+		currentPosition += int64(len(line)) + 1 // +1 for newline
+		
+		// Process the log line (parse and add to batch)
+		// This would typically involve:
+		// 1. Parse log entry using parser
+		// 2. Create search document
+		// 3. Add to batch (a rough sketch of this step follows the function below)
+		
+		processedLines++
+		documentCount++
+		
+		// Update progress
+		tracker.UpdateFileProgress(file.Path, processedLines)
+		
+		// Periodic checkpoint saving
+		if processedLines%checkpointInterval == 0 {
+			if rm.persistence != nil {
+				// Get current file modification time for checkpoint
+				fileInfo, err := os.Stat(file.Path)
+				var modTime int64
+				if err == nil {
+					modTime = fileInfo.ModTime().Unix()
+				} else {
+					modTime = time.Now().Unix()
+				}
+				
+				info := &LogFileInfo{
+					Path:         file.Path,
+					LastPosition: currentPosition,
+					LastIndexed:  time.Now().Unix(),
+					LastModified: modTime,
+					LastSize:     file.Size,
+				}
+				if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
+					logger.Warnf("Failed to save checkpoint for %s: %v", file.Path, err)
+				}
+			}
+		}
+	}
+	
+	if err := scanner.Err(); err != nil {
+		return fmt.Errorf("error reading file %s: %w", file.Path, err)
+	}
 
-	// Update progress periodically
-	tracker.UpdateFileProgress(file.Path, file.ProcessedLines)
+	// Update file statistics
+	file.ProcessedLines = processedLines
+	file.DocumentCount = documentCount
+	file.LastPosition = currentPosition
+	
+	// Save final checkpoint
+	if rm.persistence != nil {
+		// Get current file info for accurate metadata
+		fileInfo, err := os.Stat(file.Path)
+		var modTime int64
+		if err == nil {
+			modTime = fileInfo.ModTime().Unix()
+		} else {
+			modTime = time.Now().Unix()
+		}
+		
+		info := &LogFileInfo{
+			Path:         file.Path,
+			LastPosition: currentPosition,
+			LastIndexed:  time.Now().Unix(),
+			LastModified: modTime,
+			LastSize:     file.Size,
+		}
+		if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
+			logger.Warnf("Failed to save final checkpoint for %s: %v", file.Path, err)
+		}
+	}
+
+	if resuming {
+		logger.Infof("Completed resumed indexing for %s: %d lines, %d documents", 
+			file.Path, processedLines, documentCount)
+	}
 
 	return nil
 }
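The loop above currently only advances counters and positions; the parse-and-batch step it describes is left as a comment. Under the assumption of a line parser returning a field map (ParseLogLine and batch.Add are placeholder names, not this package's confirmed API), the body of that step could look roughly like:

// Placeholder names: ParseLogLine and batch.Add stand in for the real parser
// and BatchWriter methods used elsewhere in this package.
fields, err := ParseLogLine(line)
if err != nil {
	logger.Debugf("skipping malformed line in %s: %v", file.Path, err)
	continue
}
doc := &Document{
	ID:     fmt.Sprintf("%s-%d", file.Path, processedLines),
	Fields: fields,
}
if err := batch.Add(doc); err != nil {
	return fmt.Errorf("failed to add document to batch for %s: %w", file.Path, err)
}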
 
+// openFileFromPosition opens a file and seeks to the specified position
+// Handles both compressed (.gz) and regular files
+func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition int64) (io.ReadCloser, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// Check if file is compressed
+	isGzipped := strings.HasSuffix(filePath, ".gz")
+	
+	if isGzipped {
+		// For gzip files, we need to read from the beginning and skip to position
+		// This is because gzip doesn't support random seeking
+		gzReader, err := gzip.NewReader(file)
+		if err != nil {
+			file.Close()
+			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
+		}
+		
+		if startPosition > 0 {
+			// Skip to the start position by reading and discarding bytes
+			_, err := io.CopyN(io.Discard, gzReader, startPosition)
+			if err != nil && err != io.EOF {
+				gzReader.Close()
+				file.Close()
+				return nil, fmt.Errorf("failed to seek to position %d in gzip file: %w", startPosition, err)
+			}
+		}
+		
+		// Return a wrapped reader that closes both gzReader and file
+		return &gzipReaderCloser{gzReader: gzReader, file: file}, nil
+	} else {
+		// For regular files, seek directly
+		if startPosition > 0 {
+			_, err := file.Seek(startPosition, io.SeekStart)
+			if err != nil {
+				file.Close()
+				return nil, fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
+			}
+		}
+		return file, nil
+	}
+}
+
+// gzipReaderCloser wraps gzip.Reader to close both the gzip reader and underlying file
+type gzipReaderCloser struct {
+	gzReader *gzip.Reader
+	file     *os.File
+}
+
+func (g *gzipReaderCloser) Read(p []byte) (n int, err error) {
+	return g.gzReader.Read(p)
+}
+
+func (g *gzipReaderCloser) Close() error {
+	if err := g.gzReader.Close(); err != nil {
+		g.file.Close() // Still close file even if gzip reader fails
+		return err
+	}
+	return g.file.Close()
+}
+
 // getAllLogGroups returns all unique log groups
 func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
 	if rm.persistence == nil {

+ 55 - 0
internal/nginx_log/indexer/rebuild_simple_test.go

@@ -0,0 +1,55 @@
+package indexer
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+// TestBasicOptimizationLogic tests the core optimization logic without complex mocks
+func TestBasicOptimizationLogic(t *testing.T) {
+	// Create temporary directory
+	tmpDir := t.TempDir()
+	
+	// Create test files
+	activeLogPath := filepath.Join(tmpDir, "access.log")
+	compressedLogPath := filepath.Join(tmpDir, "access.log.1.gz")
+	
+	if err := os.WriteFile(activeLogPath, []byte("test log content\n"), 0644); err != nil {
+		t.Fatalf("Failed to create active log file: %v", err)
+	}
+	if err := os.WriteFile(compressedLogPath, []byte("compressed content"), 0644); err != nil {
+		t.Fatalf("Failed to create compressed log file: %v", err)
+	}
+	
+	// Create rebuild manager with no persistence (should always process)
+	config := DefaultRebuildConfig()
+	rm := &RebuildManager{
+		persistence: nil,
+		config:      config,
+	}
+	
+	// Test active file without persistence
+	activeFile := &LogGroupFile{
+		Path:         activeLogPath,
+		IsCompressed: false,
+	}
+	
+	shouldProcess, reason := rm.shouldProcessFile(activeFile)
+	if !shouldProcess {
+		t.Errorf("Expected to process active file without persistence, got false: %s", reason)
+	}
+	t.Logf("✅ Active file without persistence: shouldProcess=%v, reason=%s", shouldProcess, reason)
+	
+	// Test compressed file without persistence
+	compressedFile := &LogGroupFile{
+		Path:         compressedLogPath,
+		IsCompressed: true,
+	}
+	
+	shouldProcess, reason = rm.shouldProcessFile(compressedFile)
+	if !shouldProcess {
+		t.Errorf("Expected to process compressed file without persistence, got false: %s", reason)
+	}
+	t.Logf("✅ Compressed file without persistence: shouldProcess=%v, reason=%s", shouldProcess, reason)
+}

+ 8 - 8
internal/nginx_log/indexer/types.go

@@ -58,13 +58,13 @@ type Config struct {
 	EnableMetrics     bool          `json:"enable_metrics"`
 }
 
-// DefaultIndexerConfig returns default indexer configuration with CPU optimization
+// DefaultIndexerConfig returns default indexer configuration with processor optimization
 func DefaultIndexerConfig() *Config {
-	numCPU := runtime.NumCPU()
+	maxProcs := runtime.GOMAXPROCS(0)
 	return &Config{
 		IndexPath:         "./log-index",
 		ShardCount:        4,
-		WorkerCount:       numCPU * 2,            // Optimized: CPU cores * 2 for better utilization
+		WorkerCount:       maxProcs * 2,            // Optimized: Available processors * 2 for better utilization
 		BatchSize:         1500,                   // Optimized: Increased from 1000 to 1500 for better throughput
 		FlushInterval:     5 * time.Second,
 		MaxQueueSize:      15000,                  // Optimized: Increased from 10000 to 15000
@@ -79,19 +79,19 @@ func DefaultIndexerConfig() *Config {
 // GetOptimizedConfig returns configuration optimized for specific scenarios
 func GetOptimizedConfig(scenario string) *Config {
 	base := DefaultIndexerConfig()
-	numCPU := runtime.NumCPU()
+	maxProcs := runtime.GOMAXPROCS(0)
 	
 	switch scenario {
 	case "high_throughput":
 		// Maximize throughput at cost of higher latency
-		base.WorkerCount = numCPU * 2
+		base.WorkerCount = maxProcs * 2
 		base.BatchSize = 2000
 		base.MaxQueueSize = 20000
 		base.FlushInterval = 10 * time.Second
 		
 	case "low_latency":
 		// Minimize latency with reasonable throughput
-		base.WorkerCount = int(float64(numCPU) * 1.5)
+		base.WorkerCount = int(float64(maxProcs) * 1.5)
 		base.BatchSize = 500
 		base.MaxQueueSize = 10000
 		base.FlushInterval = 2 * time.Second
@@ -102,14 +102,14 @@ func GetOptimizedConfig(scenario string) *Config {
 		
 	case "memory_constrained":
 		// Reduce memory usage
-		base.WorkerCount = max(2, numCPU/2)
+		base.WorkerCount = max(2, maxProcs/2)
 		base.BatchSize = 250
 		base.MaxQueueSize = 5000
 		base.MemoryQuota = 256 * 1024 * 1024 // 256MB
 		
 	case "cpu_intensive":
 		// CPU-heavy workloads (parsing, etc.)
-		base.WorkerCount = numCPU * 3
+		base.WorkerCount = maxProcs * 3
 		base.BatchSize = 1000
 		base.MaxQueueSize = 25000
 	}