// dynamic_shard_awareness.go — automatic shard management detection and runtime adaptation.
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "runtime"
  7. "sync"
  8. "time"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
// DynamicShardAwareness provides automatic shard management detection and
// integration. It selects between a static and a dynamic shard manager based
// on environment analysis, and can adapt the dynamic manager at runtime using
// collected performance samples.
type DynamicShardAwareness struct {
	config              *Config
	currentShardManager interface{} // Can be DefaultShardManager or EnhancedDynamicShardManager
	isDynamic           bool
	enhancedManager     *EnhancedDynamicShardManager

	// Monitoring and adaptation
	performanceMonitor *PerformanceMonitor
	adaptationEnabled  bool          // gate for the monitoring/adaptation loop
	lastAdaptation     time.Time     // when AutoScale was last triggered
	adaptationCooldown time.Duration // minimum interval between adaptations
	mutex              sync.RWMutex  // guards the fields above
}
// PerformanceMonitor tracks system performance for shard adaptation decisions.
// It keeps a rolling window of samples plus derived rolling averages.
type PerformanceMonitor struct {
	samples           []PerformanceSample // rolling window, oldest first
	maxSamples        int                 // window capacity; oldest sample dropped beyond this
	currentThroughput float64             // rolling average over recent samples
	averageLatency    time.Duration       // rolling average over recent samples
	lastOptimization  time.Time
	mutex             sync.RWMutex // guards all fields above
}
  33. // NewDynamicShardAwareness creates a new shard awareness system
  34. func NewDynamicShardAwareness(config *Config) *DynamicShardAwareness {
  35. return &DynamicShardAwareness{
  36. config: config,
  37. adaptationEnabled: true,
  38. adaptationCooldown: 2 * time.Minute, // Conservative adaptation interval
  39. performanceMonitor: &PerformanceMonitor{
  40. samples: make([]PerformanceSample, 0, 60), // Keep 60 samples (1 minute at 1s intervals)
  41. maxSamples: 60,
  42. },
  43. }
  44. }
  45. // DetectAndSetupShardManager automatically detects the optimal shard manager type
  46. func (dsa *DynamicShardAwareness) DetectAndSetupShardManager() (interface{}, error) {
  47. dsa.mutex.Lock()
  48. defer dsa.mutex.Unlock()
  49. // Decision factors for dynamic vs static shard management
  50. factors := dsa.analyzeEnvironmentFactors()
  51. if dsa.shouldUseDynamicShards(factors) {
  52. logger.Info("Dynamic shard management detected as optimal",
  53. "cpu_cores", factors.CPUCores,
  54. "memory_gb", factors.MemoryGB,
  55. "expected_load", factors.ExpectedLoad)
  56. // Create enhanced dynamic shard manager
  57. enhancedManager := NewEnhancedDynamicShardManager(dsa.config)
  58. dsa.enhancedManager = enhancedManager
  59. dsa.currentShardManager = enhancedManager
  60. dsa.isDynamic = true
  61. // Initialize the enhanced manager
  62. if err := enhancedManager.Initialize(); err != nil {
  63. logger.Warnf("Failed to initialize enhanced dynamic shard manager, falling back to static: %v", err)
  64. return dsa.setupStaticShardManager()
  65. }
  66. return enhancedManager, nil
  67. } else {
  68. logger.Info("Static shard management selected",
  69. "cpu_cores", factors.CPUCores,
  70. "shard_count", dsa.config.ShardCount)
  71. return dsa.setupStaticShardManager()
  72. }
  73. }
// EnvironmentFactors represents system environment analysis used to decide
// between static and dynamic shard management.
type EnvironmentFactors struct {
	CPUCores       int     `json:"cpu_cores"`
	MemoryGB       float64 `json:"memory_gb"`       // derived from runtime.MemStats.Sys
	ExpectedLoad   string  `json:"expected_load"`   // "low", "medium", "high", "variable"
	DataVolume     string  `json:"data_volume"`     // "small", "medium", "large", "growing"
	QueryPatterns  string  `json:"query_patterns"`  // "simple", "complex", "mixed"
	AvailableSpace int64   `json:"available_space"` // Available disk space in bytes
}
  83. // analyzeEnvironmentFactors analyzes the current environment
  84. func (dsa *DynamicShardAwareness) analyzeEnvironmentFactors() EnvironmentFactors {
  85. factors := EnvironmentFactors{
  86. CPUCores: runtime.NumCPU(),
  87. }
  88. // Get memory info (simplified)
  89. var m runtime.MemStats
  90. runtime.ReadMemStats(&m)
  91. factors.MemoryGB = float64(m.Sys) / (1024 * 1024 * 1024)
  92. // Analyze expected load based on configuration
  93. factors.ExpectedLoad = dsa.analyzeExpectedLoad()
  94. factors.DataVolume = dsa.analyzeDataVolume()
  95. factors.QueryPatterns = dsa.analyzeQueryPatterns()
  96. // Check available disk space
  97. if stat, err := os.Stat(dsa.config.IndexPath); err == nil && stat.IsDir() {
  98. // Simple approximation - in production, use syscall for actual free space
  99. factors.AvailableSpace = 10 * 1024 * 1024 * 1024 // 10GB default assumption
  100. }
  101. return factors
  102. }
  103. // shouldUseDynamicShards determines if dynamic shard management is beneficial
  104. func (dsa *DynamicShardAwareness) shouldUseDynamicShards(factors EnvironmentFactors) bool {
  105. // Dynamic shards are beneficial when:
  106. // 1. High-core systems (8+ cores) can benefit from dynamic scaling
  107. if factors.CPUCores >= 8 {
  108. return true
  109. }
  110. // 2. Variable or high expected load
  111. if factors.ExpectedLoad == "high" || factors.ExpectedLoad == "variable" {
  112. return true
  113. }
  114. // 3. Large or growing data volumes
  115. if factors.DataVolume == "large" || factors.DataVolume == "growing" {
  116. return true
  117. }
  118. // 4. Systems with significant memory (4GB+) can handle dynamic overhead
  119. if factors.MemoryGB >= 4.0 {
  120. return true
  121. }
  122. // 5. Complex or mixed query patterns benefit from dynamic optimization
  123. if factors.QueryPatterns == "complex" || factors.QueryPatterns == "mixed" {
  124. return true
  125. }
  126. // Default to static for simpler environments
  127. return false
  128. }
  129. // setupStaticShardManager creates a static shard manager
  130. func (dsa *DynamicShardAwareness) setupStaticShardManager() (interface{}, error) {
  131. staticManager := NewDefaultShardManager(dsa.config)
  132. dsa.currentShardManager = staticManager
  133. dsa.isDynamic = false
  134. if err := staticManager.Initialize(); err != nil {
  135. return nil, fmt.Errorf("failed to initialize static shard manager: %w", err)
  136. }
  137. return staticManager, nil
  138. }
  139. // analyzeExpectedLoad analyzes expected system load
  140. func (dsa *DynamicShardAwareness) analyzeExpectedLoad() string {
  141. // Based on worker count and batch size configuration
  142. workerCount := dsa.config.WorkerCount
  143. batchSize := dsa.config.BatchSize
  144. // High configuration suggests high load expectations
  145. if workerCount >= 16 || batchSize >= 2000 {
  146. return "high"
  147. }
  148. // Variable load if workers are significantly higher than CPU cores
  149. if workerCount > runtime.NumCPU()*2 {
  150. return "variable"
  151. }
  152. // Medium configuration
  153. if workerCount >= 8 || batchSize >= 1000 {
  154. return "medium"
  155. }
  156. return "low"
  157. }
  158. // analyzeDataVolume analyzes expected data volume
  159. func (dsa *DynamicShardAwareness) analyzeDataVolume() string {
  160. // Based on shard count and memory quota
  161. shardCount := dsa.config.ShardCount
  162. memoryQuota := dsa.config.MemoryQuota
  163. // Large configuration suggests large data volumes
  164. if shardCount >= 8 || memoryQuota >= 2*1024*1024*1024 { // 2GB+
  165. return "large"
  166. }
  167. // Growing if shard count is configured higher than default
  168. if shardCount > 4 {
  169. return "growing"
  170. }
  171. // Medium configuration
  172. if shardCount >= 4 || memoryQuota >= 1024*1024*1024 { // 1GB+
  173. return "medium"
  174. }
  175. return "small"
  176. }
  177. // analyzeQueryPatterns analyzes expected query complexity
  178. func (dsa *DynamicShardAwareness) analyzeQueryPatterns() string {
  179. // Based on optimization interval and metrics enablement
  180. if dsa.config.OptimizeInterval <= 10*time.Minute {
  181. return "complex" // Frequent optimization suggests complex queries
  182. }
  183. if dsa.config.EnableMetrics {
  184. return "mixed" // Metrics collection suggests varied query patterns
  185. }
  186. return "simple"
  187. }
  188. // StartMonitoring begins performance monitoring for adaptation decisions
  189. func (dsa *DynamicShardAwareness) StartMonitoring(ctx context.Context) {
  190. if !dsa.adaptationEnabled {
  191. return
  192. }
  193. go dsa.monitoringLoop(ctx)
  194. }
  195. // monitoringLoop runs continuous performance monitoring
  196. func (dsa *DynamicShardAwareness) monitoringLoop(ctx context.Context) {
  197. ticker := time.NewTicker(1 * time.Second) // Sample every second
  198. defer ticker.Stop()
  199. for {
  200. select {
  201. case <-ticker.C:
  202. dsa.collectPerformanceSample()
  203. // Check if adaptation is needed every 30 samples (30 seconds)
  204. if len(dsa.performanceMonitor.samples) > 0 && len(dsa.performanceMonitor.samples)%30 == 0 {
  205. dsa.considerAdaptation()
  206. }
  207. case <-ctx.Done():
  208. return
  209. }
  210. }
  211. }
  212. // collectPerformanceSample collects current performance data
  213. func (dsa *DynamicShardAwareness) collectPerformanceSample() {
  214. dsa.performanceMonitor.mutex.Lock()
  215. defer dsa.performanceMonitor.mutex.Unlock()
  216. sample := PerformanceSample{
  217. Timestamp: time.Now(),
  218. Throughput: dsa.getCurrentThroughput(),
  219. Latency: dsa.getCurrentLatency(),
  220. CPUUsage: dsa.getCurrentCPUUsage(),
  221. WorkerCount: dsa.config.WorkerCount,
  222. }
  223. // Add sample
  224. dsa.performanceMonitor.samples = append(dsa.performanceMonitor.samples, sample)
  225. // Rotate samples if we exceed max
  226. if len(dsa.performanceMonitor.samples) > dsa.performanceMonitor.maxSamples {
  227. dsa.performanceMonitor.samples = dsa.performanceMonitor.samples[1:]
  228. }
  229. // Update current metrics
  230. dsa.updateCurrentMetrics()
  231. }
  232. // considerAdaptation evaluates whether dynamic adaptations should be made
  233. func (dsa *DynamicShardAwareness) considerAdaptation() {
  234. // Check cooldown
  235. if time.Since(dsa.lastAdaptation) < dsa.adaptationCooldown {
  236. return
  237. }
  238. dsa.mutex.RLock()
  239. isDynamic := dsa.isDynamic
  240. enhancedManager := dsa.enhancedManager
  241. dsa.mutex.RUnlock()
  242. if !isDynamic || enhancedManager == nil {
  243. return // Only adapt if using dynamic shard manager
  244. }
  245. // Get performance analysis
  246. analysis := dsa.analyzeCurrentPerformance()
  247. if analysis.ShouldAdapt {
  248. logger.Info("Performance analysis suggests adaptation",
  249. "reason", analysis.Reason,
  250. "confidence", analysis.Confidence)
  251. // Trigger auto-scaling on the enhanced manager
  252. if err := enhancedManager.AutoScale(); err != nil {
  253. logger.Warnf("Auto-scaling adaptation failed: %v", err)
  254. } else {
  255. dsa.lastAdaptation = time.Now()
  256. }
  257. }
  258. }
// PerformanceAnalysis represents performance analysis results used to decide
// whether the shard configuration should be adapted.
type PerformanceAnalysis struct {
	ShouldAdapt       bool          `json:"should_adapt"`
	Reason            string        `json:"reason"`
	Confidence        float64       `json:"confidence"` // heuristic 0.0–1.0
	CurrentThroughput float64       `json:"current_throughput"`
	AverageLatency    time.Duration `json:"average_latency"`
	TrendAnalysis     string        `json:"trend_analysis"` // "stable" or "degrading"
}
  268. // analyzeCurrentPerformance analyzes current performance trends
  269. func (dsa *DynamicShardAwareness) analyzeCurrentPerformance() PerformanceAnalysis {
  270. dsa.performanceMonitor.mutex.RLock()
  271. defer dsa.performanceMonitor.mutex.RUnlock()
  272. samples := dsa.performanceMonitor.samples
  273. if len(samples) < 30 { // Need at least 30 samples for analysis
  274. return PerformanceAnalysis{
  275. ShouldAdapt: false,
  276. Reason: "Insufficient performance data",
  277. Confidence: 0.0,
  278. }
  279. }
  280. // Analyze recent vs historical performance
  281. recentSamples := samples[len(samples)-10:] // Last 10 samples
  282. historicalSamples := samples[:len(samples)-10]
  283. recentAvgThroughput := dsa.calculateAverageThroughput(recentSamples)
  284. historicalAvgThroughput := dsa.calculateAverageThroughput(historicalSamples)
  285. recentAvgLatency := dsa.calculateAverageLatency(recentSamples)
  286. historicalAvgLatency := dsa.calculateAverageLatency(historicalSamples)
  287. // Check for performance degradation
  288. throughputDrop := (historicalAvgThroughput - recentAvgThroughput) / historicalAvgThroughput
  289. latencyIncrease := float64(recentAvgLatency - historicalAvgLatency) / float64(historicalAvgLatency)
  290. // Adaptation triggers
  291. if throughputDrop > 0.20 { // 20% throughput drop
  292. return PerformanceAnalysis{
  293. ShouldAdapt: true,
  294. Reason: fmt.Sprintf("Throughput dropped by %.1f%%", throughputDrop*100),
  295. Confidence: 0.8,
  296. CurrentThroughput: recentAvgThroughput,
  297. AverageLatency: recentAvgLatency,
  298. TrendAnalysis: "degrading",
  299. }
  300. }
  301. if latencyIncrease > 0.50 { // 50% latency increase
  302. return PerformanceAnalysis{
  303. ShouldAdapt: true,
  304. Reason: fmt.Sprintf("Latency increased by %.1f%%", latencyIncrease*100),
  305. Confidence: 0.7,
  306. CurrentThroughput: recentAvgThroughput,
  307. AverageLatency: recentAvgLatency,
  308. TrendAnalysis: "degrading",
  309. }
  310. }
  311. return PerformanceAnalysis{
  312. ShouldAdapt: false,
  313. Reason: "Performance stable",
  314. Confidence: 0.6,
  315. CurrentThroughput: recentAvgThroughput,
  316. AverageLatency: recentAvgLatency,
  317. TrendAnalysis: "stable",
  318. }
  319. }
  320. // Helper methods for performance calculation
  321. func (dsa *DynamicShardAwareness) calculateAverageThroughput(samples []PerformanceSample) float64 {
  322. if len(samples) == 0 {
  323. return 0.0
  324. }
  325. total := 0.0
  326. for _, sample := range samples {
  327. total += sample.Throughput
  328. }
  329. return total / float64(len(samples))
  330. }
  331. func (dsa *DynamicShardAwareness) calculateAverageLatency(samples []PerformanceSample) time.Duration {
  332. if len(samples) == 0 {
  333. return 0
  334. }
  335. var total time.Duration
  336. for _, sample := range samples {
  337. total += sample.Latency
  338. }
  339. return total / time.Duration(len(samples))
  340. }
  341. // getCurrentThroughput gets current system throughput (placeholder)
  342. func (dsa *DynamicShardAwareness) getCurrentThroughput() float64 {
  343. // TODO: Integration with actual indexer metrics
  344. return 1000.0 // Default placeholder
  345. }
  346. // getCurrentLatency gets current system latency (placeholder)
  347. func (dsa *DynamicShardAwareness) getCurrentLatency() time.Duration {
  348. // TODO: Integration with actual indexer metrics
  349. return 100 * time.Millisecond // Default placeholder
  350. }
  351. // getCurrentCPUUsage gets current CPU usage (placeholder)
  352. func (dsa *DynamicShardAwareness) getCurrentCPUUsage() float64 {
  353. // TODO: Integration with actual system metrics
  354. return 0.5 // Default placeholder
  355. }
  356. // updateCurrentMetrics updates current performance metrics
  357. func (dsa *DynamicShardAwareness) updateCurrentMetrics() {
  358. samplesLen := len(dsa.performanceMonitor.samples)
  359. if samplesLen == 0 {
  360. return
  361. }
  362. // Get recent samples with bounds checking
  363. recentCount := 10
  364. if samplesLen < recentCount {
  365. recentCount = samplesLen
  366. }
  367. recent := dsa.performanceMonitor.samples[samplesLen-recentCount:]
  368. dsa.performanceMonitor.currentThroughput = dsa.calculateAverageThroughput(recent)
  369. dsa.performanceMonitor.averageLatency = dsa.calculateAverageLatency(recent)
  370. }
  371. // GetCurrentShardManager returns the current shard manager
  372. func (dsa *DynamicShardAwareness) GetCurrentShardManager() interface{} {
  373. dsa.mutex.RLock()
  374. defer dsa.mutex.RUnlock()
  375. return dsa.currentShardManager
  376. }
  377. // IsDynamic returns whether dynamic shard management is active
  378. func (dsa *DynamicShardAwareness) IsDynamic() bool {
  379. dsa.mutex.RLock()
  380. defer dsa.mutex.RUnlock()
  381. return dsa.isDynamic
  382. }
  383. // GetPerformanceAnalysis returns current performance analysis
  384. func (dsa *DynamicShardAwareness) GetPerformanceAnalysis() PerformanceAnalysis {
  385. return dsa.analyzeCurrentPerformance()
  386. }
  387. // SetAdaptationEnabled enables or disables automatic adaptation
  388. func (dsa *DynamicShardAwareness) SetAdaptationEnabled(enabled bool) {
  389. dsa.adaptationEnabled = enabled
  390. logger.Info("Dynamic shard adaptation setting changed", "enabled", enabled)
  391. }