package nginx_log

import (
	"fmt"
	"path/filepath"
	"strings"
	"sync"

	"github.com/fsnotify/fsnotify"
	"github.com/uozi-tech/cosy/logger"
)
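
// The LogFileInfo and IndexTask values built below are defined elsewhere in
// this package. As an illustrative sketch only, their shapes as used in this
// file look roughly like the following (the field types are inferred
// assumptions, not the package's actual definitions):
//
//	type LogFileInfo struct {
//		Path         string
//		LastModified int64 // zero forces an indexing check on the first scan
//		LastSize     int64 // zero forces an indexing check on the first scan
//		IsCompressed bool
//	}
//
//	type IndexTask struct {
//		FilePath    string
//		Priority    int // >= 10 marks a manual rebuild that bypasses debouncing
//		FullReindex bool
//		Wg          *sync.WaitGroup
//	}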

// AddLogPath adds a log path to be indexed and monitored
func (li *LogIndexer) AddLogPath(logPath string) error {
	li.mu.Lock()
	defer li.mu.Unlock()

	// Check if the file exists using the safe method
	info, err := li.safeGetFileInfo(logPath)
	if err != nil {
		return fmt.Errorf("failed to safely stat log file %s: %w", logPath, err)
	}

	// Determine if the file is compressed
	isCompressed := strings.HasSuffix(logPath, ".gz") || strings.HasSuffix(logPath, ".bz2")

	// Check if this path is already being tracked
	if existingInfo, exists := li.logPaths[logPath]; exists {
		// Update compressed status but keep existing tracking info
		existingInfo.IsCompressed = isCompressed
		logger.Debugf("Log path %s already being tracked, updated compressed status to %v", logPath, isCompressed)
	} else {
		// Add new log path with zero values to trigger the initial indexing check
		li.logPaths[logPath] = &LogFileInfo{
			Path:         logPath,
			LastModified: 0, // Will trigger indexing check on first scan
			LastSize:     0, // Will trigger indexing check on first scan
			IsCompressed: isCompressed,
		}
		logger.Infof("Added new log path %s (compressed=%v)", logPath, isCompressed)
	}

	// Add to the file watcher if not compressed and a watcher is available
	if li.watcher != nil && !isCompressed {
		if err := li.watcher.Add(logPath); err != nil {
			logger.Warnf("Failed to add file watcher for %s: %v", logPath, err)
		}
	}

	// Also watch the directory for compressed files if a watcher is available
	if li.watcher != nil {
		dir := filepath.Dir(logPath)
		if err := li.watcher.Add(dir); err != nil {
			logger.Warnf("Failed to add directory watcher for %s: %v", dir, err)
		}
	}

	// Check whether the file needs incremental or full indexing
	logIndex, err := li.persistence.GetLogIndex(logPath)
	if err != nil {
		logger.Warnf("Failed to get log index record for %s: %v", logPath, err)
	}

	// Calculate the total size of related log files for comparison
	totalSize := li.calculateRelatedLogFilesSize(logPath)
	needsFullReindex := logIndex == nil || logIndex.ShouldFullReindex(info.ModTime(), totalSize)

	// Queue for background indexing
	li.queueIndexTask(&IndexTask{
		FilePath:    logPath,
		Priority:    1, // Normal priority
		FullReindex: needsFullReindex,
	}, nil) // No WaitGroup for a regular add

	return nil
}

// processIndexQueue processes indexing tasks in the background
func (li *LogIndexer) processIndexQueue() {
	for {
		select {
		case <-li.ctx.Done():
			logger.Info("Log indexer background processor stopping")
			return
		case task := <-li.indexQueue:
			li.processIndexTask(task)
		}
	}
}

// queueIndexTask adds a task to the indexing queue with debouncing
func (li *LogIndexer) queueIndexTask(task *IndexTask, wg *sync.WaitGroup) {
	task.Wg = wg // Assign the WaitGroup to the task

	// Apply debouncing for file updates (not for manual rebuilds)
	if task.Priority < 10 { // Priority 10 is for manual rebuilds and should not be debounced
		li.debounceIndexTask(task)
	} else {
		// Manual rebuilds bypass debouncing
		li.executeIndexTask(task)
	}
}
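
// debounceIndexTask and executeIndexTask are implemented elsewhere in this
// package. As an illustrative sketch only (an assumption, not the actual
// implementation; the debounceMu/debounceTimers fields and the 500ms delay are
// hypothetical), a per-path debounce can collapse rapid successive writes into
// a single index run with time.AfterFunc:
//
//	li.debounceMu.Lock()
//	defer li.debounceMu.Unlock()
//	if timer, ok := li.debounceTimers[task.FilePath]; ok {
//		timer.Stop() // reset the pending run for this file
//	}
//	li.debounceTimers[task.FilePath] = time.AfterFunc(500*time.Millisecond, func() {
//		li.executeIndexTask(task)
//	})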

// handleCompressedLogFile handles the creation of new compressed log files
func (li *LogIndexer) handleCompressedLogFile(fullPath string) {
	li.mu.RLock()
	defer li.mu.RUnlock()

	fileName := filepath.Base(fullPath)
	for logPath := range li.logPaths {
		baseLogName := filepath.Base(logPath)
		if isLogrotateFile(fileName, baseLogName) {
			go func(path string) {
				if err := li.AddLogPath(path); err != nil {
					logger.Errorf("Failed to add new compressed log file %s: %v", path, err)
					return
				}
				// Queue for full indexing (compressed files need full reindex)
				li.queueIndexTask(&IndexTask{
					FilePath:    path,
					Priority:    1,    // Normal priority for compressed files
					FullReindex: true, // Compressed files need full indexing
				}, nil) // No WaitGroup for an auto-detected compressed file
			}(fullPath)
			return // Found a matching log path, no need to continue
		}
	}
}
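
// isLogrotateFile is defined elsewhere in this package. As an illustrative
// sketch only (an assumption, not the actual implementation): a rotated file
// typically shares the base log name as a prefix, e.g. access.log ->
// access.log.1, access.log.2.gz, or access.log-20240101.gz, which a minimal
// check could approximate as:
//
//	func isLogrotateFile(fileName, baseLogName string) bool {
//		return fileName != baseLogName && strings.HasPrefix(fileName, baseLogName)
//	}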

// watchFiles watches for file system events
func (li *LogIndexer) watchFiles() {
	for {
		select {
		case <-li.ctx.Done():
			logger.Info("Log indexer file watcher stopping")
			return
		case event, ok := <-li.watcher.Events:
			if !ok {
				return
			}

			// Handle file modifications
			if event.Op&fsnotify.Write == fsnotify.Write {
				li.mu.RLock()
				_, exists := li.logPaths[event.Name]
				li.mu.RUnlock()

				if exists {
					// Queue for incremental indexing (debouncing handled by queueIndexTask)
					li.queueIndexTask(&IndexTask{
						FilePath:    event.Name,
						Priority:    2,     // Higher priority for file updates
						FullReindex: false, // Use incremental indexing for file updates
					}, nil) // No WaitGroup for file updates
				}
			}

			// Handle new compressed files
			if event.Op&fsnotify.Create == fsnotify.Create {
				if strings.HasSuffix(event.Name, ".gz") {
					// Check if this is a rotated log file we should index
					li.handleCompressedLogFile(event.Name)
				}
			}
		case err, ok := <-li.watcher.Errors:
			if !ok {
				return
			}
			logger.Errorf("File watcher error: %v", err)
		}
	}
}
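
// processIndexQueue and watchFiles are long-running loops tied to li.ctx and
// are expected to run as background goroutines once the watcher, queue, and
// persistence layer exist. An illustrative wiring sketch (the NewLogIndexer
// constructor name and the example path are assumptions for illustration):
//
//	li := NewLogIndexer(ctx) // hypothetical constructor
//	go li.processIndexQueue()
//	go li.watchFiles()
//	if err := li.AddLogPath("/var/log/nginx/access.log"); err != nil {
//		logger.Errorf("failed to register log path: %v", err)
//	}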