background_service.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package nginx_log
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/nginx"
  8. "github.com/uozi-tech/cosy/logger"
  9. )
  10. // BackgroundLogService manages automatic log discovery and indexing
  11. type BackgroundLogService struct {
  12. indexer *LogIndexer
  13. ctx context.Context
  14. cancel context.CancelFunc
  15. }
  16. // NewBackgroundLogService creates a new background log service
  17. func NewBackgroundLogService() (*BackgroundLogService, error) {
  18. indexer, err := NewLogIndexer()
  19. if err != nil {
  20. return nil, err
  21. }
  22. service := &BackgroundLogService{
  23. indexer: indexer,
  24. }
  25. return service, nil
  26. }
  27. // Start begins the background log discovery and indexing process
  28. func (s *BackgroundLogService) Start() {
  29. logger.Info("Starting background log service")
  30. // Initialize analytics service and set indexer
  31. InitAnalyticsService()
  32. SetAnalyticsServiceIndexer(s.indexer)
  33. // Initialize Bleve stats service
  34. InitBleveStatsService()
  35. SetBleveStatsServiceIndexer(s.indexer)
  36. // Load existing log indexes from database
  37. go s.loadExistingIndexes()
  38. // Start periodic log discovery
  39. go s.periodicLogDiscovery()
  40. // Discover initial log files
  41. go s.discoverInitialLogs()
  42. }
  43. // Stop stops the background log service
  44. func (s *BackgroundLogService) Stop() {
  45. logger.Info("Stopping background log service")
  46. if s.cancel != nil {
  47. s.cancel()
  48. }
  49. if s.indexer != nil {
  50. s.indexer.Close()
  51. }
  52. }
  53. // GetIndexer returns the log indexer instance
  54. func (s *BackgroundLogService) GetIndexer() *LogIndexer {
  55. return s.indexer
  56. }
  57. // discoverInitialLogs discovers and indexes initial log files
  58. func (s *BackgroundLogService) discoverInitialLogs() {
  59. logger.Info("Starting initial log discovery")
  60. // Get access log path
  61. accessLogPath := nginx.GetAccessLogPath()
  62. if accessLogPath != "" && IsLogPathUnderWhiteList(accessLogPath) {
  63. logger.Infof("Discovering access logs from: %s", accessLogPath)
  64. s.discoverLogFiles(accessLogPath)
  65. } else {
  66. logger.Warn("Access log path not available or not in whitelist")
  67. }
  68. // Skip error logs - they have different format and are not indexed for structured search
  69. logger.Info("Initial log discovery completed")
  70. }
  71. // periodicLogDiscovery runs periodic log discovery
  72. func (s *BackgroundLogService) periodicLogDiscovery() {
  73. ticker := time.NewTicker(30 * time.Minute) // Check every 30 minutes
  74. defer ticker.Stop()
  75. for {
  76. select {
  77. case <-s.ctx.Done():
  78. logger.Info("Periodic log discovery stopping")
  79. return
  80. case <-ticker.C:
  81. logger.Debug("Running periodic log discovery")
  82. s.discoverInitialLogs()
  83. }
  84. }
  85. }
  86. // discoverLogFiles discovers log files for a given log path
  87. func (s *BackgroundLogService) discoverLogFiles(logPath string) {
  88. if s.indexer == nil {
  89. logger.Error("Log indexer not available")
  90. return
  91. }
  92. logDir := filepath.Dir(logPath)
  93. baseLogName := filepath.Base(logPath)
  94. logger.Debugf("Discovering log files in %s with base name %s", logDir, baseLogName)
  95. if err := s.indexer.DiscoverLogFiles(logDir, baseLogName); err != nil {
  96. logger.Errorf("Failed to discover log files for %s: %v", logPath, err)
  97. } else {
  98. logger.Debugf("Successfully discovered log files for %s (queued for indexing)", logPath)
  99. // Note: Index ready notification will be sent after actual indexing is complete
  100. }
  101. }
  102. // loadExistingIndexes loads log file paths from the database and sets up monitoring
  103. func (s *BackgroundLogService) loadExistingIndexes() {
  104. logger.Info("Loading existing log indexes from database")
  105. if s.indexer == nil {
  106. logger.Error("Log indexer not available for loading existing indexes")
  107. return
  108. }
  109. persistence := NewPersistenceManager()
  110. indexes, err := persistence.GetAllLogIndexes()
  111. if err != nil {
  112. logger.Errorf("Failed to load existing log indexes: %v", err)
  113. return
  114. }
  115. logger.Infof("Found %d existing log indexes in database", len(indexes))
  116. // If no indexes found in database but we should have log files, discover them
  117. if len(indexes) == 0 {
  118. logger.Warnf("No existing log indexes found in database, running initial log discovery")
  119. s.discoverInitialLogs()
  120. return
  121. }
  122. for _, logIndex := range indexes {
  123. // Check if file still exists
  124. if _, err := os.Stat(logIndex.Path); os.IsNotExist(err) {
  125. logger.Warnf("Log file no longer exists, skipping: %s", logIndex.Path)
  126. continue
  127. }
  128. logger.Infof("Loading existing log index: %s", logIndex.Path)
  129. // Add to indexer (this will set up monitoring and check for updates)
  130. if err := s.indexer.AddLogPath(logIndex.Path); err != nil {
  131. logger.Errorf("Failed to add existing log path %s: %v", logIndex.Path, err)
  132. continue
  133. }
  134. // Check if file needs reindexing (only if file has changed since last index)
  135. if !logIndex.LastIndexed.IsZero() {
  136. fileInfo, err := os.Stat(logIndex.Path)
  137. if err == nil {
  138. // Check if file has been modified or if log group index size has changed since last index
  139. totalSize := s.indexer.calculateRelatedLogFilesSize(logIndex.Path)
  140. needsReindex := fileInfo.ModTime().After(logIndex.LastModified) ||
  141. totalSize != logIndex.LastSize
  142. if needsReindex {
  143. logger.Infof("File %s has changed since last index, queuing incremental update", logIndex.Path)
  144. // Queue for incremental indexing
  145. if err := s.indexer.IndexLogFileWithMode(logIndex.Path, false); err != nil {
  146. logger.Errorf("Failed to queue incremental index for %s: %v", logIndex.Path, err)
  147. }
  148. } else {
  149. logger.Infof("File %s unchanged since last index, skipping", logIndex.Path)
  150. }
  151. }
  152. }
  153. logger.Infof("Successfully loaded log index: %s", logIndex.Path)
  154. }
  155. logger.Infof("Finished loading existing log indexes")
  156. }
  157. // Global background service instance
  158. var backgroundService *BackgroundLogService
  159. // InitBackgroundLogService initializes the global background log service
  160. func InitBackgroundLogService(ctx context.Context) error {
  161. var err error
  162. backgroundService, err = NewBackgroundLogService()
  163. if err != nil {
  164. return err
  165. }
  166. // Use the provided context instead of creating a new one
  167. backgroundService.ctx, backgroundService.cancel = context.WithCancel(ctx)
  168. backgroundService.Start()
  169. return nil
  170. }
  171. // GetBackgroundLogService returns the global background service instance
  172. func GetBackgroundLogService() *BackgroundLogService {
  173. return backgroundService
  174. }
  175. // StopBackgroundLogService stops the global background log service
  176. func StopBackgroundLogService() {
  177. if backgroundService != nil {
  178. backgroundService.Stop()
  179. backgroundService = nil
  180. }
  181. }