| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 | package indexerimport (	"context"	"fmt"	"runtime"	"sync"	"sync/atomic"	"time"	"github.com/shirou/gopsutil/v4/cpu"	"github.com/uozi-tech/cosy/logger")// IndexerActivityPoller defines an interface to check if the indexer is busy.type IndexerActivityPoller interface {	IsBusy() bool}// AdaptiveOptimizer provides intelligent batch size adjustment and CPU monitoringtype AdaptiveOptimizer struct {	config              *Config	cpuMonitor          *CPUMonitor	batchSizeController *BatchSizeController	performanceHistory  *PerformanceHistory	// State	running int32	ctx     context.Context	cancel  context.CancelFunc	wg      sync.WaitGroup	// Metrics	optimizationsMade int64	avgThroughput     float64	avgLatency        time.Duration	metricsMutex      sync.RWMutex	// Callbacks	onWorkerCountChange func(oldCount, newCount int)	// Activity Poller	activityPoller IndexerActivityPoller	// Concurrency-safe mirror of worker count	workerCount int64}// CPUMonitor monitors CPU utilization and suggests worker adjustmentstype CPUMonitor struct {	targetUtilization   float64	measurementInterval time.Duration	adjustmentThreshold float64	maxWorkers          int	minWorkers          int	currentUtilization   float64	measurements         []float64	measurementsMutex    sync.RWMutex	maxSamples           int	lastValidUtilization float64}// BatchSizeController dynamically adjusts batch sizes based on performance metricstype BatchSizeController struct {	baseBatchSize    int	minBatchSize     int	maxBatchSize     int	adjustmentFactor float64	currentBatchSize int32	latencyThreshold time.Duration	throughputTarget float64	adjustmentHistory []BatchAdjustment	historyMutex      sync.RWMutex}// PerformanceHistory tracks performance metrics for optimization decisionstype PerformanceHistory struct {	samples    []PerformanceSample	maxSamples int	mutex      sync.RWMutex	movingAvgWindow int}// PerformanceSample represents a single performance measurementtype PerformanceSample struct {	Timestamp   time.Time     `json:"timestamp"`	Throughput  float64       `json:"throughput"`	Latency     time.Duration `json:"latency"`	CPUUsage    float64       `json:"cpu_usage"`	BatchSize   int           `json:"batch_size"`	WorkerCount int           `json:"worker_count"`}// BatchAdjustment represents a batch size adjustment decisiontype BatchAdjustment struct {	Timestamp        time.Time `json:"timestamp"`	OldBatchSize     int       `json:"old_batch_size"`	NewBatchSize     int       `json:"new_batch_size"`	Reason           string    `json:"reason"`	ThroughputImpact float64   `json:"throughput_impact"`}// NewAdaptiveOptimizer creates a new adaptive optimizerfunc NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {	ctx, cancel := context.WithCancel(context.Background())	ao := &AdaptiveOptimizer{		config: config,		cpuMonitor: &CPUMonitor{			targetUtilization:   0.75, // Target 75% CPU utilization (more conservative)			measurementInterval: 5 * time.Second,			adjustmentThreshold: 0.10,                            // Adjust if 10% deviation from target (more sensitive)			maxWorkers:          runtime.GOMAXPROCS(0) * 6,       // Allow scaling up to 6x CPU cores for I/O-bound workloads			minWorkers:          max(2, runtime.GOMAXPROCS(0)/4), // Minimum 2 workers or 1/4 of cores for baseline performance			measurements:        make([]float64, 0, 12),          // 1 minute history at 5s intervals			maxSamples:          12,		},		batchSizeController: &BatchSizeController{			baseBatchSize:    config.BatchSize,			minBatchSize:     max(500, config.BatchSize/6), // Higher minimum for throughput			maxBatchSize:     config.BatchSize * 6,         // Increased to 6x for maximum throughput			adjustmentFactor: 0.25,                         // 25% adjustment steps for faster scaling			currentBatchSize: int32(config.BatchSize),			latencyThreshold: 8 * time.Second, // Higher latency tolerance for throughput			throughputTarget: 50.0,            // Target 50 MB/s - higher throughput target		},		performanceHistory: &PerformanceHistory{			samples:         make([]PerformanceSample, 0, 120), // 2 minutes of 1s samples			maxSamples:      120,			movingAvgWindow: 12, // 12-sample moving average		},		ctx:    ctx,		cancel: cancel,	}	// Log initialization parameters for debugging	logger.Infof("Adaptive optimizer initialized: workers=[%d, %d, %d] (min, current, max), target_cpu=%.1f%%, threshold=%.1f%%",		ao.cpuMonitor.minWorkers, config.WorkerCount, ao.cpuMonitor.maxWorkers,		ao.cpuMonitor.targetUtilization*100, ao.cpuMonitor.adjustmentThreshold*100)	// Initialize atomic mirror of worker count	atomic.StoreInt64(&ao.workerCount, int64(config.WorkerCount))	return ao}// Start begins the adaptive optimization processfunc (ao *AdaptiveOptimizer) Start() error {	if !atomic.CompareAndSwapInt32(&ao.running, 0, 1) {		logger.Error("Adaptive optimizer already running")		return fmt.Errorf("adaptive optimizer already running")	}	// Start CPU monitoring	ao.wg.Add(1)	go ao.cpuMonitoringLoop()	// Start batch size optimization	ao.wg.Add(1)	go ao.batchOptimizationLoop()	// Start performance tracking	ao.wg.Add(1)	go ao.performanceTrackingLoop()	logger.Info("Adaptive optimizer started")	return nil}// Stop halts the adaptive optimization processfunc (ao *AdaptiveOptimizer) Stop() {	if !atomic.CompareAndSwapInt32(&ao.running, 1, 0) {		return	}	ao.cancel()	ao.wg.Wait()	logger.Info("Adaptive optimizer stopped")}// cpuMonitoringLoop continuously monitors CPU utilizationfunc (ao *AdaptiveOptimizer) cpuMonitoringLoop() {	defer ao.wg.Done()	ticker := time.NewTicker(ao.cpuMonitor.measurementInterval)	defer ticker.Stop()	for {		select {		case <-ticker.C:			ao.measureAndAdjustCPU()		case <-ao.ctx.Done():			return		}	}}// measureAndAdjustCPU measures current CPU utilization and suggests adjustmentsfunc (ao *AdaptiveOptimizer) measureAndAdjustCPU() {	// Only adjust when indexer is actively processing	if !ao.isIndexerBusy() {		return	}	// Get current CPU utilization	cpuUsage := ao.getCurrentCPUUtilization()	ao.cpuMonitor.measurementsMutex.Lock()	// Filter spurious near-zero values using last valid utilization	if cpuUsage < 0.005 && ao.cpuMonitor.lastValidUtilization > 0 {		cpuUsage = ao.cpuMonitor.lastValidUtilization	}	// Maintain fixed-size window to avoid capacity growth	if ao.cpuMonitor.maxSamples > 0 && len(ao.cpuMonitor.measurements) == ao.cpuMonitor.maxSamples {		copy(ao.cpuMonitor.measurements, ao.cpuMonitor.measurements[1:])		ao.cpuMonitor.measurements[len(ao.cpuMonitor.measurements)-1] = cpuUsage	} else {		ao.cpuMonitor.measurements = append(ao.cpuMonitor.measurements, cpuUsage)	}	ao.cpuMonitor.currentUtilization = cpuUsage	if cpuUsage >= 0.005 {		ao.cpuMonitor.lastValidUtilization = cpuUsage	}	ao.cpuMonitor.measurementsMutex.Unlock()	// Calculate average CPU utilization	ao.cpuMonitor.measurementsMutex.RLock()	avgCPU := ao.calculateAverageCPU()	ao.cpuMonitor.measurementsMutex.RUnlock()	// Determine if adjustment is needed	targetCPU := ao.cpuMonitor.targetUtilization	if avgCPU < targetCPU-ao.cpuMonitor.adjustmentThreshold {		// CPU underutilized - suggest increasing workers		ao.suggestWorkerIncrease(avgCPU, targetCPU)	} else if avgCPU > targetCPU+ao.cpuMonitor.adjustmentThreshold {		// CPU over-utilized - suggest decreasing workers		ao.suggestWorkerDecrease(avgCPU, targetCPU)	}}// batchOptimizationLoop continuously optimizes batch sizesfunc (ao *AdaptiveOptimizer) batchOptimizationLoop() {	defer ao.wg.Done()	ticker := time.NewTicker(10 * time.Second) // Adjust batch size every 10 seconds	defer ticker.Stop()	for {		select {		case <-ticker.C:			ao.optimizeBatchSize()		case <-ao.ctx.Done():			return		}	}}// optimizeBatchSize analyzes performance and adjusts batch sizefunc (ao *AdaptiveOptimizer) optimizeBatchSize() {	// Only adjust when indexer is actively processing	if !ao.isIndexerBusy() {		return	}	ao.performanceHistory.mutex.RLock()	if len(ao.performanceHistory.samples) < 5 {		ao.performanceHistory.mutex.RUnlock()		return // Not enough data	}	recentSamples := ao.performanceHistory.samples[max(0, len(ao.performanceHistory.samples)-5):]	avgThroughput := ao.calculateAverageThroughput(recentSamples)	avgLatency := ao.calculateAverageLatency(recentSamples)	ao.performanceHistory.mutex.RUnlock()	currentBatchSize := int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))	newBatchSize := ao.calculateOptimalBatchSize(avgThroughput, avgLatency, currentBatchSize)	if newBatchSize != currentBatchSize {		ao.adjustBatchSize(currentBatchSize, newBatchSize, avgThroughput, avgLatency)		atomic.AddInt64(&ao.optimizationsMade, 1)	}}// calculateOptimalBatchSize determines the optimal batch size based on current performancefunc (ao *AdaptiveOptimizer) calculateOptimalBatchSize(throughput float64, latency time.Duration, currentBatch int) int {	controller := ao.batchSizeController	// If latency is too high, reduce batch size	if latency > controller.latencyThreshold {		reduction := int(float64(currentBatch) * controller.adjustmentFactor)		newSize := currentBatch - max(50, reduction)		return max(controller.minBatchSize, newSize)	}	// If throughput is below target and latency is acceptable, increase batch size	if throughput < controller.throughputTarget && latency < controller.latencyThreshold/2 {		increase := int(float64(currentBatch) * controller.adjustmentFactor)		newSize := currentBatch + max(100, increase)		return min(controller.maxBatchSize, newSize)	}	// Current batch size seems optimal	return currentBatch}// adjustBatchSize applies the batch size adjustmentfunc (ao *AdaptiveOptimizer) adjustBatchSize(oldSize, newSize int, throughput float64, latency time.Duration) {	// Only adjust when indexer is actively processing	if !ao.isIndexerBusy() {		return	}	atomic.StoreInt32(&ao.batchSizeController.currentBatchSize, int32(newSize))	var reason string	if newSize > oldSize {		reason = "Increasing batch size to improve throughput"	} else {		reason = "Reducing batch size to improve latency"	}	// Record adjustment	adjustment := BatchAdjustment{		Timestamp:        time.Now(),		OldBatchSize:     oldSize,		NewBatchSize:     newSize,		Reason:           reason,		ThroughputImpact: throughput,	}	ao.batchSizeController.historyMutex.Lock()	if len(ao.batchSizeController.adjustmentHistory) == 50 {		copy(ao.batchSizeController.adjustmentHistory, ao.batchSizeController.adjustmentHistory[1:])		ao.batchSizeController.adjustmentHistory[len(ao.batchSizeController.adjustmentHistory)-1] = adjustment	} else {		ao.batchSizeController.adjustmentHistory = append(ao.batchSizeController.adjustmentHistory, adjustment)	}	ao.batchSizeController.historyMutex.Unlock()	logger.Debugf("Batch size adjusted: old_size=%d, new_size=%d, reason=%s", oldSize, newSize, reason)}// performanceTrackingLoop continuously tracks performance metricsfunc (ao *AdaptiveOptimizer) performanceTrackingLoop() {	defer ao.wg.Done()	ticker := time.NewTicker(1 * time.Second) // Sample every second	defer ticker.Stop()	for {		select {		case <-ticker.C:			ao.recordPerformanceSample()		case <-ao.ctx.Done():			return		}	}}// recordPerformanceSample records current performance metricsfunc (ao *AdaptiveOptimizer) recordPerformanceSample() {	sample := PerformanceSample{		Timestamp:   time.Now(),		Throughput:  ao.getCurrentThroughput(),		Latency:     ao.getCurrentLatency(),		CPUUsage:    ao.GetCPUUtilization(),		BatchSize:   int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),		WorkerCount: int(atomic.LoadInt64(&ao.workerCount)),	}	ao.performanceHistory.mutex.Lock()	if ao.performanceHistory.maxSamples > 0 && len(ao.performanceHistory.samples) == ao.performanceHistory.maxSamples {		copy(ao.performanceHistory.samples, ao.performanceHistory.samples[1:])		ao.performanceHistory.samples[len(ao.performanceHistory.samples)-1] = sample	} else {		ao.performanceHistory.samples = append(ao.performanceHistory.samples, sample)	}	// Update moving averages	ao.updateMovingAverages()	ao.performanceHistory.mutex.Unlock()}// SetWorkerCountChangeCallback sets the callback function for worker count changesfunc (ao *AdaptiveOptimizer) SetWorkerCountChangeCallback(callback func(oldCount, newCount int)) {	ao.onWorkerCountChange = callback}// SetActivityPoller sets the poller to check for indexer activity.func (ao *AdaptiveOptimizer) SetActivityPoller(poller IndexerActivityPoller) {	ao.activityPoller = poller}// GetOptimalBatchSize returns the current optimal batch sizefunc (ao *AdaptiveOptimizer) GetOptimalBatchSize() int {	return int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize))}// GetCPUUtilization returns the current CPU utilizationfunc (ao *AdaptiveOptimizer) GetCPUUtilization() float64 {	ao.cpuMonitor.measurementsMutex.RLock()	defer ao.cpuMonitor.measurementsMutex.RUnlock()	return ao.cpuMonitor.currentUtilization}// GetOptimizationStats returns current optimization statisticsfunc (ao *AdaptiveOptimizer) GetOptimizationStats() AdaptiveOptimizationStats {	ao.metricsMutex.RLock()	defer ao.metricsMutex.RUnlock()	return AdaptiveOptimizationStats{		OptimizationsMade: atomic.LoadInt64(&ao.optimizationsMade),		CurrentBatchSize:  int(atomic.LoadInt32(&ao.batchSizeController.currentBatchSize)),		AvgThroughput:     ao.avgThroughput,		AvgLatency:        ao.avgLatency,		CPUUtilization:    ao.GetCPUUtilization(),	}}// AdaptiveOptimizationStats represents current optimization statisticstype AdaptiveOptimizationStats struct {	OptimizationsMade int64         `json:"optimizations_made"`	CurrentBatchSize  int           `json:"current_batch_size"`	AvgThroughput     float64       `json:"avg_throughput"`	AvgLatency        time.Duration `json:"avg_latency"`	CPUUtilization    float64       `json:"cpu_utilization"`}// Helper functionsfunc (ao *AdaptiveOptimizer) getCurrentCPUUtilization() float64 {	// Get CPU utilization since the last call.	// Interval 0 means non-blocking and compares to the last measurement.	// The first call will return 0.	percentages, err := cpu.Percent(0, false)	if err != nil || len(percentages) == 0 {		logger.Warnf("Failed to get real CPU utilization, falling back to goroutine heuristic: %v", err)		// Fallback to the old, less accurate method		numGoroutines := float64(runtime.NumGoroutine())		maxProcs := float64(runtime.GOMAXPROCS(0))		// Simple heuristic: more goroutines = higher CPU usage		utilization := numGoroutines / (maxProcs * 10)		if utilization > 0.95 {			utilization = 0.95		}		return utilization	}	// gopsutil returns a slice, for overall usage (percpu=false), it's the first element.	// The value is a percentage (e.g., 8.3), so we convert it to a 0.0-1.0 scale for our calculations.	return percentages[0] / 100.0}func (ao *AdaptiveOptimizer) getCurrentThroughput() float64 {	ao.metricsMutex.RLock()	v := ao.avgThroughput	ao.metricsMutex.RUnlock()	return v}func (ao *AdaptiveOptimizer) getCurrentLatency() time.Duration {	ao.metricsMutex.RLock()	v := ao.avgLatency	ao.metricsMutex.RUnlock()	return v}func (ao *AdaptiveOptimizer) calculateAverageCPU() float64 {	if len(ao.cpuMonitor.measurements) == 0 {		return 0	}	sum := 0.0	for _, usage := range ao.cpuMonitor.measurements {		sum += usage	}	return sum / float64(len(ao.cpuMonitor.measurements))}func (ao *AdaptiveOptimizer) calculateAverageThroughput(samples []PerformanceSample) float64 {	if len(samples) == 0 {		return 0	}	sum := 0.0	for _, sample := range samples {		sum += sample.Throughput	}	return sum / float64(len(samples))}func (ao *AdaptiveOptimizer) calculateAverageLatency(samples []PerformanceSample) time.Duration {	if len(samples) == 0 {		return 0	}	var sum time.Duration	for _, sample := range samples {		sum += sample.Latency	}	return sum / time.Duration(len(samples))}func (ao *AdaptiveOptimizer) updateMovingAverages() {	if len(ao.performanceHistory.samples) == 0 {		return	}	windowSize := min(ao.performanceHistory.movingAvgWindow, len(ao.performanceHistory.samples))	recentSamples := ao.performanceHistory.samples[len(ao.performanceHistory.samples)-windowSize:]	avgThroughput := ao.calculateAverageThroughput(recentSamples)	avgLatency := ao.calculateAverageLatency(recentSamples)	ao.metricsMutex.Lock()	ao.avgThroughput = avgThroughput	ao.avgLatency = avgLatency	ao.metricsMutex.Unlock()}func (ao *AdaptiveOptimizer) suggestWorkerIncrease(currentCPU, targetCPU float64) {	// If already at max workers, do nothing.	currentWorkers := int(atomic.LoadInt64(&ao.workerCount))	if currentWorkers >= ao.cpuMonitor.maxWorkers {		return	}	// If the indexer is not busy, don't scale up workers even if CPU is low.	if ao.activityPoller != nil && !ao.activityPoller.IsBusy() {		return	}	logger.Debug("CPU underutilized, adjusting workers upward",		"current_cpu", currentCPU, "target_cpu", targetCPU)	// Calculate suggested increase (conservative approach)	cpuUtilizationGap := targetCPU - currentCPU	increaseRatio := cpuUtilizationGap / targetCPU	// Limit increase to maximum 25% at a time and at least 1 worker	maxIncrease := max(1, int(float64(currentWorkers)*0.25))	suggestedIncrease := max(1, int(float64(currentWorkers)*increaseRatio))	actualIncrease := min(maxIncrease, suggestedIncrease)	newWorkerCount := min(ao.cpuMonitor.maxWorkers, currentWorkers+actualIncrease)	if newWorkerCount > currentWorkers {		ao.adjustWorkerCount(newWorkerCount)		logger.Infof("Increased workers from %d to %d due to CPU underutilization",			currentWorkers, newWorkerCount)	}}func (ao *AdaptiveOptimizer) suggestWorkerDecrease(currentCPU, targetCPU float64) {	// If the indexer is not busy, don't adjust workers	if !ao.isIndexerBusy() {		return	}	// If already at min workers, do nothing.	currentWorkers := int(atomic.LoadInt64(&ao.workerCount))	if currentWorkers <= ao.cpuMonitor.minWorkers {		logger.Debugf("Worker count is already at its minimum (%d), skipping decrease.", currentWorkers)		return	}	logger.Debug("CPU over-utilized, adjusting workers downward",		"current_cpu", currentCPU, "target_cpu", targetCPU)	// Calculate suggested decrease (conservative approach)	cpuOverUtilization := currentCPU - targetCPU	decreaseRatio := cpuOverUtilization / targetCPU // Use target CPU as base for more accurate calculation	// Limit decrease to maximum 25% at a time and at least 1 worker	maxDecrease := max(1, int(float64(currentWorkers)*0.25))	suggestedDecrease := max(1, int(float64(currentWorkers)*decreaseRatio*0.5)) // More conservative decrease	actualDecrease := min(maxDecrease, suggestedDecrease)	newWorkerCount := max(ao.cpuMonitor.minWorkers, currentWorkers-actualDecrease)	logger.Debugf("Worker decrease calculation: current=%d, suggested=%d, min=%d, new=%d",		currentWorkers, suggestedDecrease, ao.cpuMonitor.minWorkers, newWorkerCount)	if newWorkerCount < currentWorkers {		logger.Debugf("About to adjust worker count from %d to %d", currentWorkers, newWorkerCount)		ao.adjustWorkerCount(newWorkerCount)		logger.Infof("Decreased workers from %d to %d due to CPU over-utilization",			currentWorkers, newWorkerCount)	} else {		logger.Debugf("Worker count adjustment skipped: new=%d not less than current=%d", newWorkerCount, currentWorkers)	}}// adjustWorkerCount dynamically adjusts the worker count at runtimefunc (ao *AdaptiveOptimizer) adjustWorkerCount(newCount int) {	// Only adjust when indexer is actively processing	if !ao.isIndexerBusy() {		logger.Debugf("Skipping worker adjustment while idle: requested=%d", newCount)		return	}	oldCount := int(atomic.LoadInt64(&ao.workerCount))	if newCount <= 0 || newCount == oldCount {		logger.Debugf("Skipping worker adjustment: newCount=%d, currentCount=%d", newCount, oldCount)		return	}	logger.Infof("Adjusting worker count from %d to %d", oldCount, newCount)	// Update atomic mirror then keep config in sync	atomic.StoreInt64(&ao.workerCount, int64(newCount))	ao.config.WorkerCount = newCount	// Notify the indexer about worker count change	// This would typically trigger a worker pool resize in the parallel indexer	if ao.onWorkerCountChange != nil {		logger.Debugf("Calling worker count change callback: %d -> %d", oldCount, newCount)		ao.onWorkerCountChange(oldCount, newCount)	} else {		logger.Warnf("Worker count change callback is nil - worker adjustment will not take effect")	}	// Log the adjustment for monitoring	atomic.AddInt64(&ao.optimizationsMade, 1)}// Utility functionsfunc max(a, b int) int {	if a > b {		return a	}	return b}func min(a, b int) int {	if a < b {		return a	}	return b}// isIndexerBusy reports whether the indexer is currently processing work.// When no poller is configured, it returns false to avoid unintended adjustments.func (ao *AdaptiveOptimizer) isIndexerBusy() bool {	if ao.activityPoller == nil {		return false	}	return ao.activityPoller.IsBusy()}
 |