rebuild.go

package indexer

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uozi-tech/cosy/logger"
)

// RebuildManager handles index rebuilding operations
type RebuildManager struct {
	indexer         *ParallelIndexer
	persistence     *PersistenceManager
	progressManager *ProgressManager
	shardManager    ShardManager
	config          *RebuildConfig
	rebuilding      int32 // atomic flag
	lastRebuildTime time.Time
	mu              sync.RWMutex
}

// RebuildConfig contains configuration for rebuild operations
type RebuildConfig struct {
	BatchSize           int           `json:"batch_size"`
	MaxConcurrency      int           `json:"max_concurrency"`
	DeleteBeforeRebuild bool          `json:"delete_before_rebuild"`
	ProgressInterval    time.Duration `json:"progress_interval"`
	TimeoutPerFile      time.Duration `json:"timeout_per_file"`
}

// DefaultRebuildConfig returns default rebuild configuration
func DefaultRebuildConfig() *RebuildConfig {
	return &RebuildConfig{
		BatchSize:           1000,
		MaxConcurrency:      4,
		DeleteBeforeRebuild: true,
		ProgressInterval:    5 * time.Second,
		TimeoutPerFile:      30 * time.Minute,
	}
}

// NewRebuildManager creates a new rebuild manager
func NewRebuildManager(indexer *ParallelIndexer, persistence *PersistenceManager, progressManager *ProgressManager, shardManager ShardManager, config *RebuildConfig) *RebuildManager {
	if config == nil {
		config = DefaultRebuildConfig()
	}
	return &RebuildManager{
		indexer:         indexer,
		persistence:     persistence,
		progressManager: progressManager,
		shardManager:    shardManager,
		config:          config,
	}
}

// RebuildAll rebuilds all indexes from scratch
func (rm *RebuildManager) RebuildAll(ctx context.Context) error {
	// Check if already rebuilding
	if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
		return fmt.Errorf("rebuild already in progress")
	}
	defer atomic.StoreInt32(&rm.rebuilding, 0)

	startTime := time.Now()
	rm.mu.Lock()
	rm.lastRebuildTime = startTime
	rm.mu.Unlock()

	// Get all log groups to rebuild
	logGroups, err := rm.getAllLogGroups()
	if err != nil {
		return fmt.Errorf("failed to get log groups: %w", err)
	}
	if len(logGroups) == 0 {
		return fmt.Errorf("no log groups found to rebuild")
	}

	// Delete existing indexes if configured
	if rm.config.DeleteBeforeRebuild {
		if err := rm.deleteAllIndexes(); err != nil {
			return fmt.Errorf("failed to delete existing indexes: %w", err)
		}
	}

	// Reset persistence records
	if rm.persistence != nil {
		if err := rm.resetAllPersistenceRecords(); err != nil {
			return fmt.Errorf("failed to reset persistence records: %w", err)
		}
	}

	// Create progress tracker for overall rebuild
	rebuildProgress := &RebuildProgress{
		TotalGroups:     len(logGroups),
		CompletedGroups: 0,
		StartTime:       startTime,
	}

	// Process each log group
	errors := make([]error, 0)
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, rm.config.MaxConcurrency)

	for _, logGroup := range logGroups {
		wg.Add(1)
		go func(group string) {
			defer wg.Done()

			// Acquire semaphore
			semaphore <- struct{}{}
			defer func() { <-semaphore }()

			// Check context
			if ctx.Err() != nil {
				return
			}

			// Rebuild this log group
			if err := rm.rebuildLogGroup(ctx, group); err != nil {
				rm.mu.Lock()
				errors = append(errors, fmt.Errorf("failed to rebuild group %s: %w", group, err))
				rm.mu.Unlock()
			} else {
				// Update progress
				rm.mu.Lock()
				rebuildProgress.CompletedGroups++
				rm.mu.Unlock()

				// Notify progress
				rm.notifyRebuildProgress(rebuildProgress)
			}
		}(logGroup)
	}

	// Wait for all groups to complete
	wg.Wait()

	// Check for errors
	if len(errors) > 0 {
		return fmt.Errorf("rebuild completed with %d errors: %v", len(errors), errors)
	}

	rebuildProgress.CompletedTime = time.Now()
	rebuildProgress.Duration = time.Since(startTime)

	// Notify completion
	rm.notifyRebuildComplete(rebuildProgress)
	return nil
}

// RebuildSingle rebuilds index for a single log group
func (rm *RebuildManager) RebuildSingle(ctx context.Context, logGroupPath string) error {
	// Check if already rebuilding
	if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
		return fmt.Errorf("rebuild already in progress")
	}
	defer atomic.StoreInt32(&rm.rebuilding, 0)

	startTime := time.Now()

	// Delete existing index for this log group if configured
	if rm.config.DeleteBeforeRebuild {
		if err := rm.deleteLogGroupIndex(logGroupPath); err != nil {
			return fmt.Errorf("failed to delete existing index: %w", err)
		}
	}

	// Reset persistence records for this group
	if rm.persistence != nil {
		if err := rm.resetLogGroupPersistence(logGroupPath); err != nil {
			return fmt.Errorf("failed to reset persistence: %w", err)
		}
	}

	// Rebuild the log group
	if err := rm.rebuildLogGroup(ctx, logGroupPath); err != nil {
		return fmt.Errorf("failed to rebuild log group: %w", err)
	}

	duration := time.Since(startTime)

	// Notify completion
	rm.notifySingleRebuildComplete(logGroupPath, duration)
	return nil
}

// rebuildLogGroup rebuilds index for a single log group
func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath string) error {
	// Get all files for this log group
	files, err := rm.discoverLogGroupFiles(logGroupPath)
	if err != nil {
		return fmt.Errorf("failed to discover files: %w", err)
	}
	if len(files) == 0 {
		return fmt.Errorf("no files found for log group %s", logGroupPath)
	}

	// Create progress tracker for this log group
	progressConfig := &ProgressConfig{
		OnProgress: func(pn ProgressNotification) {
			// Handle progress notifications
			rm.handleProgressNotification(logGroupPath, pn)
		},
		OnCompletion: func(cn CompletionNotification) {
			// Handle completion notifications
			rm.handleCompletionNotification(logGroupPath, cn)
		},
	}
	tracker := rm.progressManager.GetTracker(logGroupPath, progressConfig)

	// Add all files to tracker
	for _, file := range files {
		tracker.AddFile(file.Path, file.IsCompressed)
		if file.EstimatedLines > 0 {
			tracker.SetFileEstimate(file.Path, file.EstimatedLines)
		}
		if file.Size > 0 {
			tracker.SetFileSize(file.Path, file.Size)
		}
	}

	// Process each file with smart change detection
	for _, file := range files {
		// Check context
		if ctx.Err() != nil {
			tracker.FailFile(file.Path, ctx.Err().Error())
			return ctx.Err()
		}

		// Skip unchanged files (especially compressed archives)
		shouldProcess, skipReason := rm.shouldProcessFile(file)
		if !shouldProcess {
			logger.Infof("Skipping file %s: %s", file.Path, skipReason)
			// Mark as completed without processing
			tracker.CompleteFile(file.Path, 0)
			continue
		}

		// Create file-specific context with timeout
		fileCtx, cancel := context.WithTimeout(ctx, rm.config.TimeoutPerFile)

		// Start processing
		tracker.StartFile(file.Path)

		// Index the file
		err := rm.indexFile(fileCtx, file, tracker)
		cancel()
		if err != nil {
			tracker.FailFile(file.Path, err.Error())
			return fmt.Errorf("failed to index file %s: %w", file.Path, err)
		}

		// Mark as completed
		tracker.CompleteFile(file.Path, file.ProcessedLines)

		// Update persistence with exact doc count from Bleve
		if rm.persistence != nil {
			exactCount := file.DocumentCount
			if rm.indexer != nil && rm.indexer.IsHealthy() {
				if c, err := rm.indexer.CountDocsByFilePath(file.Path); err == nil {
					exactCount = c
				} else {
					logger.Warnf("Falling back to computed count for %s due to count error: %v", file.Path, err)
				}
			}
			if err := rm.persistence.MarkFileAsIndexed(file.Path, exactCount, file.LastPosition); err != nil {
				// Log but don't fail the rebuild
				logger.Warnf("Failed to update persistence for %s: %v", file.Path, err)
			}
		}
	}
	return nil
}

// shouldProcessFile determines if a file needs to be processed based on change detection
func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
	// Get file information
	fileInfo, err := os.Stat(file.Path)
	if err != nil {
		return true, fmt.Sprintf("cannot stat file (will process): %v", err)
	}

	// For compressed files (.gz), check if we've already processed them and they haven't changed
	if file.IsCompressed {
		// Check if we have persistence information for this file
		if rm.persistence != nil {
			if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
				// Check if file hasn't changed since last indexing
				currentModTime := fileInfo.ModTime().Unix()
				currentSize := fileInfo.Size()
				if info.LastModified == currentModTime &&
					info.LastSize == currentSize &&
					info.LastPosition == currentSize {
					return false, "compressed file already fully indexed and unchanged"
				}
			}
		}
	}

	// For active log files (non-compressed), always process but may resume from checkpoint
	if !file.IsCompressed {
		// Check if file has grown or changed
		if rm.persistence != nil {
			if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
				currentModTime := fileInfo.ModTime().Unix()
				currentSize := fileInfo.Size()

				// File hasn't changed at all
				if info.LastModified == currentModTime &&
					info.LastSize == currentSize &&
					info.LastPosition == currentSize {
					return false, "active file unchanged since last indexing"
				}

				// File has shrunk (possible log rotation)
				if currentSize < info.LastSize {
					return true, "active file appears to have been rotated (size decreased)"
				}

				// File has grown or been modified
				if currentSize > info.LastSize || currentModTime > info.LastModified {
					return true, "active file has new content"
				}
			}
		}

		// No persistence info available, process the file
		return true, "no previous indexing record found for active file"
	}

	// Default: process compressed files that are new or have changed since last indexing
	return true, "compressed file is new or has changed since last indexing"
}

// LogGroupFile represents a file in a log group
type LogGroupFile struct {
	Path           string
	Size           int64
	ModTime        int64 // Unix timestamp of file modification time
	IsCompressed   bool
	EstimatedLines int64
	ProcessedLines int64
	DocumentCount  uint64
	LastPosition   int64
}

// discoverLogGroupFiles discovers all files for a log group
func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGroupFile, error) {
	dir := filepath.Dir(logGroupPath)
	// Remove any rotation suffixes to get the base name
	mainPath := getMainLogPathFromFile(logGroupPath)

	files := make([]*LogGroupFile, 0)

	// Walk the directory to find related files
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return nil // Skip files we can't access
		}

		// Skip directories
		if info.IsDir() {
			return nil
		}

		// Check if this file belongs to the log group
		if getMainLogPathFromFile(path) == mainPath {
			file := &LogGroupFile{
				Path:         path,
				Size:         info.Size(),
				ModTime:      info.ModTime().Unix(),
				IsCompressed: IsCompressedFile(path),
			}

			// Estimate lines
			ctx := context.Background()
			if lines, err := EstimateFileLines(ctx, path, info.Size(), file.IsCompressed); err == nil {
				file.EstimatedLines = lines
			}

			files = append(files, file)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return files, nil
}

// indexFile indexes a single file with checkpoint/resume support
func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
	// Create a batch writer
	batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
	defer batch.Flush()

	// Get checkpoint information from persistence layer
	var startPosition int64 = 0
	var resuming bool = false

	if rm.persistence != nil {
		if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
			// Get current file modification time
			fileInfo, err := os.Stat(file.Path)
			if err != nil {
				return fmt.Errorf("failed to stat file %s: %w", file.Path, err)
			}
			currentModTime := fileInfo.ModTime().Unix()
			currentSize := fileInfo.Size()

			// Check if file hasn't changed since last indexing
			if info.LastIndexed > 0 &&
				info.LastModified == currentModTime &&
				info.LastSize == currentSize &&
				info.LastPosition == currentSize {
				// File hasn't changed and was fully indexed
				logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)",
					file.Path, time.Unix(info.LastIndexed, 0))
				file.ProcessedLines = 0 // No new lines processed
				file.DocumentCount = 0  // No new documents added
				file.LastPosition = currentSize
				return nil
			}

			// Check if we should resume from a previous position
			if info.LastPosition > 0 && info.LastPosition < currentSize {
				// File has grown since last indexing
				startPosition = info.LastPosition
				resuming = true
				logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)",
					startPosition, file.Path, info.LastSize, currentSize)
			} else if currentSize < info.LastSize {
				// File has been truncated or rotated, start from beginning
				startPosition = 0
				logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start",
					file.Path, info.LastSize, currentSize)
			} else if info.LastPosition >= currentSize && currentSize > 0 {
				// File size hasn't changed and we've already processed it completely
				if info.LastModified == currentModTime {
					logger.Infof("File %s already fully indexed and unchanged, skipping", file.Path)
					file.ProcessedLines = 0
					file.DocumentCount = 0
					file.LastPosition = currentSize
					return nil
				}
				// File has same size but different modification time, reindex from start
				startPosition = 0
				logger.Infof("File %s has same size but different mod time, reindexing from start", file.Path)
			}
		}
	}

	// Open file with resume support
	reader, err := rm.openFileFromPosition(file.Path, startPosition)
	if err != nil {
		return fmt.Errorf("failed to open file %s from position %d: %w", file.Path, startPosition, err)
	}
	defer reader.Close()

	// Process file line by line with checkpointing
	var processedLines int64 = 0
	var currentPosition int64 = startPosition
	var documentCount uint64 = 0
	checkpointInterval := int64(1000) // Save checkpoint every 1000 lines

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		// Check context for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		line := scanner.Text()
		currentPosition += int64(len(line)) + 1 // +1 for newline

		// Process the log line (parse and add to batch)
		// This would typically involve:
		// 1. Parse log entry using parser
		// 2. Create search document
		// 3. Add to batch
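		//
		// A hedged sketch of what that step might look like; ParseLogLine and
		// batch.Add are assumed names that are not defined in this file, so the
		// sketch is kept in a comment:
		//
		//	if doc, perr := ParseLogLine(line, file.Path); perr == nil {
		//		if aerr := batch.Add(doc); aerr != nil {
		//			return fmt.Errorf("failed to add document to batch: %w", aerr)
		//		}
		//	} else {
		//		logger.Warnf("Failed to parse line in %s: %v", file.Path, perr)
		//	}
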
		processedLines++
		documentCount++

		// Update progress
		tracker.UpdateFileProgress(file.Path, processedLines)

		// Periodic checkpoint saving
		if processedLines%checkpointInterval == 0 {
			if rm.persistence != nil {
				// Get current file modification time for checkpoint
				fileInfo, err := os.Stat(file.Path)
				var modTime int64
				if err == nil {
					modTime = fileInfo.ModTime().Unix()
				} else {
					modTime = time.Now().Unix()
				}

				info := &LogFileInfo{
					Path:         file.Path,
					LastPosition: currentPosition,
					LastIndexed:  time.Now().Unix(),
					LastModified: modTime,
					LastSize:     file.Size,
				}
				if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
					logger.Warnf("Failed to save checkpoint for %s: %v", file.Path, err)
				}
			}
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading file %s: %w", file.Path, err)
	}

	// Update file statistics
	file.ProcessedLines = processedLines
	file.DocumentCount = documentCount
	file.LastPosition = currentPosition

	// Save final checkpoint
	if rm.persistence != nil {
		// Get current file info for accurate metadata
		fileInfo, err := os.Stat(file.Path)
		var modTime int64
		if err == nil {
			modTime = fileInfo.ModTime().Unix()
		} else {
			modTime = time.Now().Unix()
		}

		info := &LogFileInfo{
			Path:         file.Path,
			LastPosition: currentPosition,
			LastIndexed:  time.Now().Unix(),
			LastModified: modTime,
			LastSize:     file.Size,
		}
		if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
			logger.Warnf("Failed to save final checkpoint for %s: %v", file.Path, err)
		}
	}

	if resuming {
		logger.Infof("Completed resumed indexing for %s: %d lines, %d documents",
			file.Path, processedLines, documentCount)
	}
	return nil
}

// openFileFromPosition opens a file and seeks to the specified position.
// Handles both compressed (.gz) and regular files.
func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition int64) (io.ReadCloser, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}

	// Check if file is compressed
	isGzipped := strings.HasSuffix(filePath, ".gz")
	if isGzipped {
		// For gzip files, we need to read from the beginning and skip to position.
		// This is because gzip doesn't support random seeking.
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			file.Close()
			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
		}
		if startPosition > 0 {
			// Skip to the start position by reading and discarding bytes
			_, err := io.CopyN(io.Discard, gzReader, startPosition)
			if err != nil && err != io.EOF {
				gzReader.Close()
				file.Close()
				return nil, fmt.Errorf("failed to seek to position %d in gzip file: %w", startPosition, err)
			}
		}
		// Return a wrapped reader that closes both gzReader and file
		return &gzipReaderCloser{gzReader: gzReader, file: file}, nil
	} else {
		// For regular files, seek directly
		if startPosition > 0 {
			_, err := file.Seek(startPosition, io.SeekStart)
			if err != nil {
				file.Close()
				return nil, fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
			}
		}
		return file, nil
	}
}

// gzipReaderCloser wraps gzip.Reader to close both the gzip reader and underlying file
type gzipReaderCloser struct {
	gzReader *gzip.Reader
	file     *os.File
}

func (g *gzipReaderCloser) Read(p []byte) (n int, err error) {
	return g.gzReader.Read(p)
}

func (g *gzipReaderCloser) Close() error {
	if err := g.gzReader.Close(); err != nil {
		g.file.Close() // Still close file even if gzip reader fails
		return err
	}
	return g.file.Close()
}

// getAllLogGroups returns all unique log groups
func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
	if rm.persistence == nil {
		return []string{}, nil
	}

	indexes, err := rm.persistence.GetAllLogIndexes()
	if err != nil {
		return nil, err
	}

	// Use map to get unique main log paths
	groups := make(map[string]struct{})
	for _, idx := range indexes {
		groups[idx.MainLogPath] = struct{}{}
	}

	// Convert to slice
	result := make([]string, 0, len(groups))
	for group := range groups {
		result = append(result, group)
	}
	return result, nil
}

// deleteAllIndexes deletes all existing indexes
func (rm *RebuildManager) deleteAllIndexes() error {
	// Get all shards
	shards := rm.shardManager.GetAllShards()

	// Delete each shard
	for i, shard := range shards {
		if shard != nil {
			if err := shard.Close(); err != nil {
				return fmt.Errorf("failed to close shard %d: %w", i, err)
			}
		}
	}

	// Recreate shards
	// This would typically be done by recreating the shard manager
	// For now, return nil as placeholder
	return nil
}

// deleteLogGroupIndex deletes index for a specific log group
func (rm *RebuildManager) deleteLogGroupIndex(logGroupPath string) error {
	// In a real implementation, this would:
	// 1. Find all documents for this log group
	// 2. Delete them from the appropriate shards
	// For now, return nil as placeholder
	return nil
}
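
// A hedged sketch of how deleteLogGroupIndex could be fleshed out: query each
// shard for documents whose main log path matches the group, then delete the
// matching document IDs. The query/delete API of a shard is not defined in this
// file, so the sketch stays in a comment and uses assumed helper names:
//
//	for _, shard := range rm.shardManager.GetAllShards() {
//		ids, err := docIDsForLogGroup(shard, logGroupPath) // assumed helper
//		if err != nil {
//			return err
//		}
//		for _, id := range ids {
//			// shard-specific delete-by-ID call goes here
//			_ = id
//		}
//	}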

// resetAllPersistenceRecords resets all persistence records
func (rm *RebuildManager) resetAllPersistenceRecords() error {
	if rm.persistence == nil {
		return nil
	}

	indexes, err := rm.persistence.GetAllLogIndexes()
	if err != nil {
		return err
	}
	for _, idx := range indexes {
		idx.Reset()
		if err := rm.persistence.SaveLogIndex(idx); err != nil {
			return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
		}
	}
	return nil
}

// resetLogGroupPersistence resets persistence for a log group
func (rm *RebuildManager) resetLogGroupPersistence(logGroupPath string) error {
	if rm.persistence == nil {
		return nil
	}

	indexes, err := rm.persistence.GetLogGroupIndexes(logGroupPath)
	if err != nil {
		return err
	}
	for _, idx := range indexes {
		idx.Reset()
		if err := rm.persistence.SaveLogIndex(idx); err != nil {
			return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
		}
	}
	return nil
}

// RebuildProgress tracks rebuild progress
type RebuildProgress struct {
	TotalGroups     int
	CompletedGroups int
	StartTime       time.Time
	CompletedTime   time.Time
	Duration        time.Duration
	CurrentGroup    string
	CurrentFile     string
	Errors          []error
}

// notification methods

func (rm *RebuildManager) notifyRebuildProgress(progress *RebuildProgress) {
	// Emit progress event
	// This would typically publish to an event bus
}

func (rm *RebuildManager) notifyRebuildComplete(progress *RebuildProgress) {
	// Emit completion event
}

func (rm *RebuildManager) notifySingleRebuildComplete(logGroupPath string, duration time.Duration) {
	// Emit single rebuild completion event
}

func (rm *RebuildManager) handleProgressNotification(logGroupPath string, pn ProgressNotification) {
	// Handle progress notification from tracker
}

func (rm *RebuildManager) handleCompletionNotification(logGroupPath string, cn CompletionNotification) {
	// Handle completion notification from tracker
}

// IsRebuilding returns true if rebuild is in progress
func (rm *RebuildManager) IsRebuilding() bool {
	return atomic.LoadInt32(&rm.rebuilding) == 1
}

// GetLastRebuildTime returns the time of the last rebuild
func (rm *RebuildManager) GetLastRebuildTime() time.Time {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return rm.lastRebuildTime
}

// RebuildStats contains statistics about rebuild operations
type RebuildStats struct {
	IsRebuilding    bool           `json:"is_rebuilding"`
	LastRebuildTime time.Time      `json:"last_rebuild_time"`
	Config          *RebuildConfig `json:"config"`
}

// GetRebuildStats returns statistics about rebuild operations
func (rm *RebuildManager) GetRebuildStats() *RebuildStats {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return &RebuildStats{
		IsRebuilding:    rm.IsRebuilding(),
		LastRebuildTime: rm.lastRebuildTime,
		Config:          rm.config,
	}
}
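
// Illustrative usage sketch (not part of the original file): how a caller might
// wire up a RebuildManager and trigger a full rebuild. Construction of the
// ParallelIndexer, PersistenceManager, ProgressManager, and ShardManager is
// assumed to happen elsewhere in this package, and the MaxConcurrency override
// is just an example value.
func runFullRebuild(ctx context.Context, idx *ParallelIndexer, persistence *PersistenceManager, progress *ProgressManager, shards ShardManager) error {
	cfg := DefaultRebuildConfig()
	cfg.MaxConcurrency = 8 // example override; tune to the host

	rm := NewRebuildManager(idx, persistence, progress, shards, cfg)
	if rm.IsRebuilding() {
		return fmt.Errorf("a rebuild is already in progress")
	}

	// RebuildAll blocks until every log group has been processed and returns an
	// aggregated error if any group failed.
	return rm.RebuildAll(ctx)
}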