123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- package nginx_log
- import (
- "fmt"
- "os"
- "path/filepath"
- "sync"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/nginx"
- "github.com/blevesearch/bleve/v2"
- "github.com/uozi-tech/cosy/logger"
- )
- // RebuildIndex forces a complete rebuild of the index
- func (li *LogIndexer) RebuildIndex() error {
- logger.Infof("Starting index rebuild...")
- // Get all files that need to be marked as indexing
- var allLogPaths []string
- if li.persistence != nil {
- indexes, err := li.persistence.GetAllLogIndexes()
- if err != nil {
- logger.Warnf("Failed to get log indexes: %v", err)
- } else {
- for _, logIndex := range indexes {
- allLogPaths = append(allLogPaths, logIndex.Path)
- }
- }
- }
- // Mark all files as being indexed
- for _, path := range allLogPaths {
- SetIndexingStatus(path, true)
- }
- // Update global indexing status
- statusManager := GetIndexingStatusManager()
- statusManager.UpdateIndexingStatus()
- // Close current index
- if err := li.index.Close(); err != nil {
- logger.Warnf("Failed to close index: %v", err)
- }
- // Remove index directory
- if err := os.RemoveAll(li.indexPath); err != nil {
- // Clear indexing status on error
- for _, path := range allLogPaths {
- SetIndexingStatus(path, false)
- }
- statusManager := GetIndexingStatusManager()
- statusManager.UpdateIndexingStatus()
- return fmt.Errorf("failed to remove index directory: %w", err)
- }
- // Create new index
- mapping := createIndexMapping()
- index, err := bleve.New(li.indexPath, mapping)
- if err != nil {
- // Clear indexing status on error
- for _, path := range allLogPaths {
- SetIndexingStatus(path, false)
- }
- statusManager := GetIndexingStatusManager()
- statusManager.UpdateIndexingStatus()
- return fmt.Errorf("failed to create new index: %w", err)
- }
- li.index = index
- // Reset file tracking
- li.mu.Lock()
- for path := range li.logPaths {
- li.logPaths[path].LastModified = 0
- li.logPaths[path].LastSize = 0
- li.logPaths[path].LastIndexed = 0
- li.logPaths[path].TimeRange = nil // Clear in-memory time range
- }
- li.mu.Unlock()
- // Reset persistence data - clear all index position records
- if li.persistence != nil {
- // Get all log indexes
- indexes, err := li.persistence.GetAllLogIndexes()
- if err != nil {
- logger.Warnf("Failed to get log indexes for reset: %v", err)
- } else {
- // Reset each index record
- for _, logIndex := range indexes {
- logIndex.Reset() // Clear position data
- if err := li.persistence.SaveLogIndex(logIndex); err != nil {
- logger.Warnf("Failed to reset log index for %s: %v", logIndex.Path, err)
- }
- }
- logger.Infof("Reset %d persistence records", len(indexes))
- }
- }
- // Clear caches since all data will be reindexed
- li.cache.Clear()
- li.statsCache.Clear()
- // Clear all log group completion flags to allow new notifications
- li.logGroupCompletionSent.Range(func(key, value interface{}) bool {
- li.logGroupCompletionSent.Delete(key)
- return true
- })
- // --- Start of Synchronous Re-indexing Logic ---
- logger.Infof("Starting synchronous re-indexing of all discovered files")
- // 1. Discover all potential log files from the main access log path
- var discoveredFiles []string
- accessLogPath := nginx.GetAccessLogPath()
- if accessLogPath != "" && IsLogPathUnderWhiteList(accessLogPath) {
- logDir := filepath.Dir(accessLogPath)
- baseLogName := filepath.Base(accessLogPath)
- files, err := li.findRelatedLogFiles(logDir, baseLogName)
- if err != nil {
- logger.Errorf("Failed to discover log files, proceeding with tracked files only: %v", err)
- } else {
- discoveredFiles = files
- logger.Infof("Discovered %d log files to consider for re-indexing", len(discoveredFiles))
- }
- }
- // 2. Get all currently tracked files and discover all log groups
- li.mu.RLock()
- allLogGroups := make(map[string]struct{})
- for path := range li.logPaths {
- mainLogPath := li.getMainLogPath(path)
- allLogGroups[mainLogPath] = struct{}{}
- }
- li.mu.RUnlock()
- // Also add discovered files to the set of log groups
- for _, path := range discoveredFiles {
- mainLogPath := li.getMainLogPath(path)
- allLogGroups[mainLogPath] = struct{}{}
- }
- // 3. Create WaitGroup and dispatch tasks for each unique log group
- var wg sync.WaitGroup
- logger.Infof("Queueing re-index tasks for %d unique log groups", len(allLogGroups))
- for mainLogPath := range allLogGroups {
- // ForceReindexFileGroup will discover files, add to WaitGroup, and queue tasks internally
- if err := li.ForceReindexFileGroup(mainLogPath, &wg); err != nil {
- logger.Warnf("Failed to queue force reindex for log group %s: %v", mainLogPath, err)
- }
- }
- // 4. Wait for all indexing tasks to complete
- logger.Infof("Waiting for all re-indexing tasks for log groups to complete...")
- wg.Wait()
- // 5. Completion notifications are now handled automatically by ProgressTracker
- // when each log group finishes indexing - no need for manual notification
- logger.Infof("All log groups have completed indexing - notifications sent automatically")
- // 6. Finalize status
- statusManager.UpdateIndexingStatus()
- logger.Infof("Synchronous re-indexing completed for all queued files")
- logger.Infof("Index rebuild completed successfully")
- return nil
- }
- // DiscoverAndIndexFile discovers and indexes a single log file
- func (li *LogIndexer) DiscoverAndIndexFile(filePath string, lastModified time.Time, lastSize int64) error {
- logDir := filepath.Dir(filePath)
- baseLogName := filepath.Base(filePath)
- return li.DiscoverLogFiles(logDir, baseLogName)
- }
- // DeleteFileIndex removes all index entries for a specific file
- func (li *LogIndexer) DeleteFileIndex(filePath string) error {
- logger.Infof("Deleting index entries for file: %s", filePath)
- // Create query to find all entries for this file
- query := bleve.NewTermQuery(filePath)
- query.SetField("file_path")
- searchReq := bleve.NewSearchRequest(query)
- searchReq.Size = 10000 // Process in batches
- totalDeleted := 0
- for {
- searchResult, err := li.index.Search(searchReq)
- if err != nil {
- return fmt.Errorf("failed to search for entries to delete: %w", err)
- }
- if len(searchResult.Hits) == 0 {
- break
- }
- // Create batch deletion
- batch := li.index.NewBatch()
- for _, hit := range searchResult.Hits {
- batch.Delete(hit.ID)
- }
- if err := li.index.Batch(batch); err != nil {
- return fmt.Errorf("failed to delete entries: %w", err)
- }
- totalDeleted += len(searchResult.Hits)
- logger.Infof("Deleted %d entries from %s (total: %d)", len(searchResult.Hits), filePath, totalDeleted)
- }
- // Remove from tracking
- li.mu.Lock()
- delete(li.logPaths, filePath)
- li.mu.Unlock()
- // Remove from persistence
- if li.persistence != nil {
- if err := li.persistence.DeleteLogIndex(filePath); err != nil {
- logger.Warnf("Failed to delete persistence record for %s: %v", filePath, err)
- }
- }
- // Clear related caches
- li.invalidateStatsCache()
- logger.Infof("Successfully deleted %d index entries for %s", totalDeleted, filePath)
- return nil
- }
- // DeleteAllIndexes removes all index entries
- func (li *LogIndexer) DeleteAllIndexes() error {
- logger.Info("Deleting all index entries...")
- // Close current index
- if err := li.index.Close(); err != nil {
- logger.Warnf("Failed to close index during deletion: %v", err)
- }
- // Remove index directory
- if err := os.RemoveAll(li.indexPath); err != nil {
- return fmt.Errorf("failed to remove index directory: %w", err)
- }
- // Create new empty index
- mapping := createIndexMapping()
- index, err := bleve.New(li.indexPath, mapping)
- if err != nil {
- return fmt.Errorf("failed to create new empty index: %w", err)
- }
- li.index = index
- // Clear all tracking
- li.mu.Lock()
- li.logPaths = make(map[string]*LogFileInfo)
- li.mu.Unlock()
- // Clear persistence
- if li.persistence != nil {
- if err := li.persistence.DeleteAllLogIndexes(); err != nil {
- logger.Warnf("Failed to clear persistence: %v", err)
- }
- }
- // Clear caches
- li.cache.Clear()
- li.statsCache.Clear()
- logger.Info("Successfully deleted all index entries")
- return nil
- }
- // CleanupOrphanedIndexes removes index entries for files that no longer exist
- func (li *LogIndexer) CleanupOrphanedIndexes() error {
- logger.Info("Cleaning up orphaned index entries...")
- li.mu.RLock()
- paths := make([]string, 0, len(li.logPaths))
- for path := range li.logPaths {
- paths = append(paths, path)
- }
- li.mu.RUnlock()
- orphanedPaths := make([]string, 0)
- for _, path := range paths {
- if _, err := os.Stat(path); os.IsNotExist(err) {
- orphanedPaths = append(orphanedPaths, path)
- }
- }
- if len(orphanedPaths) == 0 {
- logger.Info("No orphaned index entries found")
- return nil
- }
- logger.Infof("Found %d orphaned files to clean up: %v", len(orphanedPaths), orphanedPaths)
- for _, path := range orphanedPaths {
- if err := li.DeleteFileIndex(path); err != nil {
- logger.Errorf("Failed to delete orphaned index for %s: %v", path, err)
- } else {
- logger.Infof("Cleaned up orphaned index for %s", path)
- }
- }
- logger.Infof("Cleanup completed for %d orphaned files", len(orphanedPaths))
- return nil
- }
|