- package cron
- import (
- "fmt"
- "os"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/nginx_log"
- "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
- "github.com/go-co-op/gocron/v2"
- "github.com/uozi-tech/cosy/logger"
- )
- // setupIncrementalIndexingJob sets up the periodic incremental log indexing job
- func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) {
- logger.Info("Setting up incremental log indexing job")
- // Run every 5 minutes to check for log file changes
- job, err := s.NewJob(
- gocron.DurationJob(5*time.Minute),
- gocron.NewTask(performIncrementalIndexing),
- gocron.WithName("incremental_log_indexing"),
- gocron.WithStartAt(gocron.WithStartImmediately()),
- )
- if err != nil {
- return nil, err
- }
- logger.Info("Incremental log indexing job scheduled to run every 5 minutes")
- return job, nil
- }
- // performIncrementalIndexing performs the actual incremental indexing check
- func performIncrementalIndexing() {
- logger.Debug("Starting incremental log indexing scan")
- // Get log file manager
- logFileManager := nginx_log.GetLogFileManager()
- if logFileManager == nil {
- logger.Warn("Log file manager not available for incremental indexing")
- return
- }
- // Get modern indexer
- modernIndexer := nginx_log.GetModernIndexer()
- if modernIndexer == nil {
- logger.Warn("Modern indexer not available for incremental indexing")
- return
- }
- // Check if indexer is healthy
- if !modernIndexer.IsHealthy() {
- logger.Warn("Modern indexer is not healthy, skipping incremental indexing")
- return
- }
- // Get all log groups to check for changes
- allLogs := nginx_log.GetAllLogsWithIndexGrouped(func(log *nginx_log.NginxLogWithIndex) bool {
- // Only process access logs (skip error logs as they are not indexed)
- return log.Type == "access"
- })
- changedCount := 0
- for _, log := range allLogs {
- // Check if file needs incremental indexing
- if needsIncrementalIndexing(log) {
- if err := queueIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil {
- logger.Errorf("Failed to queue incremental indexing for %s: %v", log.Path, err)
- } else {
- changedCount++
- }
- }
- }
- if changedCount > 0 {
- logger.Infof("Queued %d log files for incremental indexing", changedCount)
- } else {
- logger.Debug("No log files need incremental indexing")
- }
- }
- // needsIncrementalIndexing checks if a log file needs incremental indexing
- func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex) bool {
- // Skip if already indexing or queued
- if log.IndexStatus == string(indexer.IndexStatusIndexing) ||
- log.IndexStatus == string(indexer.IndexStatusQueued) {
- return false
- }
- // Check file system status
- fileInfo, err := os.Stat(log.Path)
- if os.IsNotExist(err) {
- // File doesn't exist, but we have index data - this is fine for historical queries
- return false
- }
- if err != nil {
- logger.Warnf("Cannot stat file %s: %v", log.Path, err)
- return false
- }
- // Check if file has been modified since last index
- fileModTime := fileInfo.ModTime()
- fileSize := fileInfo.Size()
- lastModified := time.Unix(log.LastModified, 0)
- // File was modified after last index and size increased
- if fileModTime.After(lastModified) && fileSize > log.LastSize {
- logger.Debugf("File %s needs incremental indexing: mod_time=%s, size=%d",
- log.Path, fileModTime.Format("2006-01-02 15:04:05"), fileSize)
- return true
- }
- // File size decreased - might be file rotation
- if fileSize < log.LastSize {
- logger.Debugf("File %s needs full re-indexing due to size decrease: old_size=%d, new_size=%d",
- log.Path, log.LastSize, fileSize)
- return true
- }
- return false
- }
- // queueIncrementalIndexing queues a file for incremental indexing
- func queueIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error {
- // Set the file status to queued
- if err := setFileIndexStatus(logPath, string(indexer.IndexStatusQueued), logFileManager); err != nil {
- return err
- }
- // Queue the indexing job asynchronously
- go func() {
- logger.Infof("Starting incremental indexing for file: %s", logPath)
-
- // Set status to indexing
- if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexing), logFileManager); err != nil {
- logger.Errorf("Failed to set indexing status for %s: %v", logPath, err)
- return
- }
- // Perform incremental indexing
- startTime := time.Now()
- docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexLogGroupWithProgress(logPath, nil)
-
- if err != nil {
- logger.Errorf("Failed incremental indexing for %s: %v", logPath, err)
- // Set error status
- if statusErr := setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager); statusErr != nil {
- logger.Errorf("Failed to set error status for %s: %v", logPath, statusErr)
- }
- return
- }
- // Calculate total documents indexed
- var totalDocsIndexed uint64
- for _, docCount := range docsCountMap {
- totalDocsIndexed += docCount
- }
- // Save indexing metadata
- duration := time.Since(startTime)
- if metadataManager, ok := logFileManager.(indexer.MetadataManager); ok {
- if err := metadataManager.SaveIndexMetadata(logPath, totalDocsIndexed, startTime, duration, minTime, maxTime); err != nil {
- logger.Errorf("Failed to save incremental index metadata for %s: %v", logPath, err)
- }
- }
- // Set status to indexed
- if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexed), logFileManager); err != nil {
- logger.Errorf("Failed to set indexed status for %s: %v", logPath, err)
- }
- // Update searcher shards
- nginx_log.UpdateSearcherShards()
- logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed)
- }()
- return nil
- }
- // setFileIndexStatus updates the index status for a file in the database using enhanced status management
- func setFileIndexStatus(logPath, status string, logFileManager interface{}) error {
- if logFileManager == nil {
- return fmt.Errorf("log file manager not available")
- }
-
- // Get persistence manager
- lfm, ok := logFileManager.(*indexer.LogFileManager)
- if !ok {
- return fmt.Errorf("invalid log file manager type")
- }
-
- persistence := lfm.GetPersistence()
- if persistence == nil {
- return fmt.Errorf("persistence manager not available")
- }
-
- // Use enhanced SetIndexStatus method with queue position for queued status
- queuePosition := 0
- if status == string(indexer.IndexStatusQueued) {
- // For incremental indexing, we don't need specific queue positions
- // They will be processed as they come
- queuePosition = int(time.Now().Unix() % 1000) // Simple ordering by time
- }
-
- return persistence.SetIndexStatus(logPath, status, queuePosition, "")
- }