1
0

rebuild.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. package indexer
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/uozi-tech/cosy/logger"
  15. )
  16. // RebuildManager handles index rebuilding operations
  17. type RebuildManager struct {
  18. indexer *ParallelIndexer
  19. persistence *PersistenceManager
  20. progressManager *ProgressManager
  21. shardManager ShardManager
  22. config *RebuildConfig
  23. rebuilding int32 // atomic flag
  24. lastRebuildTime time.Time
  25. mu sync.RWMutex
  26. }
  27. // RebuildConfig contains configuration for rebuild operations
  28. type RebuildConfig struct {
  29. BatchSize int `json:"batch_size"`
  30. MaxConcurrency int `json:"max_concurrency"`
  31. DeleteBeforeRebuild bool `json:"delete_before_rebuild"`
  32. ProgressInterval time.Duration `json:"progress_interval"`
  33. TimeoutPerFile time.Duration `json:"timeout_per_file"`
  34. }
  // DefaultRebuildConfig returns a freshly allocated configuration with the
// stock settings: batches of 1000, four concurrent groups, delete-before-
// rebuild enabled, 5s progress interval and a 30 minute per-file timeout.
func DefaultRebuildConfig() *RebuildConfig {
	cfg := new(RebuildConfig)
	cfg.BatchSize = 1000
	cfg.MaxConcurrency = 4
	cfg.DeleteBeforeRebuild = true
	cfg.ProgressInterval = 5 * time.Second
	cfg.TimeoutPerFile = 30 * time.Minute
	return cfg
}
  45. // NewRebuildManager creates a new rebuild manager
  46. func NewRebuildManager(indexer *ParallelIndexer, persistence *PersistenceManager, progressManager *ProgressManager, shardManager ShardManager, config *RebuildConfig) *RebuildManager {
  47. if config == nil {
  48. config = DefaultRebuildConfig()
  49. }
  50. return &RebuildManager{
  51. indexer: indexer,
  52. persistence: persistence,
  53. progressManager: progressManager,
  54. shardManager: shardManager,
  55. config: config,
  56. }
  57. }
  // RebuildAll rebuilds the indexes of every known log group from scratch.
//
// It fails fast if another rebuild is running. It records the start time
// (see GetLastRebuildTime) and optionally deletes existing indexes. Persistence
// records are reset, then up to config.MaxConcurrency groups are rebuilt in
// parallel (a non-positive value is treated as 1).
//
// Returns an error aggregating every failed group. If ctx is cancelled before
// all groups were processed, it returns the wrapped ctx.Err() instead of
// reporting success, and no completion notification is sent.
func (rm *RebuildManager) RebuildAll(ctx context.Context) error {
	if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
		return fmt.Errorf("rebuild already in progress")
	}
	defer atomic.StoreInt32(&rm.rebuilding, 0)

	startTime := time.Now()
	rm.mu.Lock()
	rm.lastRebuildTime = startTime
	rm.mu.Unlock()

	logGroups, err := rm.getAllLogGroups()
	if err != nil {
		return fmt.Errorf("failed to get log groups: %w", err)
	}
	if len(logGroups) == 0 {
		return fmt.Errorf("no log groups found to rebuild")
	}

	if rm.config.DeleteBeforeRebuild {
		if err := rm.deleteAllIndexes(); err != nil {
			return fmt.Errorf("failed to delete existing indexes: %w", err)
		}
	}
	if rm.persistence != nil {
		if err := rm.resetAllPersistenceRecords(); err != nil {
			return fmt.Errorf("failed to reset persistence records: %w", err)
		}
	}

	rebuildProgress := &RebuildProgress{
		TotalGroups: len(logGroups),
		StartTime:   startTime,
	}

	// A zero limit would create an unbuffered semaphore that no worker can ever
	// acquire (deadlock); a negative one makes `make` panic.
	concurrency := rm.config.MaxConcurrency
	if concurrency < 1 {
		concurrency = 1
	}

	var (
		wg        sync.WaitGroup
		resultMu  sync.Mutex // guards groupErrs and rebuildProgress while workers run
		groupErrs []error
	)
	semaphore := make(chan struct{}, concurrency)

	// Acquire a slot before spawning so at most `concurrency` goroutines exist,
	// and stop dispatching as soon as ctx is cancelled.
dispatch:
	for _, logGroup := range logGroups {
		select {
		case semaphore <- struct{}{}:
		case <-ctx.Done():
			break dispatch
		}

		wg.Add(1)
		go func(group string) {
			defer wg.Done()
			defer func() { <-semaphore }()

			if ctx.Err() != nil {
				return
			}

			if err := rm.rebuildLogGroup(ctx, group); err != nil {
				resultMu.Lock()
				groupErrs = append(groupErrs, fmt.Errorf("failed to rebuild group %s: %w", group, err))
				resultMu.Unlock()
				return
			}

			resultMu.Lock()
			rebuildProgress.CompletedGroups++
			snapshot := *rebuildProgress // notify with a copy so listeners never race with other workers
			resultMu.Unlock()

			rm.notifyRebuildProgress(&snapshot)
		}(logGroup)
	}

	wg.Wait()

	if len(groupErrs) > 0 {
		return fmt.Errorf("rebuild completed with %d errors: %v", len(groupErrs), groupErrs)
	}
	// Groups skipped because of cancellation produce no error of their own;
	// do not report a partial rebuild as a success.
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("rebuild cancelled after %d of %d groups: %w",
			rebuildProgress.CompletedGroups, rebuildProgress.TotalGroups, err)
	}

	rebuildProgress.CompletedTime = time.Now()
	rebuildProgress.Duration = time.Since(startTime)

	rm.notifyRebuildComplete(rebuildProgress)
	return nil
}
  // RebuildSingle rebuilds the index of one log group.
//
// It shares the "one rebuild at a time" flag with RebuildAll. When configured,
// it deletes the group's existing index first. With persistence present it
// resets the group's records, then re-indexes the group and emits a completion
// notification carrying the elapsed time.
func (rm *RebuildManager) RebuildSingle(ctx context.Context, logGroupPath string) error {
	if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
		return fmt.Errorf("rebuild already in progress")
	}
	defer atomic.StoreInt32(&rm.rebuilding, 0)

	began := time.Now()

	if rm.config.DeleteBeforeRebuild {
		if deleteErr := rm.deleteLogGroupIndex(logGroupPath); deleteErr != nil {
			return fmt.Errorf("failed to delete existing index: %w", deleteErr)
		}
	}

	if rm.persistence != nil {
		if resetErr := rm.resetLogGroupPersistence(logGroupPath); resetErr != nil {
			return fmt.Errorf("failed to reset persistence: %w", resetErr)
		}
	}

	if rebuildErr := rm.rebuildLogGroup(ctx, logGroupPath); rebuildErr != nil {
		return fmt.Errorf("failed to rebuild log group: %w", rebuildErr)
	}

	rm.notifySingleRebuildComplete(logGroupPath, time.Since(began))
	return nil
}
  // rebuildLogGroup re-indexes every file that belongs to one log group.
//
// All files are first registered with the group's progress tracker (with
// line/size estimates when known). They are then processed sequentially:
// unchanged files (per shouldProcessFile) are marked complete without work,
// and the rest are indexed under a per-file context (see fileContext). The
// first indexing failure or context cancellation aborts the group and is
// returned. A failure to update persistence after a successful file is
// logged, not returned.
func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath string) error {
	files, err := rm.discoverLogGroupFiles(logGroupPath)
	if err != nil {
		return fmt.Errorf("failed to discover files: %w", err)
	}
	if len(files) == 0 {
		return fmt.Errorf("no files found for log group %s", logGroupPath)
	}

	tracker := rm.progressManager.GetTracker(logGroupPath, &ProgressConfig{
		OnProgress: func(pn ProgressNotification) {
			rm.handleProgressNotification(logGroupPath, pn)
		},
		OnCompletion: func(cn CompletionNotification) {
			rm.handleCompletionNotification(logGroupPath, cn)
		},
	})

	// Register everything up front so overall progress has a stable denominator.
	for _, file := range files {
		tracker.AddFile(file.Path, file.IsCompressed)
		if file.EstimatedLines > 0 {
			tracker.SetFileEstimate(file.Path, file.EstimatedLines)
		}
		if file.Size > 0 {
			tracker.SetFileSize(file.Path, file.Size)
		}
	}

	for _, file := range files {
		if ctxErr := ctx.Err(); ctxErr != nil {
			tracker.FailFile(file.Path, ctxErr.Error())
			return ctxErr
		}

		// Skip unchanged files (especially compressed archives).
		if shouldProcess, skipReason := rm.shouldProcessFile(file); !shouldProcess {
			logger.Infof("Skipping file %s: %s", file.Path, skipReason)
			tracker.CompleteFile(file.Path, 0)
			continue
		}

		fileCtx, cancel := rm.fileContext(ctx)
		tracker.StartFile(file.Path)
		indexErr := rm.indexFile(fileCtx, file, tracker)
		cancel()
		if indexErr != nil {
			tracker.FailFile(file.Path, indexErr.Error())
			return fmt.Errorf("failed to index file %s: %w", file.Path, indexErr)
		}

		tracker.CompleteFile(file.Path, file.ProcessedLines)

		if rm.persistence != nil {
			if markErr := rm.persistence.MarkFileAsIndexed(file.Path, file.DocumentCount, file.LastPosition); markErr != nil {
				// Best effort: the file itself was indexed; only bookkeeping failed.
				logger.Warnf("Failed to update persistence for %s: %v", file.Path, markErr)
			}
		}
	}

	return nil
}

// fileContext derives the context used to index a single file. A positive
// TimeoutPerFile bounds it; a zero or negative value means "no per-file
// timeout" instead of an already-expired context.
func (rm *RebuildManager) fileContext(ctx context.Context) (context.Context, context.CancelFunc) {
	if rm.config.TimeoutPerFile > 0 {
		return context.WithTimeout(ctx, rm.config.TimeoutPerFile)
	}
	return context.WithCancel(ctx)
}
  // shouldProcessFile decides whether file needs (re)indexing and explains why.
//
// A file is skipped only when persistence has a record whose modification
// time, size and position all match the file's current stat. Any other
// outcome (stat failure, no persistence, no record, changed file) returns
// true. The reason string is only logged by the caller when a file is skipped.
//
// NOTE(review): for .gz files the stored position is an offset into the
// decompressed stream while the size is the on-disk compressed size, so the
// "unchanged" test rarely matches for archives — confirm the intended units.
func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
	stat, statErr := os.Stat(file.Path)
	if statErr != nil {
		return true, fmt.Sprintf("cannot stat file (will process): %v", statErr)
	}

	fallbackReason := "no previous indexing record found for compressed file"
	if !file.IsCompressed {
		fallbackReason = "no previous indexing record found for active file"
	}

	if rm.persistence == nil {
		return true, fallbackReason
	}
	prev, lookupErr := rm.persistence.GetIncrementalInfo(file.Path)
	if lookupErr != nil {
		return true, fallbackReason
	}

	modTime := stat.ModTime().Unix()
	size := stat.Size()
	unchanged := prev.LastModified == modTime &&
		prev.LastSize == size &&
		prev.LastPosition == size

	if file.IsCompressed {
		if unchanged {
			return false, "compressed file already fully indexed and unchanged"
		}
		return true, fallbackReason
	}

	switch {
	case unchanged:
		return false, "active file unchanged since last indexing"
	case size < prev.LastSize:
		return true, "active file appears to have been rotated (size decreased)"
	case size > prev.LastSize || modTime > prev.LastModified:
		return true, "active file has new content"
	default:
		return true, fallbackReason
	}
}
  // LogGroupFile describes one file of a log group during a rebuild.
//
// Discovery fills Path, Size, ModTime, IsCompressed and EstimatedLines;
// indexFile fills ProcessedLines, DocumentCount and LastPosition.
type LogGroupFile struct {
	Path           string
	Size           int64 // size in bytes at discovery time
	ModTime        int64 // Unix timestamp of file modification time
	IsCompressed   bool
	EstimatedLines int64
	ProcessedLines int64  // lines read by the last indexFile call
	DocumentCount  uint64 // documents produced by the last indexFile call
	LastPosition   int64  // byte offset reached by the last indexFile call
}
  // discoverLogGroupFiles lists the files that belong to the same log group as
// logGroupPath: every regular entry in its directory whose main log path
// (getMainLogPathFromFile) equals that of logGroupPath, e.g. the active log
// and its rotated/compressed siblings. Entries are returned in lexical order
// with size, mtime, compression flag and, when it can be computed, a line
// estimate.
//
// Only the log's own directory is listed. Previously filepath.Walk descended
// into every subdirectory even though files there cannot share the group's
// main path, which was expensive under large trees such as /var/log.
// Entries that vanish or cannot be stat'ed are skipped; an unreadable
// directory is returned as an error.
func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGroupFile, error) {
	dir := filepath.Dir(logGroupPath)
	// Remove any rotation suffixes to get the base name
	mainPath := getMainLogPathFromFile(logGroupPath)

	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	ctx := context.Background()
	files := make([]*LogGroupFile, 0)
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}

		path := filepath.Join(dir, entry.Name())
		if getMainLogPathFromFile(path) != mainPath {
			continue
		}

		info, err := entry.Info()
		if err != nil {
			continue // removed or inaccessible since the listing
		}

		file := &LogGroupFile{
			Path:         path,
			Size:         info.Size(),
			ModTime:      info.ModTime().Unix(),
			IsCompressed: IsCompressedFile(path),
		}
		if lines, err := EstimateFileLines(ctx, path, info.Size(), file.IsCompressed); err == nil {
			file.EstimatedLines = lines
		}
		files = append(files, file)
	}

	return files, nil
}
  // indexFile indexes a single file with checkpoint/resume support.
//
// The starting byte offset comes from resolveResumePosition; a file that is
// already fully indexed and unchanged returns early without being opened.
// Every line read currently counts as one processed line and one document
// (parsing is not implemented here yet). Progress is reported to tracker per
// line, and a checkpoint is written to persistence every checkpointInterval
// lines and once more after the whole file has been read. On success the
// ProcessedLines, DocumentCount and LastPosition fields of file are updated
// for the caller. If ctx is cancelled mid-file, ctx.Err() is returned and no
// final checkpoint is written (the last periodic one remains).
//
// NOTE(review): the batch writer is only flushed when indexFile returns, but
// periodic checkpoints are saved earlier; once documents are really added to
// the batch, flush it before each checkpoint so a crash cannot skip documents
// that were never written.
func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
	const (
		checkpointInterval = int64(1000) // save a checkpoint every N lines
		maxLineSize        = 1 << 20     // bufio.Scanner's 64 KiB default would abort the group on one long line
	)

	batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
	defer batch.Flush()

	startPosition, resuming, alreadyIndexed, err := rm.resolveResumePosition(file)
	if err != nil {
		return err
	}
	if alreadyIndexed {
		return nil
	}

	reader, err := rm.openFileFromPosition(file.Path, startPosition)
	if err != nil {
		return fmt.Errorf("failed to open file %s from position %d: %w", file.Path, startPosition, err)
	}
	defer reader.Close()

	var (
		processedLines  int64
		documentCount   uint64
		currentPosition = startPosition
	)

	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
	// Advance the offset by exactly the bytes the scanner consumes. The previous
	// len(line)+1 drifted on "\r\n" terminators and on a final line with no
	// newline, which corrupted resume offsets.
	scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, splitErr := bufio.ScanLines(data, atEOF)
		currentPosition += int64(advance)
		return advance, token, splitErr
	})

	for scanner.Scan() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// Process the log line (scanner.Bytes(), without its terminator). This
		// would typically involve parsing the entry, building a search document
		// and adding it to batch.
		processedLines++
		documentCount++

		tracker.UpdateFileProgress(file.Path, processedLines)

		if processedLines%checkpointInterval == 0 {
			if saveErr := rm.saveIndexCheckpoint(file, currentPosition); saveErr != nil {
				logger.Warnf("Failed to save checkpoint for %s: %v", file.Path, saveErr)
			}
		}
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading file %s: %w", file.Path, err)
	}

	file.ProcessedLines = processedLines
	file.DocumentCount = documentCount
	file.LastPosition = currentPosition

	if saveErr := rm.saveIndexCheckpoint(file, currentPosition); saveErr != nil {
		logger.Warnf("Failed to save final checkpoint for %s: %v", file.Path, saveErr)
	}

	if resuming {
		logger.Infof("Completed resumed indexing for %s: %d lines, %d documents",
			file.Path, processedLines, documentCount)
	}
	return nil
}

// resolveResumePosition decides where indexing of file should start, based on
// its incremental record in persistence and its current stat:
//   - no persistence or no record: start at 0;
//   - record matches the file exactly (or the position already covers an
//     unchanged file): alreadyIndexed=true, and file's counters are zeroed
//     with LastPosition set to the current size;
//   - file shrank (truncated/rotated): start at 0. This is checked before
//     resuming, so a stale offset is never applied to replaced content;
//   - file grew past the recorded position: resume from that position;
//   - same position coverage but a different mtime: start at 0.
//
// Only a stat failure on an existing record is returned as an error.
//
// NOTE(review): for .gz files LastPosition is an offset in the decompressed
// stream while sizes are on-disk compressed sizes — confirm these comparisons
// are intended for archives.
func (rm *RebuildManager) resolveResumePosition(file *LogGroupFile) (startPosition int64, resuming, alreadyIndexed bool, err error) {
	if rm.persistence == nil {
		return 0, false, false, nil
	}
	info, lookupErr := rm.persistence.GetIncrementalInfo(file.Path)
	if lookupErr != nil {
		return 0, false, false, nil
	}

	fileInfo, statErr := os.Stat(file.Path)
	if statErr != nil {
		return 0, false, false, fmt.Errorf("failed to stat file %s: %w", file.Path, statErr)
	}
	currentModTime := fileInfo.ModTime().Unix()
	currentSize := fileInfo.Size()

	markSkipped := func() {
		file.ProcessedLines = 0 // No new lines processed
		file.DocumentCount = 0  // No new documents added
		file.LastPosition = currentSize
	}

	if info.LastIndexed > 0 &&
		info.LastModified == currentModTime &&
		info.LastSize == currentSize &&
		info.LastPosition == currentSize {
		logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)",
			file.Path, time.Unix(info.LastIndexed, 0))
		markSkipped()
		return 0, false, true, nil
	}

	switch {
	case currentSize < info.LastSize:
		logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start",
			file.Path, info.LastSize, currentSize)
	case info.LastPosition > 0 && info.LastPosition < currentSize:
		logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)",
			info.LastPosition, file.Path, info.LastSize, currentSize)
		return info.LastPosition, true, false, nil
	case info.LastPosition >= currentSize && currentSize > 0:
		if info.LastModified == currentModTime {
			logger.Infof("File %s already fully indexed and unchanged, skipping", file.Path)
			markSkipped()
			return 0, false, true, nil
		}
		logger.Infof("File %s has same size but different mod time, reindexing from start", file.Path)
	}
	return 0, false, false, nil
}

// saveIndexCheckpoint stores position as file's resume offset in persistence
// (a no-op without persistence). Size and mtime come from a single fresh stat,
// so the record describes one observation of the file. If the stat fails it
// falls back to the discovery-time size and the current time.
func (rm *RebuildManager) saveIndexCheckpoint(file *LogGroupFile, position int64) error {
	if rm.persistence == nil {
		return nil
	}

	size := file.Size
	modTime := time.Now().Unix()
	if fileInfo, err := os.Stat(file.Path); err == nil {
		size = fileInfo.Size()
		modTime = fileInfo.ModTime().Unix()
	}

	return rm.persistence.UpdateIncrementalInfo(file.Path, &LogFileInfo{
		Path:         file.Path,
		LastPosition: position,
		LastIndexed:  time.Now().Unix(),
		LastModified: modTime,
		LastSize:     size,
	})
}
  // openFileFromPosition opens filePath positioned at startPosition.
//
// Plain files are seeked directly. For ".gz" files startPosition is an offset
// in the decompressed stream: the data is decompressed and discarded up to that
// point, because gzip streams cannot seek. Running out of data while skipping
// is not an error (the reader is simply at EOF). The returned ReadCloser
// releases every resource it opened; on error nothing is left open.
func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition int64) (io.ReadCloser, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}

	if !strings.HasSuffix(filePath, ".gz") {
		if startPosition > 0 {
			if _, seekErr := f.Seek(startPosition, io.SeekStart); seekErr != nil {
				f.Close()
				return nil, fmt.Errorf("failed to seek to position %d: %w", startPosition, seekErr)
			}
		}
		return f, nil
	}

	zr, zErr := gzip.NewReader(f)
	if zErr != nil {
		f.Close()
		return nil, fmt.Errorf("failed to create gzip reader: %w", zErr)
	}

	if startPosition > 0 {
		if _, skipErr := io.CopyN(io.Discard, zr, startPosition); skipErr != nil && skipErr != io.EOF {
			zr.Close()
			f.Close()
			return nil, fmt.Errorf("failed to seek to position %d in gzip file: %w", startPosition, skipErr)
		}
	}

	return &gzipReaderCloser{gzReader: zr, file: f}, nil
}
  521. // gzipReaderCloser wraps gzip.Reader to close both the gzip reader and underlying file
  522. type gzipReaderCloser struct {
  523. gzReader *gzip.Reader
  524. file *os.File
  525. }
  526. func (g *gzipReaderCloser) Read(p []byte) (n int, err error) {
  527. return g.gzReader.Read(p)
  528. }
  529. func (g *gzipReaderCloser) Close() error {
  530. if err := g.gzReader.Close(); err != nil {
  531. g.file.Close() // Still close file even if gzip reader fails
  532. return err
  533. }
  534. return g.file.Close()
  535. }
  536. // getAllLogGroups returns all unique log groups
  537. func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
  538. if rm.persistence == nil {
  539. return []string{}, nil
  540. }
  541. indexes, err := rm.persistence.GetAllLogIndexes()
  542. if err != nil {
  543. return nil, err
  544. }
  545. // Use map to get unique main log paths
  546. groups := make(map[string]struct{})
  547. for _, idx := range indexes {
  548. groups[idx.MainLogPath] = struct{}{}
  549. }
  550. // Convert to slice
  551. result := make([]string, 0, len(groups))
  552. for group := range groups {
  553. result = append(result, group)
  554. }
  555. return result, nil
  556. }
  557. // deleteAllIndexes deletes all existing indexes
  558. func (rm *RebuildManager) deleteAllIndexes() error {
  559. // Get all shards
  560. shards := rm.shardManager.GetAllShards()
  561. // Delete each shard
  562. for i, shard := range shards {
  563. if shard != nil {
  564. if err := shard.Close(); err != nil {
  565. return fmt.Errorf("failed to close shard %d: %w", i, err)
  566. }
  567. }
  568. }
  569. // Recreate shards
  570. // This would typically be done by recreating the shard manager
  571. // For now, return nil as placeholder
  572. return nil
  573. }
  574. // deleteLogGroupIndex deletes index for a specific log group
  575. func (rm *RebuildManager) deleteLogGroupIndex(logGroupPath string) error {
  576. // In a real implementation, this would:
  577. // 1. Find all documents for this log group
  578. // 2. Delete them from the appropriate shards
  579. // For now, return nil as placeholder
  580. return nil
  581. }
  // resetAllPersistenceRecords calls Reset on every stored log index and saves
// it back, stopping at the first save failure. Without persistence it is a
// no-op.
func (rm *RebuildManager) resetAllPersistenceRecords() error {
	if rm.persistence == nil {
		return nil
	}

	all, lookupErr := rm.persistence.GetAllLogIndexes()
	if lookupErr != nil {
		return lookupErr
	}

	for i := range all {
		record := all[i]
		record.Reset()
		if saveErr := rm.persistence.SaveLogIndex(record); saveErr != nil {
			return fmt.Errorf("failed to reset index %s: %w", record.Path, saveErr)
		}
	}
	return nil
}
  // resetLogGroupPersistence calls Reset on every stored log index of one log
// group and saves it back, stopping at the first save failure. Without
// persistence it is a no-op.
func (rm *RebuildManager) resetLogGroupPersistence(logGroupPath string) error {
	if rm.persistence == nil {
		return nil
	}

	groupIndexes, lookupErr := rm.persistence.GetLogGroupIndexes(logGroupPath)
	if lookupErr != nil {
		return lookupErr
	}

	for i := range groupIndexes {
		record := groupIndexes[i]
		record.Reset()
		if saveErr := rm.persistence.SaveLogIndex(record); saveErr != nil {
			return fmt.Errorf("failed to reset index %s: %w", record.Path, saveErr)
		}
	}
	return nil
}
  616. // RebuildProgress tracks rebuild progress
  617. type RebuildProgress struct {
  618. TotalGroups int
  619. CompletedGroups int
  620. StartTime time.Time
  621. CompletedTime time.Time
  622. Duration time.Duration
  623. CurrentGroup string
  624. CurrentFile string
  625. Errors []error
  626. }
  // notifyRebuildProgress is the hook for publishing intermediate RebuildAll
// progress (e.g. to an event bus). It is currently a no-op placeholder.
func (rm *RebuildManager) notifyRebuildProgress(progress *RebuildProgress) {}

// notifyRebuildComplete is the hook for announcing a finished RebuildAll.
// It is currently a no-op placeholder.
func (rm *RebuildManager) notifyRebuildComplete(progress *RebuildProgress) {}

// notifySingleRebuildComplete is the hook for announcing a finished
// RebuildSingle and how long it took. It is currently a no-op placeholder.
func (rm *RebuildManager) notifySingleRebuildComplete(logGroupPath string, duration time.Duration) {}

// handleProgressNotification receives per-file progress from a group's
// tracker. It is currently a no-op placeholder.
func (rm *RebuildManager) handleProgressNotification(logGroupPath string, pn ProgressNotification) {}

// handleCompletionNotification receives completion events from a group's
// tracker. It is currently a no-op placeholder.
func (rm *RebuildManager) handleCompletionNotification(logGroupPath string, cn CompletionNotification) {}
  644. // IsRebuilding returns true if rebuild is in progress
  645. func (rm *RebuildManager) IsRebuilding() bool {
  646. return atomic.LoadInt32(&rm.rebuilding) == 1
  647. }
  // GetLastRebuildTime returns the recorded start time of the most recent
// rebuild (RebuildAll stores it when it starts), or the zero time if none.
func (rm *RebuildManager) GetLastRebuildTime() time.Time {
	rm.mu.RLock()
	last := rm.lastRebuildTime
	rm.mu.RUnlock()
	return last
}
  654. // RebuildStats GetRebuildStats returns statistics about rebuild operations
  655. type RebuildStats struct {
  656. IsRebuilding bool `json:"is_rebuilding"`
  657. LastRebuildTime time.Time `json:"last_rebuild_time"`
  658. Config *RebuildConfig `json:"config"`
  659. }
  660. func (rm *RebuildManager) GetRebuildStats() *RebuildStats {
  661. rm.mu.RLock()
  662. defer rm.mu.RUnlock()
  663. return &RebuildStats{
  664. IsRebuilding: rm.IsRebuilding(),
  665. LastRebuildTime: rm.lastRebuildTime,
  666. Config: rm.config,
  667. }
  668. }