rebuild.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. package indexer
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/uozi-tech/cosy/logger"
  15. )
  16. // RebuildManager handles index rebuilding operations
  17. type RebuildManager struct {
  18. indexer *ParallelIndexer
  19. persistence *PersistenceManager
  20. progressManager *ProgressManager
  21. shardManager ShardManager
  22. config *RebuildConfig
  23. rebuilding int32 // atomic flag
  24. lastRebuildTime time.Time
  25. mu sync.RWMutex
  26. }
  // RebuildConfig holds the tunables for rebuild operations.
  type RebuildConfig struct {
  	// BatchSize is handed to the batch writer created for each file.
  	BatchSize int `json:"batch_size"`

  	// MaxConcurrency sizes the semaphores that bound how many log groups,
  	// and how many files within a group, are processed at the same time.
  	MaxConcurrency int `json:"max_concurrency"`

  	// DeleteBeforeRebuild requests removal of existing indexes before a
  	// rebuild starts.
  	DeleteBeforeRebuild bool `json:"delete_before_rebuild"`

  	// ProgressInterval is the configured progress reporting interval.
  	// NOTE(review): nothing in this file reads it - confirm where it is used.
  	ProgressInterval time.Duration `json:"progress_interval"`

  	// TimeoutPerFile is the deadline applied to indexing a single file.
  	TimeoutPerFile time.Duration `json:"timeout_per_file"`
  }
  35. // DefaultRebuildConfig returns default rebuild configuration
  36. func DefaultRebuildConfig() *RebuildConfig {
  37. return &RebuildConfig{
  38. BatchSize: 1000,
  39. MaxConcurrency: 4,
  40. DeleteBeforeRebuild: true,
  41. ProgressInterval: 5 * time.Second,
  42. TimeoutPerFile: 30 * time.Minute,
  43. }
  44. }
  45. // NewRebuildManager creates a new rebuild manager
  46. func NewRebuildManager(indexer *ParallelIndexer, persistence *PersistenceManager, progressManager *ProgressManager, shardManager ShardManager, config *RebuildConfig) *RebuildManager {
  47. if config == nil {
  48. config = DefaultRebuildConfig()
  49. }
  50. return &RebuildManager{
  51. indexer: indexer,
  52. persistence: persistence,
  53. progressManager: progressManager,
  54. shardManager: shardManager,
  55. config: config,
  56. }
  57. }
  58. // RebuildAll rebuilds all indexes from scratch
  59. func (rm *RebuildManager) RebuildAll(ctx context.Context) error {
  60. // Check if already rebuilding
  61. if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
  62. return fmt.Errorf("rebuild already in progress")
  63. }
  64. defer atomic.StoreInt32(&rm.rebuilding, 0)
  65. startTime := time.Now()
  66. rm.mu.Lock()
  67. rm.lastRebuildTime = startTime
  68. rm.mu.Unlock()
  69. // Get all log groups to rebuild
  70. logGroups, err := rm.getAllLogGroups()
  71. if err != nil {
  72. return fmt.Errorf("failed to get log groups: %w", err)
  73. }
  74. if len(logGroups) == 0 {
  75. return fmt.Errorf("no log groups found to rebuild")
  76. }
  77. // Delete existing indexes if configured
  78. if rm.config.DeleteBeforeRebuild {
  79. if err := rm.deleteAllIndexes(); err != nil {
  80. return fmt.Errorf("failed to delete existing indexes: %w", err)
  81. }
  82. }
  83. // Reset persistence records
  84. if rm.persistence != nil {
  85. if err := rm.resetAllPersistenceRecords(); err != nil {
  86. return fmt.Errorf("failed to reset persistence records: %w", err)
  87. }
  88. }
  89. // Create progress tracker for overall rebuild
  90. rebuildProgress := &RebuildProgress{
  91. TotalGroups: len(logGroups),
  92. CompletedGroups: 0,
  93. StartTime: startTime,
  94. }
  95. // Process each log group
  96. errors := make([]error, 0)
  97. var wg sync.WaitGroup
  98. semaphore := make(chan struct{}, rm.config.MaxConcurrency)
  99. for _, logGroup := range logGroups {
  100. wg.Add(1)
  101. go func(group string) {
  102. defer wg.Done()
  103. // Acquire semaphore
  104. semaphore <- struct{}{}
  105. defer func() { <-semaphore }()
  106. // Check context
  107. if ctx.Err() != nil {
  108. return
  109. }
  110. // Rebuild this log group
  111. if err := rm.rebuildLogGroup(ctx, group); err != nil {
  112. rm.mu.Lock()
  113. errors = append(errors, fmt.Errorf("failed to rebuild group %s: %w", group, err))
  114. rm.mu.Unlock()
  115. } else {
  116. // Update progress
  117. rm.mu.Lock()
  118. rebuildProgress.CompletedGroups++
  119. rm.mu.Unlock()
  120. // Notify progress
  121. rm.notifyRebuildProgress(rebuildProgress)
  122. }
  123. }(logGroup)
  124. }
  125. // Wait for all groups to complete
  126. wg.Wait()
  127. // Check for errors
  128. if len(errors) > 0 {
  129. return fmt.Errorf("rebuild completed with %d errors: %v", len(errors), errors)
  130. }
  131. rebuildProgress.CompletedTime = time.Now()
  132. rebuildProgress.Duration = time.Since(startTime)
  133. // Notify completion
  134. rm.notifyRebuildComplete(rebuildProgress)
  135. return nil
  136. }
  137. // RebuildSingle rebuilds index for a single log group
  138. func (rm *RebuildManager) RebuildSingle(ctx context.Context, logGroupPath string) error {
  139. // Check if already rebuilding
  140. if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
  141. return fmt.Errorf("rebuild already in progress")
  142. }
  143. defer atomic.StoreInt32(&rm.rebuilding, 0)
  144. startTime := time.Now()
  145. // Delete existing index for this log group if configured
  146. if rm.config.DeleteBeforeRebuild {
  147. if err := rm.deleteLogGroupIndex(logGroupPath); err != nil {
  148. return fmt.Errorf("failed to delete existing index: %w", err)
  149. }
  150. }
  151. // Reset persistence records for this group
  152. if rm.persistence != nil {
  153. if err := rm.resetLogGroupPersistence(logGroupPath); err != nil {
  154. return fmt.Errorf("failed to reset persistence: %w", err)
  155. }
  156. }
  157. // Rebuild the log group
  158. if err := rm.rebuildLogGroup(ctx, logGroupPath); err != nil {
  159. return fmt.Errorf("failed to rebuild log group: %w", err)
  160. }
  161. duration := time.Since(startTime)
  162. // Notify completion
  163. rm.notifySingleRebuildComplete(logGroupPath, duration)
  164. return nil
  165. }
  166. // rebuildLogGroup rebuilds index for a single log group
  167. func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath string) error {
  168. // Get all files for this log group
  169. files, err := rm.discoverLogGroupFiles(logGroupPath)
  170. if err != nil {
  171. return fmt.Errorf("failed to discover files: %w", err)
  172. }
  173. if len(files) == 0 {
  174. return fmt.Errorf("no files found for log group %s", logGroupPath)
  175. }
  176. // Create progress tracker for this log group
  177. progressConfig := &ProgressConfig{
  178. OnProgress: func(pn ProgressNotification) {
  179. // Handle progress notifications
  180. rm.handleProgressNotification(logGroupPath, pn)
  181. },
  182. OnCompletion: func(cn CompletionNotification) {
  183. // Handle completion notifications
  184. rm.handleCompletionNotification(logGroupPath, cn)
  185. },
  186. }
  187. tracker := rm.progressManager.GetTracker(logGroupPath, progressConfig)
  188. // Add all files to tracker
  189. for _, file := range files {
  190. tracker.AddFile(file.Path, file.IsCompressed)
  191. if file.EstimatedLines > 0 {
  192. tracker.SetFileEstimate(file.Path, file.EstimatedLines)
  193. }
  194. if file.Size > 0 {
  195. tracker.SetFileSize(file.Path, file.Size)
  196. }
  197. }
  198. // Process files in parallel with controlled concurrency
  199. var fileWg sync.WaitGroup
  200. fileSemaphore := make(chan struct{}, rm.config.MaxConcurrency)
  201. var fileErrors []error
  202. var fileErrMu sync.Mutex
  203. for _, file := range files {
  204. // Check context before starting new file
  205. if ctx.Err() != nil {
  206. tracker.FailFile(file.Path, ctx.Err().Error())
  207. break
  208. }
  209. // Skip unchanged files (especially compressed archives)
  210. shouldProcess, skipReason := rm.shouldProcessFile(file)
  211. if !shouldProcess {
  212. logger.Infof("Skipping file %s: %s", file.Path, skipReason)
  213. // Mark as completed without processing
  214. tracker.CompleteFile(file.Path, 0)
  215. continue
  216. }
  217. fileWg.Add(1)
  218. go func(f *LogGroupFile) {
  219. defer fileWg.Done()
  220. // Acquire semaphore for controlled concurrency
  221. fileSemaphore <- struct{}{}
  222. defer func() { <-fileSemaphore }()
  223. // Check context again inside goroutine
  224. if ctx.Err() != nil {
  225. tracker.FailFile(f.Path, ctx.Err().Error())
  226. return
  227. }
  228. // Create file-specific context with timeout
  229. fileCtx, cancel := context.WithTimeout(ctx, rm.config.TimeoutPerFile)
  230. defer cancel()
  231. // Start processing
  232. tracker.StartFile(f.Path)
  233. // Index the file
  234. err := rm.indexFile(fileCtx, f, tracker)
  235. if err != nil {
  236. tracker.FailFile(f.Path, err.Error())
  237. fileErrMu.Lock()
  238. fileErrors = append(fileErrors, fmt.Errorf("failed to index file %s: %w", f.Path, err))
  239. fileErrMu.Unlock()
  240. return
  241. }
  242. // Mark as completed
  243. tracker.CompleteFile(f.Path, f.ProcessedLines)
  244. // Update persistence with exact doc count from Bleve
  245. if rm.persistence != nil {
  246. exactCount := f.DocumentCount
  247. if rm.indexer != nil && rm.indexer.IsHealthy() {
  248. if c, err := rm.indexer.CountDocsByFilePath(f.Path); err == nil {
  249. exactCount = c
  250. } else {
  251. logger.Warnf("Falling back to computed count for %s due to count error: %v", f.Path, err)
  252. }
  253. }
  254. if err := rm.persistence.MarkFileAsIndexed(f.Path, exactCount, f.LastPosition); err != nil {
  255. // Log but don't fail
  256. // logger.Warnf("Failed to update persistence for %s: %v", f.Path, err)
  257. }
  258. }
  259. }(file)
  260. }
  261. // Wait for all files to complete
  262. fileWg.Wait()
  263. // Check for file processing errors
  264. if len(fileErrors) > 0 {
  265. return fmt.Errorf("failed to index %d files in group %s: %v", len(fileErrors), logGroupPath, fileErrors[0])
  266. }
  267. return nil
  268. }
  269. // shouldProcessFile determines if a file needs to be processed based on change detection
  270. func (rm *RebuildManager) shouldProcessFile(file *LogGroupFile) (bool, string) {
  271. // Get file information
  272. fileInfo, err := os.Stat(file.Path)
  273. if err != nil {
  274. return true, fmt.Sprintf("cannot stat file (will process): %v", err)
  275. }
  276. // For compressed files (.gz), check if we've already processed them and they haven't changed
  277. if file.IsCompressed {
  278. // Check if we have persistence information for this file
  279. if rm.persistence != nil {
  280. if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
  281. // Check if file hasn't changed since last indexing
  282. currentModTime := fileInfo.ModTime().Unix()
  283. currentSize := fileInfo.Size()
  284. if info.LastModified == currentModTime &&
  285. info.LastSize == currentSize &&
  286. info.LastPosition == currentSize {
  287. return false, "compressed file already fully indexed and unchanged"
  288. }
  289. }
  290. }
  291. }
  292. // For active log files (non-compressed), always process but may resume from checkpoint
  293. if !file.IsCompressed {
  294. // Check if file has grown or changed
  295. if rm.persistence != nil {
  296. if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
  297. currentModTime := fileInfo.ModTime().Unix()
  298. currentSize := fileInfo.Size()
  299. // File hasn't changed at all
  300. if info.LastModified == currentModTime &&
  301. info.LastSize == currentSize &&
  302. info.LastPosition == currentSize {
  303. return false, "active file unchanged since last indexing"
  304. }
  305. // File has shrunk (possible log rotation)
  306. if currentSize < info.LastSize {
  307. return true, "active file appears to have been rotated (size decreased)"
  308. }
  309. // File has grown or been modified
  310. if currentSize > info.LastSize || currentModTime > info.LastModified {
  311. return true, "active file has new content"
  312. }
  313. }
  314. }
  315. // No persistence info available, process the file
  316. return true, "no previous indexing record found for active file"
  317. }
  318. // Default: process compressed files if no persistence info
  319. return true, "no previous indexing record found for compressed file"
  320. }
  // LogGroupFile describes one file that belongs to a log group, together with
  // the results of indexing it.
  type LogGroupFile struct {
  	// Filled in when the file is discovered.
  	Path           string
  	Size           int64 // size in bytes at discovery time
  	ModTime        int64 // Unix timestamp of file modification time
  	IsCompressed   bool
  	EstimatedLines int64 // stays 0 when no estimate could be produced

  	// Filled in by indexFile.
  	ProcessedLines int64  // lines read during the last indexing run
  	DocumentCount  uint64 // documents counted during the last indexing run
  	LastPosition   int64  // offset at which the last indexing run stopped
  }
  332. // discoverLogGroupFiles discovers all files for a log group
  333. func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGroupFile, error) {
  334. dir := filepath.Dir(logGroupPath)
  335. // Remove any rotation suffixes to get the base name
  336. mainPath := getMainLogPathFromFile(logGroupPath)
  337. files := make([]*LogGroupFile, 0)
  338. // Walk the directory to find related files
  339. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  340. if err != nil {
  341. return nil // Skip files we can't access
  342. }
  343. // Skip directories
  344. if info.IsDir() {
  345. return nil
  346. }
  347. // Check if this file belongs to the log group
  348. if getMainLogPathFromFile(path) == mainPath {
  349. file := &LogGroupFile{
  350. Path: path,
  351. Size: info.Size(),
  352. ModTime: info.ModTime().Unix(),
  353. IsCompressed: IsCompressedFile(path),
  354. }
  355. // Estimate lines
  356. ctx := context.Background()
  357. if lines, err := EstimateFileLines(ctx, path, info.Size(), file.IsCompressed); err == nil {
  358. file.EstimatedLines = lines
  359. }
  360. files = append(files, file)
  361. }
  362. return nil
  363. })
  364. if err != nil {
  365. return nil, err
  366. }
  367. return files, nil
  368. }
  369. // indexFile indexes a single file with checkpoint/resume support
  370. func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
  371. // Create a batch writer
  372. batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
  373. defer batch.Flush()
  374. // Get checkpoint information from persistence layer
  375. var startPosition int64 = 0
  376. var resuming bool = false
  377. if rm.persistence != nil {
  378. if info, err := rm.persistence.GetIncrementalInfo(file.Path); err == nil {
  379. // Get current file modification time
  380. fileInfo, err := os.Stat(file.Path)
  381. if err != nil {
  382. return fmt.Errorf("failed to stat file %s: %w", file.Path, err)
  383. }
  384. currentModTime := fileInfo.ModTime().Unix()
  385. currentSize := fileInfo.Size()
  386. // Check if file hasn't changed since last indexing
  387. if info.LastIndexed > 0 &&
  388. info.LastModified == currentModTime &&
  389. info.LastSize == currentSize &&
  390. info.LastPosition == currentSize {
  391. // File hasn't changed and was fully indexed
  392. logger.Infof("Skipping indexing for unchanged file %s (last indexed: %v)",
  393. file.Path, time.Unix(info.LastIndexed, 0))
  394. file.ProcessedLines = 0 // No new lines processed
  395. file.DocumentCount = 0 // No new documents added
  396. file.LastPosition = currentSize
  397. return nil
  398. }
  399. // Check if we should resume from a previous position
  400. if info.LastPosition > 0 && info.LastPosition < currentSize {
  401. // File has grown since last indexing
  402. startPosition = info.LastPosition
  403. resuming = true
  404. logger.Infof("Resuming indexing from position %d for file %s (file size: %d -> %d)",
  405. startPosition, file.Path, info.LastSize, currentSize)
  406. } else if currentSize < info.LastSize {
  407. // File has been truncated or rotated, start from beginning
  408. startPosition = 0
  409. logger.Infof("File %s has been truncated/rotated (size: %d -> %d), reindexing from start",
  410. file.Path, info.LastSize, currentSize)
  411. } else if info.LastPosition >= currentSize && currentSize > 0 {
  412. // File size hasn't changed and we've already processed it completely
  413. if info.LastModified == currentModTime {
  414. logger.Infof("File %s already fully indexed and unchanged, skipping", file.Path)
  415. file.ProcessedLines = 0
  416. file.DocumentCount = 0
  417. file.LastPosition = currentSize
  418. return nil
  419. }
  420. // File has same size but different modification time, reindex from start
  421. startPosition = 0
  422. logger.Infof("File %s has same size but different mod time, reindexing from start", file.Path)
  423. }
  424. }
  425. }
  426. // Open file with resume support
  427. reader, err := rm.openFileFromPosition(file.Path, startPosition)
  428. if err != nil {
  429. return fmt.Errorf("failed to open file %s from position %d: %w", file.Path, startPosition, err)
  430. }
  431. defer reader.Close()
  432. // Process file line by line with checkpointing
  433. var processedLines int64 = 0
  434. var currentPosition int64 = startPosition
  435. var documentCount uint64 = 0
  436. checkpointInterval := int64(1000) // Save checkpoint every 1000 lines
  437. scanner := bufio.NewScanner(reader)
  438. for scanner.Scan() {
  439. // Check context for cancellation
  440. select {
  441. case <-ctx.Done():
  442. return ctx.Err()
  443. default:
  444. }
  445. line := scanner.Text()
  446. currentPosition += int64(len(line)) + 1 // +1 for newline
  447. // Process the log line (parse and add to batch)
  448. // This would typically involve:
  449. // 1. Parse log entry using parser
  450. // 2. Create search document
  451. // 3. Add to batch
  452. processedLines++
  453. documentCount++
  454. // Update progress
  455. tracker.UpdateFileProgress(file.Path, processedLines)
  456. // Periodic checkpoint saving
  457. if processedLines%checkpointInterval == 0 {
  458. if rm.persistence != nil {
  459. // Get current file modification time for checkpoint
  460. fileInfo, err := os.Stat(file.Path)
  461. var modTime int64
  462. if err == nil {
  463. modTime = fileInfo.ModTime().Unix()
  464. } else {
  465. modTime = time.Now().Unix()
  466. }
  467. info := &LogFileInfo{
  468. Path: file.Path,
  469. LastPosition: currentPosition,
  470. LastIndexed: time.Now().Unix(),
  471. LastModified: modTime,
  472. LastSize: file.Size,
  473. }
  474. if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
  475. logger.Warnf("Failed to save checkpoint for %s: %v", file.Path, err)
  476. }
  477. }
  478. }
  479. }
  480. if err := scanner.Err(); err != nil {
  481. return fmt.Errorf("error reading file %s: %w", file.Path, err)
  482. }
  483. // Update file statistics
  484. file.ProcessedLines = processedLines
  485. file.DocumentCount = documentCount
  486. file.LastPosition = currentPosition
  487. // Save final checkpoint
  488. if rm.persistence != nil {
  489. // Get current file info for accurate metadata
  490. fileInfo, err := os.Stat(file.Path)
  491. var modTime int64
  492. if err == nil {
  493. modTime = fileInfo.ModTime().Unix()
  494. } else {
  495. modTime = time.Now().Unix()
  496. }
  497. info := &LogFileInfo{
  498. Path: file.Path,
  499. LastPosition: currentPosition,
  500. LastIndexed: time.Now().Unix(),
  501. LastModified: modTime,
  502. LastSize: file.Size,
  503. }
  504. if err := rm.persistence.UpdateIncrementalInfo(file.Path, info); err != nil {
  505. logger.Warnf("Failed to save final checkpoint for %s: %v", file.Path, err)
  506. }
  507. }
  508. if resuming {
  509. logger.Infof("Completed resumed indexing for %s: %d lines, %d documents",
  510. file.Path, processedLines, documentCount)
  511. }
  512. return nil
  513. }
  514. // openFileFromPosition opens a file and seeks to the specified position
  515. // Handles both compressed (.gz) and regular files
  516. func (rm *RebuildManager) openFileFromPosition(filePath string, startPosition int64) (io.ReadCloser, error) {
  517. file, err := os.Open(filePath)
  518. if err != nil {
  519. return nil, err
  520. }
  521. // Check if file is compressed
  522. isGzipped := strings.HasSuffix(filePath, ".gz")
  523. if isGzipped {
  524. // For gzip files, we need to read from the beginning and skip to position
  525. // This is because gzip doesn't support random seeking
  526. gzReader, err := gzip.NewReader(file)
  527. if err != nil {
  528. file.Close()
  529. return nil, fmt.Errorf("failed to create gzip reader: %w", err)
  530. }
  531. if startPosition > 0 {
  532. // Skip to the start position by reading and discarding bytes
  533. _, err := io.CopyN(io.Discard, gzReader, startPosition)
  534. if err != nil && err != io.EOF {
  535. gzReader.Close()
  536. file.Close()
  537. return nil, fmt.Errorf("failed to seek to position %d in gzip file: %w", startPosition, err)
  538. }
  539. }
  540. // Return a wrapped reader that closes both gzReader and file
  541. return &gzipReaderCloser{gzReader: gzReader, file: file}, nil
  542. } else {
  543. // For regular files, seek directly
  544. if startPosition > 0 {
  545. _, err := file.Seek(startPosition, io.SeekStart)
  546. if err != nil {
  547. file.Close()
  548. return nil, fmt.Errorf("failed to seek to position %d: %w", startPosition, err)
  549. }
  550. }
  551. return file, nil
  552. }
  553. }
  // gzipReaderCloser pairs a gzip.Reader with the file it decodes so that a
  // single Close call releases both.
  type gzipReaderCloser struct {
  	gzReader *gzip.Reader
  	file     *os.File
  }

  // Read returns decompressed bytes from the gzip stream.
  func (g *gzipReaderCloser) Read(p []byte) (n int, err error) {
  	n, err = g.gzReader.Read(p)
  	return n, err
  }

  // Close closes the gzip reader first and then the underlying file. The file
  // is closed even when the gzip reader reports an error; in that case the
  // gzip error is the one returned.
  func (g *gzipReaderCloser) Close() error {
  	gzErr := g.gzReader.Close()
  	fileErr := g.file.Close()
  	if gzErr != nil {
  		return gzErr
  	}
  	return fileErr
  }
  569. // getAllLogGroups returns all unique log groups
  570. func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
  571. if rm.persistence == nil {
  572. return []string{}, nil
  573. }
  574. indexes, err := rm.persistence.GetAllLogIndexes()
  575. if err != nil {
  576. return nil, err
  577. }
  578. // Use map to get unique main log paths
  579. groups := make(map[string]struct{})
  580. for _, idx := range indexes {
  581. groups[idx.MainLogPath] = struct{}{}
  582. }
  583. // Convert to slice
  584. result := make([]string, 0, len(groups))
  585. for group := range groups {
  586. result = append(result, group)
  587. }
  588. return result, nil
  589. }
  590. // deleteAllIndexes deletes all existing indexes
  591. func (rm *RebuildManager) deleteAllIndexes() error {
  592. // Get all shards
  593. shards := rm.shardManager.GetAllShards()
  594. // Delete each shard
  595. for i, shard := range shards {
  596. if shard != nil {
  597. if err := shard.Close(); err != nil {
  598. return fmt.Errorf("failed to close shard %d: %w", i, err)
  599. }
  600. }
  601. }
  602. // Recreate shards
  603. // This would typically be done by recreating the shard manager
  604. // For now, return nil as placeholder
  605. return nil
  606. }
  607. // deleteLogGroupIndex deletes index for a specific log group
  608. func (rm *RebuildManager) deleteLogGroupIndex(logGroupPath string) error {
  609. // In a real implementation, this would:
  610. // 1. Find all documents for this log group
  611. // 2. Delete them from the appropriate shards
  612. // For now, return nil as placeholder
  613. return nil
  614. }
  615. // resetAllPersistenceRecords resets all persistence records
  616. func (rm *RebuildManager) resetAllPersistenceRecords() error {
  617. if rm.persistence == nil {
  618. return nil
  619. }
  620. indexes, err := rm.persistence.GetAllLogIndexes()
  621. if err != nil {
  622. return err
  623. }
  624. for _, idx := range indexes {
  625. idx.Reset()
  626. if err := rm.persistence.SaveLogIndex(idx); err != nil {
  627. return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
  628. }
  629. }
  630. return nil
  631. }
  632. // resetLogGroupPersistence resets persistence for a log group
  633. func (rm *RebuildManager) resetLogGroupPersistence(logGroupPath string) error {
  634. if rm.persistence == nil {
  635. return nil
  636. }
  637. indexes, err := rm.persistence.GetLogGroupIndexes(logGroupPath)
  638. if err != nil {
  639. return err
  640. }
  641. for _, idx := range indexes {
  642. idx.Reset()
  643. if err := rm.persistence.SaveLogIndex(idx); err != nil {
  644. return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
  645. }
  646. }
  647. return nil
  648. }
  // RebuildProgress is the progress report of a full rebuild; it is what the
  // rebuild notification hooks receive.
  type RebuildProgress struct {
  	TotalGroups     int // number of log groups scheduled for the rebuild
  	CompletedGroups int // number of groups rebuilt successfully so far

  	StartTime     time.Time     // when the rebuild started
  	CompletedTime time.Time     // set once the rebuild has finished
  	Duration      time.Duration // total run time, set together with CompletedTime

  	// NOTE(review): the three fields below are never written in this file -
  	// confirm whether other code populates them.
  	CurrentGroup string
  	CurrentFile  string
  	Errors       []error
  }
  660. // notification methods
  661. func (rm *RebuildManager) notifyRebuildProgress(progress *RebuildProgress) {
  662. // Emit progress event
  663. // This would typically publish to an event bus
  664. }
  665. func (rm *RebuildManager) notifyRebuildComplete(progress *RebuildProgress) {
  666. // Emit completion event
  667. }
  668. func (rm *RebuildManager) notifySingleRebuildComplete(logGroupPath string, duration time.Duration) {
  669. // Emit single rebuild completion event
  670. }
  671. func (rm *RebuildManager) handleProgressNotification(logGroupPath string, pn ProgressNotification) {
  672. // Handle progress notification from tracker
  673. }
  674. func (rm *RebuildManager) handleCompletionNotification(logGroupPath string, cn CompletionNotification) {
  675. // Handle completion notification from tracker
  676. }
  677. // IsRebuilding returns true if rebuild is in progress
  678. func (rm *RebuildManager) IsRebuilding() bool {
  679. return atomic.LoadInt32(&rm.rebuilding) == 1
  680. }
  681. // GetLastRebuildTime returns the time of the last rebuild
  682. func (rm *RebuildManager) GetLastRebuildTime() time.Time {
  683. rm.mu.RLock()
  684. defer rm.mu.RUnlock()
  685. return rm.lastRebuildTime
  686. }
  687. // RebuildStats GetRebuildStats returns statistics about rebuild operations
  688. type RebuildStats struct {
  689. IsRebuilding bool `json:"is_rebuilding"`
  690. LastRebuildTime time.Time `json:"last_rebuild_time"`
  691. Config *RebuildConfig `json:"config"`
  692. }
  693. func (rm *RebuildManager) GetRebuildStats() *RebuildStats {
  694. rm.mu.RLock()
  695. defer rm.mu.RUnlock()
  696. return &RebuildStats{
  697. IsRebuilding: rm.IsRebuilding(),
  698. LastRebuildTime: rm.lastRebuildTime,
  699. Config: rm.config,
  700. }
  701. }