background_service.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package nginx_log
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/nginx"
  8. "github.com/uozi-tech/cosy/logger"
  9. )
  10. // BackgroundLogService manages automatic log discovery and indexing
  11. type BackgroundLogService struct {
  12. indexer *LogIndexer
  13. ctx context.Context
  14. cancel context.CancelFunc
  15. }
  16. // NewBackgroundLogService creates a new background log service
  17. func NewBackgroundLogService(ctx context.Context) (*BackgroundLogService, error) {
  18. indexer, err := NewLogIndexer(ctx)
  19. if err != nil {
  20. return nil, err
  21. }
  22. service := &BackgroundLogService{
  23. indexer: indexer,
  24. }
  25. return service, nil
  26. }
  27. // Start begins the background log discovery and indexing process
  28. func (s *BackgroundLogService) Start() {
  29. logger.Info("Starting background log service")
  30. // Initialize analytics service and set indexer
  31. InitAnalyticsService()
  32. SetAnalyticsServiceIndexer(s.indexer)
  33. // Initialize Bleve stats service
  34. InitBleveStatsService()
  35. SetBleveStatsServiceIndexer(s.indexer)
  36. // Load existing log indexes from database
  37. go s.loadExistingIndexes()
  38. // Start periodic log discovery
  39. go s.periodicLogDiscovery()
  40. // Discover initial log files
  41. go s.discoverInitialLogs()
  42. }
  43. // Stop stops the background log service
  44. func (s *BackgroundLogService) Stop() {
  45. logger.Info("Stopping background log service")
  46. if s.cancel != nil {
  47. s.cancel()
  48. }
  49. if s.indexer != nil {
  50. s.indexer.Close()
  51. }
  52. }
  53. // GetIndexer returns the log indexer instance
  54. func (s *BackgroundLogService) GetIndexer() *LogIndexer {
  55. return s.indexer
  56. }
  57. // discoverInitialLogs discovers and indexes initial log files
  58. func (s *BackgroundLogService) discoverInitialLogs() {
  59. logger.Info("Starting initial log discovery")
  60. // Get access log path
  61. accessLogPath := nginx.GetAccessLogPath()
  62. if accessLogPath != "" && IsLogPathUnderWhiteList(accessLogPath) {
  63. logger.Infof("Discovering access logs from: %s", accessLogPath)
  64. s.discoverLogFiles(accessLogPath)
  65. } else {
  66. logger.Warn("Access log path not available or not in whitelist")
  67. }
  68. logger.Info("Initial log discovery completed")
  69. }
  70. // periodicLogDiscovery runs periodic log discovery
  71. func (s *BackgroundLogService) periodicLogDiscovery() {
  72. ticker := time.NewTicker(30 * time.Minute) // Check every 30 minutes
  73. defer ticker.Stop()
  74. for {
  75. select {
  76. case <-s.ctx.Done():
  77. logger.Info("Periodic log discovery stopping")
  78. return
  79. case <-ticker.C:
  80. logger.Debug("Running periodic log discovery")
  81. s.discoverInitialLogs()
  82. }
  83. }
  84. }
  85. // discoverLogFiles discovers log files for a given log path
  86. func (s *BackgroundLogService) discoverLogFiles(logPath string) {
  87. if s.indexer == nil {
  88. logger.Error("Log indexer not available")
  89. return
  90. }
  91. logDir := filepath.Dir(logPath)
  92. baseLogName := filepath.Base(logPath)
  93. logger.Debugf("Discovering log files in %s with base name %s", logDir, baseLogName)
  94. if err := s.indexer.DiscoverLogFiles(logDir, baseLogName); err != nil {
  95. logger.Errorf("Failed to discover log files for %s: %v", logPath, err)
  96. } else {
  97. logger.Debugf("Successfully discovered log files for %s (queued for indexing)", logPath)
  98. // Note: Index ready notification will be sent after actual indexing is complete
  99. }
  100. }
  101. // loadExistingIndexes loads log file paths from the database and sets up monitoring
  102. func (s *BackgroundLogService) loadExistingIndexes() {
  103. logger.Info("Loading existing log indexes from database")
  104. if s.indexer == nil {
  105. logger.Error("Log indexer not available for loading existing indexes")
  106. return
  107. }
  108. persistence := NewPersistenceManager()
  109. indexes, err := persistence.GetAllLogIndexes()
  110. if err != nil {
  111. logger.Errorf("Failed to load existing log indexes: %v", err)
  112. return
  113. }
  114. logger.Infof("Found %d existing log indexes in database", len(indexes))
  115. // If no indexes found in database but we should have log files, discover them
  116. if len(indexes) == 0 {
  117. logger.Warnf("No existing log indexes found in database, running initial log discovery")
  118. s.discoverInitialLogs()
  119. return
  120. }
  121. for _, logIndex := range indexes {
  122. // Check if file still exists
  123. if _, err := os.Stat(logIndex.Path); os.IsNotExist(err) {
  124. logger.Warnf("Log file no longer exists, skipping: %s", logIndex.Path)
  125. continue
  126. }
  127. logger.Infof("Loading existing log index: %s", logIndex.Path)
  128. // Add to indexer (this will set up monitoring and check for updates)
  129. if err := s.indexer.AddLogPath(logIndex.Path); err != nil {
  130. logger.Errorf("Failed to add existing log path %s: %v", logIndex.Path, err)
  131. continue
  132. }
  133. // Check if file needs reindexing (only if file has changed since last index)
  134. if !logIndex.LastIndexed.IsZero() {
  135. fileInfo, err := os.Stat(logIndex.Path)
  136. if err == nil {
  137. // Check if file has been modified or if log group index size has changed since last index
  138. totalSize := s.indexer.calculateRelatedLogFilesSize(logIndex.Path)
  139. needsReindex := fileInfo.ModTime().After(logIndex.LastModified) ||
  140. totalSize != logIndex.LastSize
  141. if needsReindex {
  142. logger.Infof("File %s has changed since last index, queuing incremental update", logIndex.Path)
  143. // Queue for incremental indexing
  144. if err := s.indexer.IndexLogFileWithMode(logIndex.Path, false); err != nil {
  145. logger.Errorf("Failed to queue incremental index for %s: %v", logIndex.Path, err)
  146. }
  147. } else {
  148. logger.Infof("File %s unchanged since last index, skipping", logIndex.Path)
  149. }
  150. }
  151. }
  152. logger.Infof("Successfully loaded log index: %s", logIndex.Path)
  153. }
  154. logger.Infof("Finished loading existing log indexes")
  155. }
  156. // backgroundService is the package-level service instance. It is assigned by
  // InitBackgroundLogService and returned by GetBackgroundLogService.
  157. var backgroundService *BackgroundLogService
  158. // InitBackgroundLogService initializes the global background log service
  159. func InitBackgroundLogService(ctx context.Context) {
  160. var err error
  161. backgroundService, err = NewBackgroundLogService(ctx)
  162. if err != nil {
  163. logger.Fatalf("Failed to initialize background log service: %v", err)
  164. return
  165. }
  166. // Use the provided context instead of creating a new one
  167. backgroundService.ctx, backgroundService.cancel = context.WithCancel(ctx)
  168. backgroundService.Start()
  169. <-ctx.Done()
  170. backgroundService.Stop()
  171. }
  172. // GetBackgroundLogService returns the package-level background service. The
  // result is whatever InitBackgroundLogService last assigned, and is nil if
  // nothing has been assigned yet.
  173. func GetBackgroundLogService() *BackgroundLogService {
  174. return backgroundService
  175. }