rebuild.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. package indexer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // RebuildManager handles index rebuilding operations
  12. type RebuildManager struct {
  13. indexer *ParallelIndexer
  14. persistence *PersistenceManager
  15. progressManager *ProgressManager
  16. shardManager ShardManager
  17. config *RebuildConfig
  18. rebuilding int32 // atomic flag
  19. lastRebuildTime time.Time
  20. mu sync.RWMutex
  21. }
  22. // RebuildConfig contains configuration for rebuild operations
  23. type RebuildConfig struct {
  24. BatchSize int `json:"batch_size"`
  25. MaxConcurrency int `json:"max_concurrency"`
  26. DeleteBeforeRebuild bool `json:"delete_before_rebuild"`
  27. ProgressInterval time.Duration `json:"progress_interval"`
  28. TimeoutPerFile time.Duration `json:"timeout_per_file"`
  29. }
  30. // DefaultRebuildConfig returns default rebuild configuration
  31. func DefaultRebuildConfig() *RebuildConfig {
  32. return &RebuildConfig{
  33. BatchSize: 1000,
  34. MaxConcurrency: 4,
  35. DeleteBeforeRebuild: true,
  36. ProgressInterval: 5 * time.Second,
  37. TimeoutPerFile: 30 * time.Minute,
  38. }
  39. }
  40. // NewRebuildManager creates a new rebuild manager
  41. func NewRebuildManager(indexer *ParallelIndexer, persistence *PersistenceManager, progressManager *ProgressManager, shardManager ShardManager, config *RebuildConfig) *RebuildManager {
  42. if config == nil {
  43. config = DefaultRebuildConfig()
  44. }
  45. return &RebuildManager{
  46. indexer: indexer,
  47. persistence: persistence,
  48. progressManager: progressManager,
  49. shardManager: shardManager,
  50. config: config,
  51. }
  52. }
  53. // RebuildAll rebuilds all indexes from scratch
  54. func (rm *RebuildManager) RebuildAll(ctx context.Context) error {
  55. // Check if already rebuilding
  56. if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
  57. return fmt.Errorf("rebuild already in progress")
  58. }
  59. defer atomic.StoreInt32(&rm.rebuilding, 0)
  60. startTime := time.Now()
  61. rm.mu.Lock()
  62. rm.lastRebuildTime = startTime
  63. rm.mu.Unlock()
  64. // Get all log groups to rebuild
  65. logGroups, err := rm.getAllLogGroups()
  66. if err != nil {
  67. return fmt.Errorf("failed to get log groups: %w", err)
  68. }
  69. if len(logGroups) == 0 {
  70. return fmt.Errorf("no log groups found to rebuild")
  71. }
  72. // Delete existing indexes if configured
  73. if rm.config.DeleteBeforeRebuild {
  74. if err := rm.deleteAllIndexes(); err != nil {
  75. return fmt.Errorf("failed to delete existing indexes: %w", err)
  76. }
  77. }
  78. // Reset persistence records
  79. if rm.persistence != nil {
  80. if err := rm.resetAllPersistenceRecords(); err != nil {
  81. return fmt.Errorf("failed to reset persistence records: %w", err)
  82. }
  83. }
  84. // Create progress tracker for overall rebuild
  85. rebuildProgress := &RebuildProgress{
  86. TotalGroups: len(logGroups),
  87. CompletedGroups: 0,
  88. StartTime: startTime,
  89. }
  90. // Process each log group
  91. errors := make([]error, 0)
  92. var wg sync.WaitGroup
  93. semaphore := make(chan struct{}, rm.config.MaxConcurrency)
  94. for _, logGroup := range logGroups {
  95. wg.Add(1)
  96. go func(group string) {
  97. defer wg.Done()
  98. // Acquire semaphore
  99. semaphore <- struct{}{}
  100. defer func() { <-semaphore }()
  101. // Check context
  102. if ctx.Err() != nil {
  103. return
  104. }
  105. // Rebuild this log group
  106. if err := rm.rebuildLogGroup(ctx, group); err != nil {
  107. rm.mu.Lock()
  108. errors = append(errors, fmt.Errorf("failed to rebuild group %s: %w", group, err))
  109. rm.mu.Unlock()
  110. } else {
  111. // Update progress
  112. rm.mu.Lock()
  113. rebuildProgress.CompletedGroups++
  114. rm.mu.Unlock()
  115. // Notify progress
  116. rm.notifyRebuildProgress(rebuildProgress)
  117. }
  118. }(logGroup)
  119. }
  120. // Wait for all groups to complete
  121. wg.Wait()
  122. // Check for errors
  123. if len(errors) > 0 {
  124. return fmt.Errorf("rebuild completed with %d errors: %v", len(errors), errors)
  125. }
  126. rebuildProgress.CompletedTime = time.Now()
  127. rebuildProgress.Duration = time.Since(startTime)
  128. // Notify completion
  129. rm.notifyRebuildComplete(rebuildProgress)
  130. return nil
  131. }
  132. // RebuildSingle rebuilds index for a single log group
  133. func (rm *RebuildManager) RebuildSingle(ctx context.Context, logGroupPath string) error {
  134. // Check if already rebuilding
  135. if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
  136. return fmt.Errorf("rebuild already in progress")
  137. }
  138. defer atomic.StoreInt32(&rm.rebuilding, 0)
  139. startTime := time.Now()
  140. // Delete existing index for this log group if configured
  141. if rm.config.DeleteBeforeRebuild {
  142. if err := rm.deleteLogGroupIndex(logGroupPath); err != nil {
  143. return fmt.Errorf("failed to delete existing index: %w", err)
  144. }
  145. }
  146. // Reset persistence records for this group
  147. if rm.persistence != nil {
  148. if err := rm.resetLogGroupPersistence(logGroupPath); err != nil {
  149. return fmt.Errorf("failed to reset persistence: %w", err)
  150. }
  151. }
  152. // Rebuild the log group
  153. if err := rm.rebuildLogGroup(ctx, logGroupPath); err != nil {
  154. return fmt.Errorf("failed to rebuild log group: %w", err)
  155. }
  156. duration := time.Since(startTime)
  157. // Notify completion
  158. rm.notifySingleRebuildComplete(logGroupPath, duration)
  159. return nil
  160. }
  161. // rebuildLogGroup rebuilds index for a single log group
  162. func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath string) error {
  163. // Get all files for this log group
  164. files, err := rm.discoverLogGroupFiles(logGroupPath)
  165. if err != nil {
  166. return fmt.Errorf("failed to discover files: %w", err)
  167. }
  168. if len(files) == 0 {
  169. return fmt.Errorf("no files found for log group %s", logGroupPath)
  170. }
  171. // Create progress tracker for this log group
  172. progressConfig := &ProgressConfig{
  173. OnProgress: func(pn ProgressNotification) {
  174. // Handle progress notifications
  175. rm.handleProgressNotification(logGroupPath, pn)
  176. },
  177. OnCompletion: func(cn CompletionNotification) {
  178. // Handle completion notifications
  179. rm.handleCompletionNotification(logGroupPath, cn)
  180. },
  181. }
  182. tracker := rm.progressManager.GetTracker(logGroupPath, progressConfig)
  183. // Add all files to tracker
  184. for _, file := range files {
  185. tracker.AddFile(file.Path, file.IsCompressed)
  186. if file.EstimatedLines > 0 {
  187. tracker.SetFileEstimate(file.Path, file.EstimatedLines)
  188. }
  189. if file.Size > 0 {
  190. tracker.SetFileSize(file.Path, file.Size)
  191. }
  192. }
  193. // Process each file
  194. for _, file := range files {
  195. // Check context
  196. if ctx.Err() != nil {
  197. tracker.FailFile(file.Path, ctx.Err().Error())
  198. return ctx.Err()
  199. }
  200. // Create file-specific context with timeout
  201. fileCtx, cancel := context.WithTimeout(ctx, rm.config.TimeoutPerFile)
  202. // Start processing
  203. tracker.StartFile(file.Path)
  204. // Index the file
  205. err := rm.indexFile(fileCtx, file, tracker)
  206. cancel()
  207. if err != nil {
  208. tracker.FailFile(file.Path, err.Error())
  209. return fmt.Errorf("failed to index file %s: %w", file.Path, err)
  210. }
  211. // Mark as completed
  212. tracker.CompleteFile(file.Path, file.ProcessedLines)
  213. // Update persistence
  214. if rm.persistence != nil {
  215. if err := rm.persistence.MarkFileAsIndexed(file.Path, file.DocumentCount, file.LastPosition); err != nil {
  216. // Log but don't fail
  217. // logger.Warnf("Failed to update persistence for %s: %v", file.Path, err)
  218. }
  219. }
  220. }
  221. return nil
  222. }
  223. // LogGroupFile represents a file in a log group
  224. type LogGroupFile struct {
  225. Path string
  226. Size int64
  227. IsCompressed bool
  228. EstimatedLines int64
  229. ProcessedLines int64
  230. DocumentCount uint64
  231. LastPosition int64
  232. }
  233. // discoverLogGroupFiles discovers all files for a log group
  234. func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGroupFile, error) {
  235. dir := filepath.Dir(logGroupPath)
  236. // Remove any rotation suffixes to get the base name
  237. mainPath := getMainLogPathFromFile(logGroupPath)
  238. files := make([]*LogGroupFile, 0)
  239. // Walk the directory to find related files
  240. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  241. if err != nil {
  242. return nil // Skip files we can't access
  243. }
  244. // Skip directories
  245. if info.IsDir() {
  246. return nil
  247. }
  248. // Check if this file belongs to the log group
  249. if getMainLogPathFromFile(path) == mainPath {
  250. file := &LogGroupFile{
  251. Path: path,
  252. Size: info.Size(),
  253. IsCompressed: IsCompressedFile(path),
  254. }
  255. // Estimate lines
  256. ctx := context.Background()
  257. if lines, err := EstimateFileLines(ctx, path, info.Size(), file.IsCompressed); err == nil {
  258. file.EstimatedLines = lines
  259. }
  260. files = append(files, file)
  261. }
  262. return nil
  263. })
  264. if err != nil {
  265. return nil, err
  266. }
  267. return files, nil
  268. }
  269. // indexFile indexes a single file
  270. func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
  271. // Create a batch writer
  272. batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
  273. defer batch.Flush()
  274. // Open and process the file
  275. // This is simplified - in real implementation, you would:
  276. // 1. Open the file (handling compression)
  277. // 2. Parse log lines
  278. // 3. Create documents
  279. // 4. Add to batch
  280. // 5. Update progress
  281. // For now, return a placeholder implementation
  282. file.ProcessedLines = file.EstimatedLines
  283. file.DocumentCount = uint64(file.EstimatedLines)
  284. file.LastPosition = file.Size
  285. // Update progress periodically
  286. tracker.UpdateFileProgress(file.Path, file.ProcessedLines)
  287. return nil
  288. }
  289. // getAllLogGroups returns all unique log groups
  290. func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
  291. if rm.persistence == nil {
  292. return []string{}, nil
  293. }
  294. indexes, err := rm.persistence.GetAllLogIndexes()
  295. if err != nil {
  296. return nil, err
  297. }
  298. // Use map to get unique main log paths
  299. groups := make(map[string]struct{})
  300. for _, idx := range indexes {
  301. groups[idx.MainLogPath] = struct{}{}
  302. }
  303. // Convert to slice
  304. result := make([]string, 0, len(groups))
  305. for group := range groups {
  306. result = append(result, group)
  307. }
  308. return result, nil
  309. }
  310. // deleteAllIndexes deletes all existing indexes
  311. func (rm *RebuildManager) deleteAllIndexes() error {
  312. // Get all shards
  313. shards := rm.shardManager.GetAllShards()
  314. // Delete each shard
  315. for i, shard := range shards {
  316. if shard != nil {
  317. if err := shard.Close(); err != nil {
  318. return fmt.Errorf("failed to close shard %d: %w", i, err)
  319. }
  320. }
  321. }
  322. // Recreate shards
  323. // This would typically be done by recreating the shard manager
  324. // For now, return nil as placeholder
  325. return nil
  326. }
  327. // deleteLogGroupIndex deletes index for a specific log group
  328. func (rm *RebuildManager) deleteLogGroupIndex(logGroupPath string) error {
  329. // In a real implementation, this would:
  330. // 1. Find all documents for this log group
  331. // 2. Delete them from the appropriate shards
  332. // For now, return nil as placeholder
  333. return nil
  334. }
  335. // resetAllPersistenceRecords resets all persistence records
  336. func (rm *RebuildManager) resetAllPersistenceRecords() error {
  337. if rm.persistence == nil {
  338. return nil
  339. }
  340. indexes, err := rm.persistence.GetAllLogIndexes()
  341. if err != nil {
  342. return err
  343. }
  344. for _, idx := range indexes {
  345. idx.Reset()
  346. if err := rm.persistence.SaveLogIndex(idx); err != nil {
  347. return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
  348. }
  349. }
  350. return nil
  351. }
  352. // resetLogGroupPersistence resets persistence for a log group
  353. func (rm *RebuildManager) resetLogGroupPersistence(logGroupPath string) error {
  354. if rm.persistence == nil {
  355. return nil
  356. }
  357. indexes, err := rm.persistence.GetLogGroupIndexes(logGroupPath)
  358. if err != nil {
  359. return err
  360. }
  361. for _, idx := range indexes {
  362. idx.Reset()
  363. if err := rm.persistence.SaveLogIndex(idx); err != nil {
  364. return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
  365. }
  366. }
  367. return nil
  368. }
  369. // RebuildProgress tracks rebuild progress
  370. type RebuildProgress struct {
  371. TotalGroups int
  372. CompletedGroups int
  373. StartTime time.Time
  374. CompletedTime time.Time
  375. Duration time.Duration
  376. CurrentGroup string
  377. CurrentFile string
  378. Errors []error
  379. }
  380. // notification methods
  381. func (rm *RebuildManager) notifyRebuildProgress(progress *RebuildProgress) {
  382. // Emit progress event
  383. // This would typically publish to an event bus
  384. }
  385. func (rm *RebuildManager) notifyRebuildComplete(progress *RebuildProgress) {
  386. // Emit completion event
  387. }
  388. func (rm *RebuildManager) notifySingleRebuildComplete(logGroupPath string, duration time.Duration) {
  389. // Emit single rebuild completion event
  390. }
  391. func (rm *RebuildManager) handleProgressNotification(logGroupPath string, pn ProgressNotification) {
  392. // Handle progress notification from tracker
  393. }
  394. func (rm *RebuildManager) handleCompletionNotification(logGroupPath string, cn CompletionNotification) {
  395. // Handle completion notification from tracker
  396. }
  397. // IsRebuilding returns true if rebuild is in progress
  398. func (rm *RebuildManager) IsRebuilding() bool {
  399. return atomic.LoadInt32(&rm.rebuilding) == 1
  400. }
  401. // GetLastRebuildTime returns the time of the last rebuild
  402. func (rm *RebuildManager) GetLastRebuildTime() time.Time {
  403. rm.mu.RLock()
  404. defer rm.mu.RUnlock()
  405. return rm.lastRebuildTime
  406. }
  407. // RebuildStats GetRebuildStats returns statistics about rebuild operations
  408. type RebuildStats struct {
  409. IsRebuilding bool `json:"is_rebuilding"`
  410. LastRebuildTime time.Time `json:"last_rebuild_time"`
  411. Config *RebuildConfig `json:"config"`
  412. }
  413. func (rm *RebuildManager) GetRebuildStats() *RebuildStats {
  414. rm.mu.RLock()
  415. defer rm.mu.RUnlock()
  416. return &RebuildStats{
  417. IsRebuilding: rm.IsRebuilding(),
  418. LastRebuildTime: rm.lastRebuildTime,
  419. Config: rm.config,
  420. }
  421. }