// enhanced_dynamic_shard_manager.go

package indexer

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/v2"
	"github.com/uozi-tech/cosy/logger"
)

// EnhancedDynamicShardManager extends DefaultShardManager with dynamic scaling.
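//
// A minimal usage sketch (cfg is assumed to be a *Config prepared elsewhere;
// error handling elided):
//
//	dsm := NewEnhancedDynamicShardManager(cfg)
//	if err := dsm.Initialize(); err != nil {
//		logger.Errorf("init failed: %v", err)
//	}
//	defer dsm.Close()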
type EnhancedDynamicShardManager struct {
	*DefaultShardManager

	// Dynamic features
	targetShardCount  int32
	autoScaleEnabled  bool
	scalingInProgress int32
	lastScaleTime     time.Time
	scalingCooldown   time.Duration

	// Metrics and monitoring
	metricsCollector *ShardMetricsCollector
	loadThresholds   *ScalingThresholds

	// Context and control
	ctx      context.Context
	cancel   context.CancelFunc
	stopOnce sync.Once

	// Real-time shard monitoring
	shardHealth map[int]*ShardHealthStatus
	healthMutex sync.RWMutex
}

// ShardHealthStatus represents the health and performance of a single shard
type ShardHealthStatus struct {
	ShardID         int           `json:"shard_id"`
	IsHealthy       bool          `json:"is_healthy"`
	LastHealthCheck time.Time     `json:"last_health_check"`
	DocumentCount   uint64        `json:"document_count"`
	IndexSize       int64         `json:"index_size"`
	AvgQueryTime    time.Duration `json:"avg_query_time"`
	IndexingRate    float64       `json:"indexing_rate"`
	ErrorCount      int64         `json:"error_count"`
	LastError       string        `json:"last_error,omitempty"`
	LoadScore       float64       `json:"load_score"` // 0.0-1.0, higher means more loaded
}

// NewEnhancedDynamicShardManager creates a new enhanced shard manager
func NewEnhancedDynamicShardManager(config *Config) *EnhancedDynamicShardManager {
	ctx, cancel := context.WithCancel(context.Background())
	dsm := &EnhancedDynamicShardManager{
		DefaultShardManager: NewDefaultShardManager(config),
		targetShardCount:    int32(config.ShardCount),
		autoScaleEnabled:    true,
		scalingCooldown:     3 * time.Minute, // Conservative cooldown
		ctx:                 ctx,
		cancel:              cancel,
		shardHealth:         make(map[int]*ShardHealthStatus),
		loadThresholds: &ScalingThresholds{
			MaxSearchLatency:  3 * time.Second, // More conservative
			MaxCPUUtilization: 0.80,            // 80% CPU max
			MaxMemoryUsage:    0.75,            // 75% memory max
			MaxDocsPerShard:   5000000,         // 5M docs per shard
			MaxShardSize:      50 * 1024 * 1024 * 1024, // 50GB per shard
			MinSearchLatency:  500 * time.Millisecond,
			MinCPUUtilization: 0.20,   // 20% CPU min
			MinDocsPerShard:   500000, // 500K docs minimum
			MinShardSize:      5 * 1024 * 1024 * 1024, // 5GB minimum
			MinShards:         2,
			MaxShards:         max(16, config.WorkerCount), // Reasonable maximum
			ScalingCooldown:   3 * time.Minute,
		},
	}

	// Initialize metrics collector with real shard access
	realCollector := NewRealShardMetricsCollector(ctx, dsm, 15*time.Second)
	dsm.metricsCollector = &ShardMetricsCollector{
		realCollector: realCollector,
	}
	return dsm
}

// Initialize starts the enhanced shard manager
func (dsm *EnhancedDynamicShardManager) Initialize() error {
	// Initialize base shard manager first
	if err := dsm.DefaultShardManager.Initialize(); err != nil {
		return fmt.Errorf("failed to initialize base shard manager: %w", err)
	}

	// Start metrics collection
	if err := dsm.metricsCollector.Start(); err != nil {
		return fmt.Errorf("failed to start metrics collector: %w", err)
	}

	// Initialize health status for existing shards
	dsm.initializeHealthStatus()

	// Start monitoring goroutines
	go dsm.healthMonitoringLoop()
	if dsm.autoScaleEnabled {
		go dsm.autoScaleMonitoringLoop()
	}

	logger.Info("Enhanced dynamic shard manager initialized",
		"initial_shards", atomic.LoadInt32(&dsm.targetShardCount),
		"auto_scale", dsm.autoScaleEnabled)
	return nil
}

// ScaleShards dynamically scales the shard count.
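// The target is clamped to the configured MinShards/MaxShards bounds. Scale-up
// creates the missing shards under the manager lock; scale-down first migrates
// documents off the shards being removed (see migrateShardData) without holding
// the lock, then closes and drops the emptied shards.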
func (dsm *EnhancedDynamicShardManager) ScaleShards(targetCount int) error {
	// Only one scaling operation may run at a time.
	if !atomic.CompareAndSwapInt32(&dsm.scalingInProgress, 0, 1) {
		return fmt.Errorf("scaling operation already in progress")
	}
	defer atomic.StoreInt32(&dsm.scalingInProgress, 0)

	scaleStart := time.Now()
	currentCount := dsm.config.ShardCount

	// Validate target count
	if targetCount < dsm.loadThresholds.MinShards {
		targetCount = dsm.loadThresholds.MinShards
	}
	if targetCount > dsm.loadThresholds.MaxShards {
		targetCount = dsm.loadThresholds.MaxShards
	}
	if targetCount == currentCount {
		return nil // No change needed
	}

	logger.Info("Scaling shards",
		"current", currentCount,
		"target", targetCount,
		"action", map[bool]string{true: "scale_up", false: "scale_down"}[targetCount > currentCount])

	if targetCount > currentCount {
		// Scale up - add new shards (needs lock)
		dsm.mu.Lock()
		for i := currentCount; i < targetCount; i++ {
			if err := dsm.createShardLocked(i); err != nil {
				dsm.mu.Unlock()
				return fmt.Errorf("failed to create shard %d during scale-up: %w", i, err)
			}

			// Initialize health status for new shard
			dsm.healthMutex.Lock()
			dsm.shardHealth[i] = &ShardHealthStatus{
				ShardID:         i,
				IsHealthy:       true,
				LastHealthCheck: time.Now(),
				LoadScore:       0.0,
			}
			dsm.healthMutex.Unlock()

			logger.Debug("Created new shard during scale-up", "shard_id", i)
		}
		// Update config while holding lock
		dsm.config.ShardCount = targetCount
		dsm.mu.Unlock()
	} else {
		// Scale down - safely migrate data before removing shards (no lock during migration)
		logger.Info("Starting safe scale-down with data migration",
			"removing_shards", currentCount-targetCount)

		// Step 1: Migrate data WITHOUT holding the main lock to avoid deadlock
		for i := currentCount - 1; i >= targetCount; i-- {
			migratedDocs, err := dsm.migrateShardData(i, targetCount)
			if err != nil {
				logger.Errorf("Failed to migrate data from shard %d: %v", i, err)
				return fmt.Errorf("data migration failed for shard %d: %w", i, err)
			}
			logger.Info("Data migration completed",
				"from_shard", i,
				"migrated_documents", migratedDocs)
		}

		// Step 2: Now acquire lock and close the empty shards
		dsm.mu.Lock()
		for i := currentCount - 1; i >= targetCount; i-- {
			// Close the now-empty shard (manual implementation to avoid lock re-entry)
			if shard, exists := dsm.shards[i]; exists {
				if err := shard.Close(); err != nil {
					logger.Warnf("Failed to close shard %d during scale-down: %v", i, err)
				}
				delete(dsm.shards, i)
				delete(dsm.shardPaths, i)
			}

			// Remove from health tracking
			dsm.healthMutex.Lock()
			delete(dsm.shardHealth, i)
			dsm.healthMutex.Unlock()

			logger.Info("Successfully removed shard with data preservation", "shard_id", i)
		}
		// Update config while holding lock
		dsm.config.ShardCount = targetCount
		dsm.mu.Unlock()
	}

	atomic.StoreInt32(&dsm.targetShardCount, int32(targetCount))
	dsm.lastScaleTime = time.Now()

	logger.Info("Shard scaling completed",
		"new_count", targetCount,
		"duration", time.Since(scaleStart))
	return nil
}

// AutoScale performs automatic scaling based on real metrics
func (dsm *EnhancedDynamicShardManager) AutoScale() error {
	if !dsm.autoScaleEnabled {
		return nil
	}

	// Check cooldown period
	if time.Since(dsm.lastScaleTime) < dsm.scalingCooldown {
		return nil
	}

	metrics := dsm.collectCurrentLoadMetrics()
	decision := dsm.makeScalingDecision(metrics)

	if decision.Action != "none" {
		logger.Info("Auto-scaling decision",
			"action", decision.Action,
			"current_shards", dsm.config.ShardCount,
			"target_shards", decision.TargetShards,
			"reason", decision.Reason,
			"confidence", decision.Confidence)
		return dsm.ScaleShards(decision.TargetShards)
	}
	return nil
}

// GetShardHealth returns current health status of all shards
func (dsm *EnhancedDynamicShardManager) GetShardHealth() map[int]*ShardHealthStatus {
	dsm.healthMutex.RLock()
	defer dsm.healthMutex.RUnlock()

	// Return a copy to avoid race conditions
	health := make(map[int]*ShardHealthStatus)
	for id, status := range dsm.shardHealth {
		statusCopy := *status // Copy struct
		health[id] = &statusCopy
	}
	return health
}

// GetScalingRecommendations analyzes current state and provides recommendations
func (dsm *EnhancedDynamicShardManager) GetScalingRecommendations() *ScalingRecommendation {
	metrics := dsm.collectCurrentLoadMetrics()
	decision := dsm.makeScalingDecision(metrics)
	health := dsm.GetShardHealth()

	totalDocs := uint64(0)
	totalSize := int64(0)
	healthyShards := 0
	for _, h := range health {
		totalDocs += h.DocumentCount
		totalSize += h.IndexSize
		if h.IsHealthy {
			healthyShards++
		}
	}

	return &ScalingRecommendation{
		CurrentShards:      dsm.config.ShardCount,
		RecommendedShards:  decision.TargetShards,
		Action:             decision.Action,
		Reason:             decision.Reason,
		Confidence:         decision.Confidence,
		TotalDocuments:     totalDocs,
		TotalSize:          totalSize,
		HealthyShards:      healthyShards,
		AutoScaleEnabled:   dsm.autoScaleEnabled,
		LastScaleTime:      dsm.lastScaleTime,
		NextScaleAvailable: dsm.lastScaleTime.Add(dsm.scalingCooldown),
	}
}

// ScalingRecommendation contains scaling analysis and recommendations
type ScalingRecommendation struct {
	CurrentShards      int       `json:"current_shards"`
	RecommendedShards  int       `json:"recommended_shards"`
	Action             string    `json:"action"`
	Reason             string    `json:"reason"`
	Confidence         float64   `json:"confidence"`
	TotalDocuments     uint64    `json:"total_documents"`
	TotalSize          int64     `json:"total_size"`
	HealthyShards      int       `json:"healthy_shards"`
	AutoScaleEnabled   bool      `json:"auto_scale_enabled"`
	LastScaleTime      time.Time `json:"last_scale_time"`
	NextScaleAvailable time.Time `json:"next_scale_available"`
}

// initializeHealthStatus sets up health monitoring for existing shards
func (dsm *EnhancedDynamicShardManager) initializeHealthStatus() {
	dsm.healthMutex.Lock()
	defer dsm.healthMutex.Unlock()

	for i := 0; i < dsm.config.ShardCount; i++ {
		dsm.shardHealth[i] = &ShardHealthStatus{
			ShardID:         i,
			IsHealthy:       true,
			LastHealthCheck: time.Now(),
			LoadScore:       0.0,
		}
	}
}

// healthMonitoringLoop continuously monitors shard health
func (dsm *EnhancedDynamicShardManager) healthMonitoringLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			dsm.updateShardHealth()
		case <-dsm.ctx.Done():
			return
		}
	}
}

// autoScaleMonitoringLoop runs the auto-scaling logic
func (dsm *EnhancedDynamicShardManager) autoScaleMonitoringLoop() {
	ticker := time.NewTicker(2 * time.Minute) // Check every 2 minutes
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := dsm.AutoScale(); err != nil {
				logger.Warnf("Auto-scaling failed: %v", err)
			}
		case <-dsm.ctx.Done():
			return
		}
	}
}

// updateShardHealth performs health checks on all shards
func (dsm *EnhancedDynamicShardManager) updateShardHealth() {
	dsm.mu.RLock()
	shardIDs := make([]int, 0, len(dsm.shards))
	for id := range dsm.shards {
		shardIDs = append(shardIDs, id)
	}
	dsm.mu.RUnlock()

	for _, id := range shardIDs {
		dsm.checkShardHealth(id)
	}
}

// checkShardHealth checks the health of a specific shard
func (dsm *EnhancedDynamicShardManager) checkShardHealth(shardID int) {
	shard, err := dsm.GetShardByID(shardID)
	if err != nil {
		dsm.updateHealthStatus(shardID, false, fmt.Sprintf("Failed to get shard: %v", err))
		return
	}

	// Perform health checks
	startTime := time.Now()

	// Check 1: Document count (tests basic index access)
	docCount, err := shard.DocCount()
	if err != nil {
		dsm.updateHealthStatus(shardID, false, fmt.Sprintf("DocCount failed: %v", err))
		return
	}

	// Check 2: Quick search test (tests query performance)
	searchRequest := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
	searchRequest.Size = 1 // Minimal result set
	_, err = shard.Search(searchRequest)
	queryTime := time.Since(startTime)
	if err != nil {
		dsm.updateHealthStatus(shardID, false, fmt.Sprintf("Search test failed: %v", err))
		return
	}

	// Calculate load score
	loadScore := dsm.calculateShardLoad(docCount, queryTime)

	// Update health status
	dsm.healthMutex.Lock()
	if status, exists := dsm.shardHealth[shardID]; exists {
		status.IsHealthy = true
		status.LastHealthCheck = time.Now()
		status.DocumentCount = docCount
		status.AvgQueryTime = queryTime
		status.LoadScore = loadScore
		status.LastError = ""
	}
	dsm.healthMutex.Unlock()
}

// updateHealthStatus updates the health status of a shard
func (dsm *EnhancedDynamicShardManager) updateHealthStatus(shardID int, healthy bool, errorMsg string) {
	dsm.healthMutex.Lock()
	defer dsm.healthMutex.Unlock()

	if status, exists := dsm.shardHealth[shardID]; exists {
		status.IsHealthy = healthy
		status.LastHealthCheck = time.Now()
		if !healthy {
			status.ErrorCount++
			status.LastError = errorMsg
		}
	}
}

// calculateShardLoad calculates a load score for a shard
func (dsm *EnhancedDynamicShardManager) calculateShardLoad(docCount uint64, queryTime time.Duration) float64 {
	// Normalize factors
	docFactor := float64(docCount) / float64(dsm.loadThresholds.MaxDocsPerShard)
	timeFactor := float64(queryTime) / float64(dsm.loadThresholds.MaxSearchLatency)

	// Weighted average (60% doc count, 40% query time)
	loadScore := (docFactor * 0.6) + (timeFactor * 0.4)

	// Cap at 1.0
	if loadScore > 1.0 {
		loadScore = 1.0
	}
	return loadScore
}

// collectCurrentLoadMetrics gathers real-time metrics from shards
func (dsm *EnhancedDynamicShardManager) collectCurrentLoadMetrics() LoadMetrics {
	health := dsm.GetShardHealth()

	shardSizes := make([]int64, 0, len(health))
	var totalLatency time.Duration
	var maxLatency time.Duration
	var totalLoad float64
	healthyCount := 0

	for _, h := range health {
		shardSizes = append(shardSizes, h.IndexSize)
		totalLatency += h.AvgQueryTime
		totalLoad += h.LoadScore
		if h.AvgQueryTime > maxLatency {
			maxLatency = h.AvgQueryTime
		}
		if h.IsHealthy {
			healthyCount++
		}
	}

	avgLoad := 0.0
	if len(health) > 0 {
		avgLoad = totalLoad / float64(len(health))
	}

	return LoadMetrics{
		SearchLatency:  maxLatency, // Use max latency for scaling decisions
		ShardSizes:     shardSizes,
		CPUUtilization: avgLoad, // Use load score as CPU proxy
		ActiveQueries:  0,       // TODO: Track active queries
		QueueLength:    0,       // TODO: Get from indexer queue
	}
}

// makeScalingDecision analyzes metrics and decides on scaling action
func (dsm *EnhancedDynamicShardManager) makeScalingDecision(metrics LoadMetrics) ScalingDecision {
	thresholds := dsm.loadThresholds
	currentShards := dsm.config.ShardCount

	// Check scale-up conditions
	if metrics.SearchLatency > thresholds.MaxSearchLatency {
		return ScalingDecision{
			Action:       "scale_up",
			TargetShards: min(currentShards+1, thresholds.MaxShards),
			Reason:       fmt.Sprintf("High search latency: %v > %v", metrics.SearchLatency, thresholds.MaxSearchLatency),
			Confidence:   0.9,
		}
	}

	// Check if any shard is too large
	for i, size := range metrics.ShardSizes {
		if size > thresholds.MaxShardSize {
			return ScalingDecision{
				Action:       "scale_up",
				TargetShards: min(currentShards+1, thresholds.MaxShards),
				Reason:       fmt.Sprintf("Shard %d too large: %d bytes > %d bytes", i, size, thresholds.MaxShardSize),
				Confidence:   0.8,
			}
		}
	}

	// Check CPU utilization (using load score)
	if metrics.CPUUtilization > thresholds.MaxCPUUtilization {
		return ScalingDecision{
			Action:       "scale_up",
			TargetShards: min(currentShards+1, thresholds.MaxShards),
			Reason:       fmt.Sprintf("High load: %.2f > %.2f", metrics.CPUUtilization, thresholds.MaxCPUUtilization),
			Confidence:   0.7,
		}
	}

	// Check scale-down conditions (more conservative)
	if currentShards > thresholds.MinShards &&
		metrics.SearchLatency < thresholds.MinSearchLatency &&
		metrics.CPUUtilization < thresholds.MinCPUUtilization {
		// Ensure all shards are small enough for scale-down
		allShardsSmall := true
		for _, size := range metrics.ShardSizes {
			if size > thresholds.MinShardSize*2 { // 2x buffer for safety
				allShardsSmall = false
				break
			}
		}
		if allShardsSmall {
			return ScalingDecision{
				Action:       "scale_down",
				TargetShards: max(currentShards-1, thresholds.MinShards),
				Reason:       "All shards underutilized and small",
				Confidence:   0.6,
			}
		}
	}

	return ScalingDecision{
		Action:       "none",
		TargetShards: currentShards,
		Reason:       "Current configuration optimal",
		Confidence:   0.5,
	}
}

// SetAutoScaleEnabled enables or disables auto-scaling
func (dsm *EnhancedDynamicShardManager) SetAutoScaleEnabled(enabled bool) {
	dsm.autoScaleEnabled = enabled
	logger.Info("Auto-scaling setting changed", "enabled", enabled)
}

// IsAutoScaleEnabled returns the current auto-scaling status
func (dsm *EnhancedDynamicShardManager) IsAutoScaleEnabled() bool {
	return dsm.autoScaleEnabled
}

// Close shuts down the enhanced shard manager
func (dsm *EnhancedDynamicShardManager) Close() error {
	var closeErr error
	dsm.stopOnce.Do(func() {
		dsm.cancel()

		// Stop metrics collection
		if dsm.metricsCollector != nil {
			dsm.metricsCollector.Stop()
		}

		// Close base shard manager
		if err := dsm.DefaultShardManager.Close(); err != nil {
			closeErr = fmt.Errorf("failed to close base shard manager: %w", err)
		}
	})
	return closeErr
}

// GetCurrentShardCount returns the current number of shards
func (dsm *EnhancedDynamicShardManager) GetCurrentShardCount() int {
	return dsm.config.ShardCount
}

// GetShardCount implements the ShardManager interface
func (dsm *EnhancedDynamicShardManager) GetShardCount() int {
	return dsm.GetCurrentShardCount()
}

// GetTargetShardCount returns the target shard count
func (dsm *EnhancedDynamicShardManager) GetTargetShardCount() int {
	return int(atomic.LoadInt32(&dsm.targetShardCount))
}

// IsScalingInProgress returns whether a scaling operation is in progress
func (dsm *EnhancedDynamicShardManager) IsScalingInProgress() bool {
	return atomic.LoadInt32(&dsm.scalingInProgress) == 1
}

// migrateShardData safely migrates all documents from the source shard to the target shards.
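// Documents are read in batches from the source shard, re-indexed into the
// shard chosen by the base manager's hashFunc over the new shard count, and
// only then deleted from the source, so a failed delete leaves a duplicate in
// the source rather than losing data.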
func (dsm *EnhancedDynamicShardManager) migrateShardData(sourceShardID int, targetShardCount int) (int64, error) {
	logger.Info("Starting data migration",
		"source_shard", sourceShardID,
		"target_shard_count", targetShardCount)

	// Get source shard
	sourceShard, err := dsm.GetShardByID(sourceShardID)
	if err != nil {
		return 0, fmt.Errorf("failed to get source shard %d: %w", sourceShardID, err)
	}
	bleveIndex, ok := sourceShard.(bleve.Index)
	if !ok {
		return 0, fmt.Errorf("source shard %d is not a bleve.Index", sourceShardID)
	}

	// Create search query to get all documents
	searchRequest := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
	searchRequest.Size = 100 // Smaller batch size for testing
	searchRequest.From = 0
	searchRequest.IncludeLocations = false
	searchRequest.Fields = []string{"content", "type"} // Only specific fields to avoid issues

	var totalMigrated int64
	batchNum := 0

	for {
		// Search for batch of documents
		searchResult, err := bleveIndex.Search(searchRequest)
		if err != nil {
			return totalMigrated, fmt.Errorf("failed to search source shard %d at batch %d: %w", sourceShardID, batchNum, err)
		}
		if len(searchResult.Hits) == 0 {
			break // No more documents
		}

		logger.Debug("Migrating document batch",
			"source_shard", sourceShardID,
			"batch", batchNum,
			"documents", len(searchResult.Hits))

		// Migrate each document in the batch
		batch := bleveIndex.NewBatch()
		for _, hit := range searchResult.Hits {
			// Determine target shard using hash function from base manager
			targetShardID := dsm.DefaultShardManager.hashFunc(hit.ID, targetShardCount)

			// Get target shard
			targetShard, err := dsm.GetShardByID(targetShardID)
			if err != nil {
				return totalMigrated, fmt.Errorf("failed to get target shard %d: %w", targetShardID, err)
			}
			targetIndex, ok := targetShard.(bleve.Index)
			if !ok {
				return totalMigrated, fmt.Errorf("target shard %d is not a bleve.Index", targetShardID)
			}

			// Create document for re-indexing using stored fields from search hit
			documentData := make(map[string]interface{})
			if hit.Fields != nil {
				// Use the stored fields from the search hit
				for fieldName, fieldValue := range hit.Fields {
					documentData[fieldName] = fieldValue
				}
			} else {
				// Fallback: record minimal data when no stored fields are available
				documentData["id"] = hit.ID
				documentData["score"] = hit.Score
			}

			// Index in target shard
			if err := targetIndex.Index(hit.ID, documentData); err != nil {
				logger.Warnf("Failed to index document %s in target shard %d: %v", hit.ID, targetShardID, err)
				continue
			}

			// Add to batch for deletion from source
			batch.Delete(hit.ID)
			totalMigrated++
		}

		// Delete migrated documents from source shard
		deleteFailed := false
		if batch.Size() > 0 {
			if err := bleveIndex.Batch(batch); err != nil {
				logger.Warnf("Failed to delete migrated documents from source shard %d: %v", sourceShardID, err)
				// Continue - documents are already copied to target shards
				deleteFailed = true
			}
		}

		// Prepare for next batch. Deleted documents no longer appear in the next
		// search, so the offset stays at 0; only advance it if the delete batch
		// failed, to avoid re-reading the same documents.
		if deleteFailed {
			searchRequest.From += len(searchResult.Hits)
		}
		batchNum++

		// Safety check - avoid infinite loops
		if batchNum > 1000 {
			logger.Warnf("Migration stopped after 1000 batches from shard %d", sourceShardID)
			break
		}
	}

	logger.Info("Shard data migration completed",
		"source_shard", sourceShardID,
		"total_migrated", totalMigrated,
		"batches_processed", batchNum)
	return totalMigrated, nil
}