123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- package indexer
- import (
- "context"
- "fmt"
- "os"
- "path/filepath"
- "sync"
- "sync/atomic"
- "time"
- )
// RebuildManager handles index rebuilding operations. It coordinates the
// indexer, persistence layer, progress reporting, and shard management, and
// ensures at most one rebuild runs at a time via the rebuilding flag.
type RebuildManager struct {
	indexer         *ParallelIndexer    // performs the actual document indexing
	persistence     *PersistenceManager // per-file indexing state; may be nil (checked before use)
	progressManager *ProgressManager    // issues per-log-group progress trackers
	shardManager    ShardManager        // owns the underlying index shards
	config          *RebuildConfig      // rebuild tuning; never nil after NewRebuildManager
	rebuilding      int32               // atomic flag: 1 while a rebuild is in progress
	lastRebuildTime time.Time           // start time of the most recent rebuild; guarded by mu
	mu              sync.RWMutex        // guards lastRebuildTime and shared rebuild bookkeeping
}
// RebuildConfig contains configuration for rebuild operations.
type RebuildConfig struct {
	// BatchSize is the number of documents buffered per batch write.
	BatchSize int `json:"batch_size"`
	// MaxConcurrency bounds how many log groups are rebuilt in parallel.
	MaxConcurrency int `json:"max_concurrency"`
	// DeleteBeforeRebuild drops existing indexes before rebuilding.
	DeleteBeforeRebuild bool `json:"delete_before_rebuild"`
	// ProgressInterval is the interval between progress notifications.
	ProgressInterval time.Duration `json:"progress_interval"`
	// TimeoutPerFile caps how long indexing a single file may take.
	TimeoutPerFile time.Duration `json:"timeout_per_file"`
}
- // DefaultRebuildConfig returns default rebuild configuration
- func DefaultRebuildConfig() *RebuildConfig {
- return &RebuildConfig{
- BatchSize: 1000,
- MaxConcurrency: 4,
- DeleteBeforeRebuild: true,
- ProgressInterval: 5 * time.Second,
- TimeoutPerFile: 30 * time.Minute,
- }
- }
- // NewRebuildManager creates a new rebuild manager
- func NewRebuildManager(indexer *ParallelIndexer, persistence *PersistenceManager, progressManager *ProgressManager, shardManager ShardManager, config *RebuildConfig) *RebuildManager {
- if config == nil {
- config = DefaultRebuildConfig()
- }
- return &RebuildManager{
- indexer: indexer,
- persistence: persistence,
- progressManager: progressManager,
- shardManager: shardManager,
- config: config,
- }
- }
- // RebuildAll rebuilds all indexes from scratch
- func (rm *RebuildManager) RebuildAll(ctx context.Context) error {
- // Check if already rebuilding
- if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
- return fmt.Errorf("rebuild already in progress")
- }
- defer atomic.StoreInt32(&rm.rebuilding, 0)
- startTime := time.Now()
- rm.mu.Lock()
- rm.lastRebuildTime = startTime
- rm.mu.Unlock()
- // Get all log groups to rebuild
- logGroups, err := rm.getAllLogGroups()
- if err != nil {
- return fmt.Errorf("failed to get log groups: %w", err)
- }
- if len(logGroups) == 0 {
- return fmt.Errorf("no log groups found to rebuild")
- }
- // Delete existing indexes if configured
- if rm.config.DeleteBeforeRebuild {
- if err := rm.deleteAllIndexes(); err != nil {
- return fmt.Errorf("failed to delete existing indexes: %w", err)
- }
- }
- // Reset persistence records
- if rm.persistence != nil {
- if err := rm.resetAllPersistenceRecords(); err != nil {
- return fmt.Errorf("failed to reset persistence records: %w", err)
- }
- }
- // Create progress tracker for overall rebuild
- rebuildProgress := &RebuildProgress{
- TotalGroups: len(logGroups),
- CompletedGroups: 0,
- StartTime: startTime,
- }
- // Process each log group
- errors := make([]error, 0)
- var wg sync.WaitGroup
- semaphore := make(chan struct{}, rm.config.MaxConcurrency)
- for _, logGroup := range logGroups {
- wg.Add(1)
- go func(group string) {
- defer wg.Done()
- // Acquire semaphore
- semaphore <- struct{}{}
- defer func() { <-semaphore }()
- // Check context
- if ctx.Err() != nil {
- return
- }
- // Rebuild this log group
- if err := rm.rebuildLogGroup(ctx, group); err != nil {
- rm.mu.Lock()
- errors = append(errors, fmt.Errorf("failed to rebuild group %s: %w", group, err))
- rm.mu.Unlock()
- } else {
- // Update progress
- rm.mu.Lock()
- rebuildProgress.CompletedGroups++
- rm.mu.Unlock()
- // Notify progress
- rm.notifyRebuildProgress(rebuildProgress)
- }
- }(logGroup)
- }
- // Wait for all groups to complete
- wg.Wait()
- // Check for errors
- if len(errors) > 0 {
- return fmt.Errorf("rebuild completed with %d errors: %v", len(errors), errors)
- }
- rebuildProgress.CompletedTime = time.Now()
- rebuildProgress.Duration = time.Since(startTime)
- // Notify completion
- rm.notifyRebuildComplete(rebuildProgress)
- return nil
- }
- // RebuildSingle rebuilds index for a single log group
- func (rm *RebuildManager) RebuildSingle(ctx context.Context, logGroupPath string) error {
- // Check if already rebuilding
- if !atomic.CompareAndSwapInt32(&rm.rebuilding, 0, 1) {
- return fmt.Errorf("rebuild already in progress")
- }
- defer atomic.StoreInt32(&rm.rebuilding, 0)
- startTime := time.Now()
- // Delete existing index for this log group if configured
- if rm.config.DeleteBeforeRebuild {
- if err := rm.deleteLogGroupIndex(logGroupPath); err != nil {
- return fmt.Errorf("failed to delete existing index: %w", err)
- }
- }
- // Reset persistence records for this group
- if rm.persistence != nil {
- if err := rm.resetLogGroupPersistence(logGroupPath); err != nil {
- return fmt.Errorf("failed to reset persistence: %w", err)
- }
- }
- // Rebuild the log group
- if err := rm.rebuildLogGroup(ctx, logGroupPath); err != nil {
- return fmt.Errorf("failed to rebuild log group: %w", err)
- }
- duration := time.Since(startTime)
- // Notify completion
- rm.notifySingleRebuildComplete(logGroupPath, duration)
- return nil
- }
- // rebuildLogGroup rebuilds index for a single log group
- func (rm *RebuildManager) rebuildLogGroup(ctx context.Context, logGroupPath string) error {
- // Get all files for this log group
- files, err := rm.discoverLogGroupFiles(logGroupPath)
- if err != nil {
- return fmt.Errorf("failed to discover files: %w", err)
- }
- if len(files) == 0 {
- return fmt.Errorf("no files found for log group %s", logGroupPath)
- }
- // Create progress tracker for this log group
- progressConfig := &ProgressConfig{
- OnProgress: func(pn ProgressNotification) {
- // Handle progress notifications
- rm.handleProgressNotification(logGroupPath, pn)
- },
- OnCompletion: func(cn CompletionNotification) {
- // Handle completion notifications
- rm.handleCompletionNotification(logGroupPath, cn)
- },
- }
- tracker := rm.progressManager.GetTracker(logGroupPath, progressConfig)
- // Add all files to tracker
- for _, file := range files {
- tracker.AddFile(file.Path, file.IsCompressed)
- if file.EstimatedLines > 0 {
- tracker.SetFileEstimate(file.Path, file.EstimatedLines)
- }
- if file.Size > 0 {
- tracker.SetFileSize(file.Path, file.Size)
- }
- }
- // Process each file
- for _, file := range files {
- // Check context
- if ctx.Err() != nil {
- tracker.FailFile(file.Path, ctx.Err().Error())
- return ctx.Err()
- }
- // Create file-specific context with timeout
- fileCtx, cancel := context.WithTimeout(ctx, rm.config.TimeoutPerFile)
- // Start processing
- tracker.StartFile(file.Path)
- // Index the file
- err := rm.indexFile(fileCtx, file, tracker)
- cancel()
- if err != nil {
- tracker.FailFile(file.Path, err.Error())
- return fmt.Errorf("failed to index file %s: %w", file.Path, err)
- }
- // Mark as completed
- tracker.CompleteFile(file.Path, file.ProcessedLines)
- // Update persistence
- if rm.persistence != nil {
- if err := rm.persistence.MarkFileAsIndexed(file.Path, file.DocumentCount, file.LastPosition); err != nil {
- // Log but don't fail
- // logger.Warnf("Failed to update persistence for %s: %v", file.Path, err)
- }
- }
- }
- return nil
- }
// LogGroupFile represents a single on-disk file belonging to a log group
// (the main log or one of its rotations).
type LogGroupFile struct {
	Path           string // absolute or walk-relative path of the file
	Size           int64  // file size in bytes at discovery time
	IsCompressed   bool   // true if the file is a compressed rotation (per IsCompressedFile)
	EstimatedLines int64  // pre-index line estimate; 0 if estimation failed
	ProcessedLines int64  // lines actually processed during indexing
	DocumentCount  uint64 // documents produced for this file
	LastPosition   int64  // byte offset reached in the file (for persistence)
}
- // discoverLogGroupFiles discovers all files for a log group
- func (rm *RebuildManager) discoverLogGroupFiles(logGroupPath string) ([]*LogGroupFile, error) {
- dir := filepath.Dir(logGroupPath)
- // Remove any rotation suffixes to get the base name
- mainPath := getMainLogPathFromFile(logGroupPath)
- files := make([]*LogGroupFile, 0)
- // Walk the directory to find related files
- err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return nil // Skip files we can't access
- }
- // Skip directories
- if info.IsDir() {
- return nil
- }
- // Check if this file belongs to the log group
- if getMainLogPathFromFile(path) == mainPath {
- file := &LogGroupFile{
- Path: path,
- Size: info.Size(),
- IsCompressed: IsCompressedFile(path),
- }
- // Estimate lines
- ctx := context.Background()
- if lines, err := EstimateFileLines(ctx, path, info.Size(), file.IsCompressed); err == nil {
- file.EstimatedLines = lines
- }
- files = append(files, file)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return files, nil
- }
- // indexFile indexes a single file
- func (rm *RebuildManager) indexFile(ctx context.Context, file *LogGroupFile, tracker *ProgressTracker) error {
- // Create a batch writer
- batch := NewBatchWriter(rm.indexer, rm.config.BatchSize)
- defer batch.Flush()
- // Open and process the file
- // This is simplified - in real implementation, you would:
- // 1. Open the file (handling compression)
- // 2. Parse log lines
- // 3. Create documents
- // 4. Add to batch
- // 5. Update progress
- // For now, return a placeholder implementation
- file.ProcessedLines = file.EstimatedLines
- file.DocumentCount = uint64(file.EstimatedLines)
- file.LastPosition = file.Size
- // Update progress periodically
- tracker.UpdateFileProgress(file.Path, file.ProcessedLines)
- return nil
- }
- // getAllLogGroups returns all unique log groups
- func (rm *RebuildManager) getAllLogGroups() ([]string, error) {
- if rm.persistence == nil {
- return []string{}, nil
- }
- indexes, err := rm.persistence.GetAllLogIndexes()
- if err != nil {
- return nil, err
- }
- // Use map to get unique main log paths
- groups := make(map[string]struct{})
- for _, idx := range indexes {
- groups[idx.MainLogPath] = struct{}{}
- }
- // Convert to slice
- result := make([]string, 0, len(groups))
- for group := range groups {
- result = append(result, group)
- }
- return result, nil
- }
- // deleteAllIndexes deletes all existing indexes
- func (rm *RebuildManager) deleteAllIndexes() error {
- // Get all shards
- shards := rm.shardManager.GetAllShards()
- // Delete each shard
- for i, shard := range shards {
- if shard != nil {
- if err := shard.Close(); err != nil {
- return fmt.Errorf("failed to close shard %d: %w", i, err)
- }
- }
- }
- // Recreate shards
- // This would typically be done by recreating the shard manager
- // For now, return nil as placeholder
- return nil
- }
- // deleteLogGroupIndex deletes index for a specific log group
- func (rm *RebuildManager) deleteLogGroupIndex(logGroupPath string) error {
- // In a real implementation, this would:
- // 1. Find all documents for this log group
- // 2. Delete them from the appropriate shards
- // For now, return nil as placeholder
- return nil
- }
- // resetAllPersistenceRecords resets all persistence records
- func (rm *RebuildManager) resetAllPersistenceRecords() error {
- if rm.persistence == nil {
- return nil
- }
- indexes, err := rm.persistence.GetAllLogIndexes()
- if err != nil {
- return err
- }
- for _, idx := range indexes {
- idx.Reset()
- if err := rm.persistence.SaveLogIndex(idx); err != nil {
- return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
- }
- }
- return nil
- }
- // resetLogGroupPersistence resets persistence for a log group
- func (rm *RebuildManager) resetLogGroupPersistence(logGroupPath string) error {
- if rm.persistence == nil {
- return nil
- }
- indexes, err := rm.persistence.GetLogGroupIndexes(logGroupPath)
- if err != nil {
- return err
- }
- for _, idx := range indexes {
- idx.Reset()
- if err := rm.persistence.SaveLogIndex(idx); err != nil {
- return fmt.Errorf("failed to reset index %s: %w", idx.Path, err)
- }
- }
- return nil
- }
// RebuildProgress tracks the overall progress of a rebuild run across
// all log groups.
type RebuildProgress struct {
	TotalGroups     int           // number of log groups to rebuild
	CompletedGroups int           // groups finished so far
	StartTime       time.Time     // when the rebuild started
	CompletedTime   time.Time     // when the rebuild finished (zero while running)
	Duration        time.Duration // total elapsed time, set on completion
	CurrentGroup    string        // group currently being processed, if any
	CurrentFile     string        // file currently being processed, if any
	Errors          []error       // accumulated per-group errors
}
// notification methods
//
// NOTE(review): these are all stubs. They exist as the single seam
// through which rebuild events would be published (e.g. to an event
// bus) once an event mechanism is wired in.

// notifyRebuildProgress emits a progress event for an in-flight full rebuild.
func (rm *RebuildManager) notifyRebuildProgress(progress *RebuildProgress) {
	// Emit progress event
	// This would typically publish to an event bus
}

// notifyRebuildComplete emits a completion event for a full rebuild.
func (rm *RebuildManager) notifyRebuildComplete(progress *RebuildProgress) {
	// Emit completion event
}

// notifySingleRebuildComplete emits a completion event for a single-group rebuild.
func (rm *RebuildManager) notifySingleRebuildComplete(logGroupPath string, duration time.Duration) {
	// Emit single rebuild completion event
}

// handleProgressNotification receives per-file progress callbacks from a tracker.
func (rm *RebuildManager) handleProgressNotification(logGroupPath string, pn ProgressNotification) {
	// Handle progress notification from tracker
}

// handleCompletionNotification receives completion callbacks from a tracker.
func (rm *RebuildManager) handleCompletionNotification(logGroupPath string, cn CompletionNotification) {
	// Handle completion notification from tracker
}
- // IsRebuilding returns true if rebuild is in progress
- func (rm *RebuildManager) IsRebuilding() bool {
- return atomic.LoadInt32(&rm.rebuilding) == 1
- }
- // GetLastRebuildTime returns the time of the last rebuild
- func (rm *RebuildManager) GetLastRebuildTime() time.Time {
- rm.mu.RLock()
- defer rm.mu.RUnlock()
- return rm.lastRebuildTime
- }
// RebuildStats describes the current rebuild state as returned by
// GetRebuildStats.
type RebuildStats struct {
	IsRebuilding    bool           `json:"is_rebuilding"`     // true while a rebuild is running
	LastRebuildTime time.Time      `json:"last_rebuild_time"` // start time of the most recent rebuild
	Config          *RebuildConfig `json:"config"`            // active rebuild configuration
}
- func (rm *RebuildManager) GetRebuildStats() *RebuildStats {
- rm.mu.RLock()
- defer rm.mu.RUnlock()
- return &RebuildStats{
- IsRebuilding: rm.IsRebuilding(),
- LastRebuildTime: rm.lastRebuildTime,
- Config: rm.config,
- }
- }
|