dynamic_shard_manager.go

package indexer

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uozi-tech/cosy/logger"
)
// DynamicShardManager extends ShardManager with dynamic scaling capabilities.
type DynamicShardManager interface {
	// Basic ShardManager interface (would be defined elsewhere)
	Initialize() error
	Close() error
	GetShardCount() int
	GetShard(shardID int) (interface{}, error) // Returns the actual shard implementation

	// Dynamic scaling methods
	ScaleShards(targetCount int) error
	AutoScale(metrics LoadMetrics) error
	RebalanceShards() error
	GetShardMetrics() []ShardMetrics

	// Configuration
	SetAutoScaleEnabled(enabled bool)
	IsAutoScaleEnabled() bool
}
// LoadMetrics represents system load metrics for scaling decisions.
type LoadMetrics struct {
	IndexingThroughput float64       `json:"indexing_throughput"` // docs/sec
	SearchLatency      time.Duration `json:"search_latency"`
	CPUUtilization     float64       `json:"cpu_utilization"`
	MemoryUsage        float64       `json:"memory_usage"`
	ShardSizes         []int64       `json:"shard_sizes"`
	ActiveQueries      int           `json:"active_queries"`
	QueueLength        int           `json:"queue_length"`
}

// ShardMetrics represents metrics for a single shard.
type ShardMetrics struct {
	ShardID       int           `json:"shard_id"`
	DocumentCount int64         `json:"document_count"`
	IndexSize     int64         `json:"index_size"`
	SearchLatency time.Duration `json:"search_latency"`
	IndexingRate  float64       `json:"indexing_rate"`
	CPUUsage      float64       `json:"cpu_usage"`
	MemoryUsage   int64         `json:"memory_usage"`
	LastOptimized time.Time     `json:"last_optimized"`
}
// DefaultDynamicShardManager implements DynamicShardManager.
type DefaultDynamicShardManager struct {
	config            *Config
	currentShardCount int32
	shards            map[int]interface{} // Abstract shard storage
	shardsLock        sync.RWMutex

	// Auto-scaling
	autoScaleEnabled  int32 // 1 = enabled; accessed atomically because it can be toggled while the monitor goroutine is running
	scalingInProgress int32
	lastScaleTime     time.Time
	scalingCooldown   time.Duration

	// Monitoring
	metricsCollector *ShardMetricsCollector
	loadThresholds   *ScalingThresholds

	// Context and control
	ctx      context.Context
	cancel   context.CancelFunc
	stopOnce sync.Once
}
// ScalingThresholds defines when to scale up or down.
type ScalingThresholds struct {
	// Scale-up thresholds
	MaxSearchLatency  time.Duration `json:"max_search_latency"`
	MaxCPUUtilization float64       `json:"max_cpu_utilization"`
	MaxMemoryUsage    float64       `json:"max_memory_usage"`
	MaxDocsPerShard   int64         `json:"max_docs_per_shard"`
	MaxShardSize      int64         `json:"max_shard_size"`

	// Scale-down thresholds
	MinSearchLatency  time.Duration `json:"min_search_latency"`
	MinCPUUtilization float64       `json:"min_cpu_utilization"`
	MinDocsPerShard   int64         `json:"min_docs_per_shard"`
	MinShardSize      int64         `json:"min_shard_size"`

	// Constraints
	MinShards       int           `json:"min_shards"`
	MaxShards       int           `json:"max_shards"`
	ScalingCooldown time.Duration `json:"scaling_cooldown"`
}
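
// The gap between the scale-up and scale-down thresholds is deliberate: with the
// defaults set in NewDynamicShardManager, a shard triggers scale-up only above
// 100GB, while scale-down requires every shard to be below 10GB, so there is a
// wide band in which no action is taken. Together with ScalingCooldown, this
// hysteresis keeps the manager from oscillating between shard counts.
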
// ShardMetricsCollector collects and aggregates shard performance metrics.
type ShardMetricsCollector struct {
	realCollector *RealShardMetricsCollector // Real metrics collector

	metrics         []ShardMetrics
	metricsLock     sync.RWMutex
	collectInterval time.Duration
	running         int32
	ctx             context.Context
	cancel          context.CancelFunc
}
// NewDynamicShardManager creates a new dynamic shard manager.
func NewDynamicShardManager(config *Config) *DefaultDynamicShardManager {
	ctx, cancel := context.WithCancel(context.Background())
	dsm := &DefaultDynamicShardManager{
		config:            config,
		currentShardCount: int32(config.ShardCount),
		shards:            make(map[int]interface{}),
		autoScaleEnabled:  1,               // Auto-scaling is enabled by default
		scalingCooldown:   5 * time.Minute, // Prevent rapid scaling
		ctx:               ctx,
		cancel:            cancel,
		loadThresholds: &ScalingThresholds{
			MaxSearchLatency:  5 * time.Second,
			MaxCPUUtilization: 0.85,
			MaxMemoryUsage:    0.80,
			MaxDocsPerShard:   10_000_000,               // 10M docs per shard
			MaxShardSize:      100 * 1024 * 1024 * 1024, // 100GB per shard
			MinSearchLatency:  1 * time.Second,
			MinCPUUtilization: 0.30,
			MinDocsPerShard:   1_000_000,               // 1M docs minimum
			MinShardSize:      10 * 1024 * 1024 * 1024, // 10GB minimum
			MinShards:         2,
			MaxShards:         max(32, config.WorkerCount*2), // Scale with worker count
			ScalingCooldown:   5 * time.Minute,
		},
	}

	// Initialize metrics collector
	dsm.metricsCollector = NewShardMetricsCollector(ctx, 30*time.Second)
	return dsm
}
// Initialize starts the dynamic shard manager.
func (dsm *DefaultDynamicShardManager) Initialize() error {
	// Initialize base shards under the lock so Initialize is safe even if
	// other goroutines already hold a reference to the manager.
	dsm.shardsLock.Lock()
	for i := 0; i < int(atomic.LoadInt32(&dsm.currentShardCount)); i++ {
		dsm.shards[i] = fmt.Sprintf("shard-%d", i) // Placeholder for the real shard implementation
	}
	dsm.shardsLock.Unlock()

	// Start metrics collection
	if err := dsm.metricsCollector.Start(); err != nil {
		return fmt.Errorf("failed to start metrics collector: %w", err)
	}

	// Start the auto-scaling monitor if enabled
	if dsm.IsAutoScaleEnabled() {
		go dsm.autoScaleMonitor()
	}

	logger.Info("Dynamic shard manager initialized",
		"initial_shards", atomic.LoadInt32(&dsm.currentShardCount),
		"max_shards", dsm.loadThresholds.MaxShards)
	return nil
}
// Close shuts down the dynamic shard manager.
func (dsm *DefaultDynamicShardManager) Close() error {
	var closeErr error
	dsm.stopOnce.Do(func() {
		dsm.cancel()

		// Stop metrics collection
		if dsm.metricsCollector != nil {
			dsm.metricsCollector.Stop()
		}

		// Close all shards
		dsm.shardsLock.Lock()
		defer dsm.shardsLock.Unlock()
		for id := range dsm.shards {
			// TODO: Close the shard implementation before discarding it
			delete(dsm.shards, id)
		}
	})
	return closeErr
}
// GetShardCount returns the current number of shards.
func (dsm *DefaultDynamicShardManager) GetShardCount() int {
	return int(atomic.LoadInt32(&dsm.currentShardCount))
}

// GetShard returns a specific shard.
func (dsm *DefaultDynamicShardManager) GetShard(shardID int) (interface{}, error) {
	dsm.shardsLock.RLock()
	defer dsm.shardsLock.RUnlock()
	shard, exists := dsm.shards[shardID]
	if !exists {
		return nil, fmt.Errorf("shard %d does not exist", shardID)
	}
	return shard, nil
}
// ScaleShards scales to the target shard count.
func (dsm *DefaultDynamicShardManager) ScaleShards(targetCount int) error {
	if !atomic.CompareAndSwapInt32(&dsm.scalingInProgress, 0, 1) {
		return fmt.Errorf("scaling operation already in progress")
	}
	defer atomic.StoreInt32(&dsm.scalingInProgress, 0)

	currentCount := int(atomic.LoadInt32(&dsm.currentShardCount))

	// Validate the target count by clamping it to the configured constraints
	if targetCount < dsm.loadThresholds.MinShards {
		targetCount = dsm.loadThresholds.MinShards
	}
	if targetCount > dsm.loadThresholds.MaxShards {
		targetCount = dsm.loadThresholds.MaxShards
	}
	if targetCount == currentCount {
		return nil // No change needed
	}

	logger.Info("Scaling shards",
		"current", currentCount,
		"target", targetCount)

	dsm.shardsLock.Lock()
	defer dsm.shardsLock.Unlock()

	if targetCount > currentCount {
		// Scale up - add new shards
		for i := currentCount; i < targetCount; i++ {
			dsm.shards[i] = fmt.Sprintf("shard-%d", i) // Create new shard (placeholder)
			logger.Debug("Created new shard", "shard_id", i)
		}
	} else {
		// Scale down - remove shards (would need data migration)
		for i := currentCount - 1; i >= targetCount; i-- {
			// TODO: Implement data migration before removal
			delete(dsm.shards, i)
			logger.Debug("Removed shard", "shard_id", i)
		}
	}

	atomic.StoreInt32(&dsm.currentShardCount, int32(targetCount))
	dsm.lastScaleTime = time.Now()

	logger.Info("Shard scaling completed",
		"new_count", targetCount,
		"operation", map[bool]string{true: "scale_up", false: "scale_down"}[targetCount > currentCount])
	return nil
}
// AutoScale performs automatic scaling based on load metrics.
func (dsm *DefaultDynamicShardManager) AutoScale(metrics LoadMetrics) error {
	if !dsm.IsAutoScaleEnabled() {
		return nil
	}

	// Respect the cooldown period between scaling operations
	if time.Since(dsm.lastScaleTime) < dsm.scalingCooldown {
		return nil
	}

	currentShards := dsm.GetShardCount()
	decision := dsm.makeScalingDecision(metrics, currentShards)

	if decision.Action != "none" {
		logger.Info("Auto-scaling decision",
			"action", decision.Action,
			"current_shards", currentShards,
			"target_shards", decision.TargetShards,
			"reason", decision.Reason)
		return dsm.ScaleShards(decision.TargetShards)
	}
	return nil
}
// ScalingDecision represents a scaling decision.
type ScalingDecision struct {
	Action       string  `json:"action"` // "scale_up", "scale_down", "none"
	TargetShards int     `json:"target_shards"`
	Reason       string  `json:"reason"`
	Confidence   float64 `json:"confidence"` // 0.0-1.0
}
// makeScalingDecision analyzes metrics and decides on scaling.
func (dsm *DefaultDynamicShardManager) makeScalingDecision(metrics LoadMetrics, currentShards int) ScalingDecision {
	thresholds := dsm.loadThresholds

	// Check scale-up conditions
	if metrics.SearchLatency > thresholds.MaxSearchLatency {
		return ScalingDecision{
			Action:       "scale_up",
			TargetShards: min(currentShards+2, thresholds.MaxShards),
			Reason:       fmt.Sprintf("High search latency: %v > %v", metrics.SearchLatency, thresholds.MaxSearchLatency),
			Confidence:   0.9,
		}
	}

	if metrics.CPUUtilization > thresholds.MaxCPUUtilization {
		return ScalingDecision{
			Action:       "scale_up",
			TargetShards: min(currentShards+1, thresholds.MaxShards),
			Reason:       fmt.Sprintf("High CPU utilization: %.2f > %.2f", metrics.CPUUtilization, thresholds.MaxCPUUtilization),
			Confidence:   0.8,
		}
	}

	// Check if any shard is too large
	maxShardSize := int64(0)
	for _, size := range metrics.ShardSizes {
		if size > maxShardSize {
			maxShardSize = size
		}
	}
	if maxShardSize > thresholds.MaxShardSize {
		return ScalingDecision{
			Action:       "scale_up",
			TargetShards: min(currentShards+1, thresholds.MaxShards),
			Reason:       fmt.Sprintf("Large shard detected: %d bytes > %d bytes", maxShardSize, thresholds.MaxShardSize),
			Confidence:   0.7,
		}
	}

	// Check scale-down conditions (more conservative)
	if currentShards > thresholds.MinShards &&
		metrics.SearchLatency < thresholds.MinSearchLatency &&
		metrics.CPUUtilization < thresholds.MinCPUUtilization {
		// Scale down only if all shards are underutilized
		allShardsSmall := true
		for _, size := range metrics.ShardSizes {
			if size > thresholds.MinShardSize {
				allShardsSmall = false
				break
			}
		}
		if allShardsSmall {
			return ScalingDecision{
				Action:       "scale_down",
				TargetShards: max(currentShards-1, thresholds.MinShards),
				Reason:       "All shards underutilized",
				Confidence:   0.6,
			}
		}
	}

	return ScalingDecision{
		Action:       "none",
		TargetShards: currentShards,
		Reason:       "Current configuration optimal",
		Confidence:   0.5,
	}
}
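
// As an illustration of the decision order above: with the default thresholds,
// metrics of SearchLatency=7s and CPUUtilization=0.95 on 4 shards yield
// {Action: "scale_up", TargetShards: 6} from the latency rule alone, since
// latency is checked first and adds two shards; the same CPU load with a 2s
// latency would instead add a single shard via the CPU rule.
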
// autoScaleMonitor runs the auto-scaling monitoring loop.
func (dsm *DefaultDynamicShardManager) autoScaleMonitor() {
	ticker := time.NewTicker(60 * time.Second) // Check every minute
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			metrics := dsm.collectLoadMetrics()
			if err := dsm.AutoScale(metrics); err != nil {
				logger.Warnf("Auto-scaling failed: %v", err)
			}
		case <-dsm.ctx.Done():
			return
		}
	}
}
// collectLoadMetrics gathers current system metrics.
func (dsm *DefaultDynamicShardManager) collectLoadMetrics() LoadMetrics {
	shardMetrics := dsm.GetShardMetrics()

	shardSizes := make([]int64, len(shardMetrics))
	var totalLatency time.Duration
	var totalCPU float64
	for i, shard := range shardMetrics {
		shardSizes[i] = shard.IndexSize
		totalLatency += shard.SearchLatency
		totalCPU += shard.CPUUsage
	}

	// Average latency and CPU usage across shards
	avgLatency := time.Duration(0)
	avgCPU := 0.0
	if len(shardMetrics) > 0 {
		avgLatency = totalLatency / time.Duration(len(shardMetrics))
		avgCPU = totalCPU / float64(len(shardMetrics))
	}

	return LoadMetrics{
		IndexingThroughput: dsm.getIndexingThroughput(),
		SearchLatency:      avgLatency,
		CPUUtilization:     avgCPU,
		MemoryUsage:        dsm.getMemoryUsage(),
		ShardSizes:         shardSizes,
		ActiveQueries:      dsm.getActiveQueries(),
		QueueLength:        dsm.getQueueLength(),
	}
}
// RebalanceShards redistributes data across shards for optimal performance.
func (dsm *DefaultDynamicShardManager) RebalanceShards() error {
	// This would implement sophisticated data rebalancing
	logger.Info("Shard rebalancing initiated")
	// TODO: Implement actual rebalancing logic:
	// 1. Analyze current data distribution
	// 2. Calculate optimal distribution
	// 3. Create migration plan
	// 4. Execute migration with minimal downtime
	return nil
}
// GetShardMetrics returns current metrics for all shards.
func (dsm *DefaultDynamicShardManager) GetShardMetrics() []ShardMetrics {
	if dsm.metricsCollector != nil {
		return dsm.metricsCollector.GetMetrics()
	}
	return []ShardMetrics{}
}
// SetAutoScaleEnabled enables or disables auto-scaling.
func (dsm *DefaultDynamicShardManager) SetAutoScaleEnabled(enabled bool) {
	var v int32
	if enabled {
		v = 1
	}
	atomic.StoreInt32(&dsm.autoScaleEnabled, v)
	logger.Info("Auto-scaling setting changed", "enabled", enabled)
}

// IsAutoScaleEnabled returns the current auto-scaling status.
func (dsm *DefaultDynamicShardManager) IsAutoScaleEnabled() bool {
	return atomic.LoadInt32(&dsm.autoScaleEnabled) == 1
}
// Helper methods for metrics collection

func (dsm *DefaultDynamicShardManager) getIndexingThroughput() float64 {
	// TODO: Get actual throughput from the indexer
	return 1000.0 // Placeholder
}

func (dsm *DefaultDynamicShardManager) getMemoryUsage() float64 {
	// TODO: Get actual memory usage
	return 0.5 // Placeholder
}

func (dsm *DefaultDynamicShardManager) getActiveQueries() int {
	// TODO: Get actual active query count
	return 0 // Placeholder
}

func (dsm *DefaultDynamicShardManager) getQueueLength() int {
	// TODO: Get actual queue length
	return 0 // Placeholder
}
// NewShardMetricsCollector creates a new metrics collector.
func NewShardMetricsCollector(ctx context.Context, interval time.Duration) *ShardMetricsCollector {
	collectorCtx, cancel := context.WithCancel(ctx)
	return &ShardMetricsCollector{
		metrics:         make([]ShardMetrics, 0),
		collectInterval: interval,
		ctx:             collectorCtx,
		cancel:          cancel,
	}
}
// Start begins metrics collection.
func (smc *ShardMetricsCollector) Start() error {
	if smc.realCollector != nil {
		return smc.realCollector.Start()
	}
	if !atomic.CompareAndSwapInt32(&smc.running, 0, 1) {
		return fmt.Errorf("metrics collector already running")
	}
	go smc.collectLoop()
	return nil
}

// Stop halts metrics collection.
func (smc *ShardMetricsCollector) Stop() {
	if smc.realCollector != nil {
		smc.realCollector.Stop()
		return
	}
	if atomic.CompareAndSwapInt32(&smc.running, 1, 0) {
		smc.cancel()
	}
}
// collectLoop runs the metrics collection loop.
func (smc *ShardMetricsCollector) collectLoop() {
	ticker := time.NewTicker(smc.collectInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			smc.collectMetrics()
		case <-smc.ctx.Done():
			return
		}
	}
}
// collectMetrics gathers current shard metrics.
func (smc *ShardMetricsCollector) collectMetrics() {
	// TODO: Implement actual metrics collection from shards
	smc.metricsLock.Lock()
	defer smc.metricsLock.Unlock()

	// Placeholder metrics
	smc.metrics = []ShardMetrics{
		{
			ShardID:       0,
			DocumentCount: 1_000_000,
			IndexSize:     1024 * 1024 * 1024, // 1GB
			SearchLatency: 100 * time.Millisecond,
			IndexingRate:  500.0,
			CPUUsage:      0.4,
			MemoryUsage:   512 * 1024 * 1024, // 512MB
			LastOptimized: time.Now().Add(-1 * time.Hour),
		},
	}
}
// GetMetrics returns current shard metrics.
func (smc *ShardMetricsCollector) GetMetrics() []ShardMetrics {
	if smc.realCollector != nil {
		return smc.realCollector.GetMetrics()
	}
	smc.metricsLock.RLock()
	defer smc.metricsLock.RUnlock()

	// Return a copy to avoid races on the underlying slice
	metrics := make([]ShardMetrics, len(smc.metrics))
	copy(metrics, smc.metrics)
	return metrics
}
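
// exampleDynamicScaling is an illustrative sketch, not production code: it walks
// through the manager's lifecycle using only what this file already references.
// It assumes a Config value with ShardCount and WorkerCount fields (the fields
// read by NewDynamicShardManager); the concrete values and the LoadMetrics below
// are hypothetical, chosen so that the high-latency scale-up rule would fire.
func exampleDynamicScaling() error {
	cfg := &Config{ShardCount: 4, WorkerCount: 8} // hypothetical configuration values

	dsm := NewDynamicShardManager(cfg)
	if err := dsm.Initialize(); err != nil {
		return err
	}
	defer dsm.Close()

	// Manual scaling: grow from 4 to 6 shards.
	if err := dsm.ScaleShards(6); err != nil {
		return err
	}

	// Automatic scaling: feed synthetic metrics whose latency exceeds the
	// default 5s threshold, so AutoScale would request two additional shards
	// (subject to the scaling cooldown and the MaxShards constraint).
	metrics := LoadMetrics{
		SearchLatency:  7 * time.Second,
		CPUUtilization: 0.60,
		ShardSizes:     []int64{2 << 30, 2 << 30, 2 << 30},
	}
	if err := dsm.AutoScale(metrics); err != nil {
		return err
	}

	logger.Info("Example finished", "shards", dsm.GetShardCount())
	return nil
}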