modern_services.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. package nginx_log
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx_log/analytics"
  11. "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
  12. "github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
  13. "github.com/0xJacky/Nginx-UI/settings"
  14. "github.com/blevesearch/bleve/v2"
  15. "github.com/uozi-tech/cosy/logger"
  16. cSettings "github.com/uozi-tech/cosy/settings"
  17. )
  18. // Global instances for new services
  19. var (
  20. globalSearcher searcher.Searcher
  21. globalAnalytics analytics.Service
  22. globalIndexer *indexer.ParallelIndexer
  23. globalLogFileManager *indexer.LogFileManager
  24. servicesInitialized bool
  25. servicesMutex sync.RWMutex
  26. shutdownCancel context.CancelFunc
  27. isShuttingDown bool
  28. lastShardUpdateAttempt int64
  29. )
  30. // InitializeModernServices initializes the new modular services
  31. func InitializeModernServices(ctx context.Context) {
  32. servicesMutex.Lock()
  33. defer servicesMutex.Unlock()
  34. // Check if advanced indexing is enabled
  35. if !settings.NginxLogSettings.AdvancedIndexingEnabled {
  36. logger.Info("Advanced indexing is disabled, skipping nginx_log services initialization")
  37. return
  38. }
  39. if servicesInitialized {
  40. logger.Info("Modern nginx log services already initialized, skipping")
  41. return
  42. }
  43. logger.Info("Initializing modern nginx log services...")
  44. // Create a cancellable context for services
  45. serviceCtx, cancel := context.WithCancel(ctx)
  46. shutdownCancel = cancel
  47. // Initialize with default configuration directly
  48. if err := initializeWithDefaults(serviceCtx); err != nil {
  49. logger.Errorf("Failed to initialize modern services: %v", err)
  50. return
  51. }
  52. logger.Info("Modern nginx log services initialization completed")
  53. // Monitor context for shutdown
  54. go func() {
  55. logger.Info("Started nginx_log shutdown monitor goroutine")
  56. <-serviceCtx.Done()
  57. logger.Info("Context cancelled, initiating shutdown...")
  58. // Use the same shutdown logic as manual stop
  59. StopModernServices()
  60. logger.Info("Nginx_log shutdown monitor goroutine completed")
  61. }()
  62. }
  63. // initializeWithDefaults creates services with default configuration
  64. func initializeWithDefaults(ctx context.Context) error {
  65. logger.Info("Initializing services with default configuration")
  66. // Create empty searcher (will be populated when indexes are available)
  67. searcherConfig := searcher.DefaultSearcherConfig()
  68. globalSearcher = searcher.NewDistributedSearcher(searcherConfig, []bleve.Index{})
  69. // Initialize analytics with empty searcher
  70. globalAnalytics = analytics.NewService(globalSearcher)
  71. // Initialize parallel indexer with shard manager
  72. indexerConfig := indexer.DefaultIndexerConfig()
  73. // Use config directory for index path
  74. indexerConfig.IndexPath = getConfigDirIndexPath()
  75. shardManager := indexer.NewGroupedShardManager(indexerConfig)
  76. globalIndexer = indexer.NewParallelIndexer(indexerConfig, shardManager)
  77. // Start the indexer
  78. if err := globalIndexer.Start(ctx); err != nil {
  79. logger.Errorf("Failed to start parallel indexer: %v", err)
  80. return fmt.Errorf("failed to start parallel indexer: %w", err)
  81. }
  82. // Initialize log file manager
  83. globalLogFileManager = indexer.NewLogFileManager()
  84. // Inject indexer for precise doc counting before persisting
  85. globalLogFileManager.SetIndexer(globalIndexer)
  86. servicesInitialized = true
  87. // After all services are initialized, update the searcher with any existing shards.
  88. // This is crucial for loading the index state on application startup.
  89. // We call the 'locked' version because we already hold the mutex here.
  90. updateSearcherShardsLocked()
  91. return nil
  92. }
  93. // getConfigDirIndexPath returns the index path relative to the config file directory
  94. func getConfigDirIndexPath() string {
  95. // Get the config file path from cosy settings
  96. if cSettings.ConfPath != "" {
  97. configDir := filepath.Dir(cSettings.ConfPath)
  98. indexPath := filepath.Join(configDir, "log-index")
  99. // Ensure the directory exists
  100. if err := os.MkdirAll(indexPath, 0755); err != nil {
  101. logger.Warnf("Failed to create index directory at %s: %v, using default", indexPath, err)
  102. return "./log-index"
  103. }
  104. return indexPath
  105. }
  106. // Fallback to default relative path
  107. logger.Warn("Config file path not available, using default index path")
  108. return "./log-index"
  109. }
  110. // GetModernSearcher returns the global searcher instance
  111. func GetModernSearcher() searcher.Searcher {
  112. servicesMutex.RLock()
  113. defer servicesMutex.RUnlock()
  114. if !servicesInitialized {
  115. logger.Warn("Modern services not initialized, returning nil")
  116. return nil
  117. }
  118. if globalSearcher == nil {
  119. logger.Warn("GetModernSearcher: globalSearcher is nil even though services are initialized")
  120. return nil
  121. }
  122. // Check searcher health status
  123. isHealthy := globalSearcher.IsHealthy()
  124. isRunning := globalSearcher.IsRunning()
  125. logger.Debugf("GetModernSearcher: returning searcher, isHealthy: %v, isRunning: %v", isHealthy, isRunning)
  126. // Auto-heal: if the searcher is running but unhealthy (likely zero shards),
  127. // and the indexer is initialized, trigger an async shard swap (throttled).
  128. if !isHealthy && isRunning && globalIndexer != nil {
  129. now := time.Now().UnixNano()
  130. prev := atomic.LoadInt64(&lastShardUpdateAttempt)
  131. if now-prev > int64(5*time.Second) {
  132. if atomic.CompareAndSwapInt64(&lastShardUpdateAttempt, prev, now) {
  133. logger.Debugf("GetModernSearcher: unhealthy detected, scheduling UpdateSearcherShards()")
  134. go UpdateSearcherShards()
  135. }
  136. }
  137. }
  138. return globalSearcher
  139. }
  140. // GetModernAnalytics returns the global analytics service instance
  141. func GetModernAnalytics() analytics.Service {
  142. servicesMutex.RLock()
  143. defer servicesMutex.RUnlock()
  144. if !servicesInitialized {
  145. logger.Warn("Modern services not initialized, returning nil")
  146. return nil
  147. }
  148. return globalAnalytics
  149. }
  150. // GetModernIndexer returns the global indexer instance
  151. func GetModernIndexer() *indexer.ParallelIndexer {
  152. servicesMutex.RLock()
  153. defer servicesMutex.RUnlock()
  154. if !servicesInitialized {
  155. logger.Warn("Modern services not initialized, returning nil")
  156. return nil
  157. }
  158. return globalIndexer
  159. }
  160. // GetLogFileManager returns the global log file manager instance
  161. func GetLogFileManager() *indexer.LogFileManager {
  162. servicesMutex.RLock()
  163. defer servicesMutex.RUnlock()
  164. if !servicesInitialized {
  165. // Only warn during actual operations, not during initialization
  166. return nil
  167. }
  168. if globalLogFileManager == nil {
  169. logger.Warnf("[nginx_log] GetLogFileManager: globalLogFileManager is nil even though servicesInitialized=true")
  170. return nil
  171. }
  172. return globalLogFileManager
  173. }
  174. // NginxLogCache Type aliases for backward compatibility
  175. type NginxLogCache = indexer.NginxLogCache
  176. type NginxLogWithIndex = indexer.NginxLogWithIndex
  177. // Constants for backward compatibility
  178. const (
  179. IndexStatusIndexed = string(indexer.IndexStatusIndexed)
  180. IndexStatusIndexing = string(indexer.IndexStatusIndexing)
  181. IndexStatusNotIndexed = string(indexer.IndexStatusNotIndexed)
  182. )
  183. // Legacy compatibility functions for log cache system
  184. // AddLogPath adds a log path to the log cache with the source config file
  185. func AddLogPath(path, logType, name, configFile string) {
  186. manager := GetLogFileManager()
  187. if manager != nil {
  188. manager.AddLogPath(path, logType, name, configFile)
  189. } else {
  190. // Only warn if during initialization (when it might be expected)
  191. // Skip warning during shutdown or restart phases
  192. }
  193. }
  194. // RemoveLogPathsFromConfig removes all log paths associated with a specific config file
  195. func RemoveLogPathsFromConfig(configFile string) {
  196. manager := GetLogFileManager()
  197. if manager != nil {
  198. manager.RemoveLogPathsFromConfig(configFile)
  199. } else {
  200. // Silently skip if manager not available - this is normal during shutdown/restart
  201. }
  202. }
  203. // GetAllLogPaths returns all cached log paths, optionally filtered
  204. func GetAllLogPaths(filters ...func(*NginxLogCache) bool) []*NginxLogCache {
  205. if manager := GetLogFileManager(); manager != nil {
  206. return manager.GetAllLogPaths(filters...)
  207. }
  208. return []*NginxLogCache{}
  209. }
  210. // GetAllLogsWithIndex returns all cached log paths with their index status
  211. func GetAllLogsWithIndex(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
  212. if manager := GetLogFileManager(); manager != nil {
  213. return manager.GetAllLogsWithIndex(filters...)
  214. }
  215. return []*NginxLogWithIndex{}
  216. }
  217. // GetAllLogsWithIndexGrouped returns logs grouped by their base name
  218. func GetAllLogsWithIndexGrouped(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
  219. if manager := GetLogFileManager(); manager != nil {
  220. return manager.GetAllLogsWithIndexGrouped(filters...)
  221. }
  222. return []*NginxLogWithIndex{}
  223. }
  224. // SetIndexingStatus sets the indexing status for a specific file path
  225. func SetIndexingStatus(path string, isIndexing bool) {
  226. if manager := GetLogFileManager(); manager != nil {
  227. manager.SetIndexingStatus(path, isIndexing)
  228. }
  229. }
  230. // GetIndexingFiles returns a list of files currently being indexed
  231. func GetIndexingFiles() []string {
  232. if manager := GetLogFileManager(); manager != nil {
  233. return manager.GetIndexingFiles()
  234. }
  235. return []string{}
  236. }
  237. // UpdateSearcherShards fetches all shards from the indexer and performs zero-downtime shard updates.
  238. // Uses Bleve IndexAlias.Swap() for atomic shard replacement without recreating the searcher.
  239. // This function is safe for concurrent use and maintains service availability during index rebuilds.
  240. func UpdateSearcherShards() {
  241. // Schedule async update to avoid blocking indexing operations
  242. logger.Debugf("UpdateSearcherShards: Scheduling async shard update")
  243. go updateSearcherShardsAsync()
  244. }
  245. // updateSearcherShardsAsync performs the actual shard update asynchronously
  246. func updateSearcherShardsAsync() {
  247. // Small delay to let indexing operations complete
  248. time.Sleep(500 * time.Millisecond)
  249. logger.Debugf("updateSearcherShardsAsync: Attempting to acquire write lock...")
  250. servicesMutex.Lock()
  251. logger.Debugf("updateSearcherShardsAsync: Write lock acquired")
  252. defer func() {
  253. logger.Debugf("updateSearcherShardsAsync: Releasing write lock...")
  254. servicesMutex.Unlock()
  255. }()
  256. updateSearcherShardsLocked()
  257. }
  258. // updateSearcherShardsLocked performs the actual update logic assumes the caller holds the lock.
  259. // Uses Bleve IndexAlias.Swap() for zero-downtime shard updates following official best practices.
  260. func updateSearcherShardsLocked() {
  261. if !servicesInitialized || globalIndexer == nil {
  262. logger.Warn("Cannot update searcher shards, services not fully initialized.")
  263. return
  264. }
  265. // Check if indexer is healthy before getting shards
  266. if !globalIndexer.IsHealthy() {
  267. logger.Warn("Cannot update searcher shards, indexer is not healthy")
  268. return
  269. }
  270. newShards := globalIndexer.GetAllShards()
  271. logger.Infof("Retrieved %d new shards from indexer for hot-swap update", len(newShards))
  272. // If no searcher exists yet, create the initial one (first time setup)
  273. if globalSearcher == nil {
  274. logger.Info("Creating initial searcher with IndexAlias")
  275. searcherConfig := searcher.DefaultSearcherConfig()
  276. globalSearcher = searcher.NewDistributedSearcher(searcherConfig, newShards)
  277. if globalSearcher == nil {
  278. logger.Error("Failed to create initial searcher instance")
  279. return
  280. }
  281. // Create analytics service with the initial searcher
  282. globalAnalytics = analytics.NewService(globalSearcher)
  283. isHealthy := globalSearcher.IsHealthy()
  284. isRunning := globalSearcher.IsRunning()
  285. logger.Infof("Initial searcher created successfully, isHealthy: %v, isRunning: %v", isHealthy, isRunning)
  286. return
  287. }
  288. // For subsequent updates, use hot-swap through IndexAlias
  289. // This follows Bleve best practices for zero-downtime index updates
  290. if ds, ok := globalSearcher.(*searcher.DistributedSearcher); ok {
  291. oldShards := ds.GetShards()
  292. logger.Debugf("updateSearcherShardsLocked: About to call SwapShards...")
  293. // Perform atomic shard swap using IndexAlias
  294. if err := ds.SwapShards(newShards); err != nil {
  295. logger.Errorf("Failed to swap shards atomically: %v", err)
  296. return
  297. }
  298. logger.Debugf("updateSearcherShardsLocked: SwapShards completed successfully")
  299. logger.Infof("Successfully swapped %d old shards with %d new shards using IndexAlias",
  300. len(oldShards), len(newShards))
  301. // Verify searcher health after swap
  302. isHealthy := globalSearcher.IsHealthy()
  303. isRunning := globalSearcher.IsRunning()
  304. logger.Infof("Post-swap searcher status: isHealthy: %v, isRunning: %v", isHealthy, isRunning)
  305. // Note: We do NOT recreate the analytics service here since the searcher interface remains the same
  306. // The CardinalityCounter will automatically use the new shards through the same IndexAlias
  307. } else {
  308. logger.Warn("globalSearcher is not a DistributedSearcher, cannot perform hot-swap")
  309. }
  310. }
  311. // StopModernServices stops all running modern services
  312. func StopModernServices() {
  313. servicesMutex.Lock()
  314. defer servicesMutex.Unlock()
  315. if !servicesInitialized {
  316. logger.Info("Modern nginx log services not initialized, nothing to stop")
  317. return
  318. }
  319. if isShuttingDown {
  320. logger.Info("Modern nginx log services already shutting down")
  321. return
  322. }
  323. logger.Info("Stopping modern nginx log services...")
  324. isShuttingDown = true
  325. // Cancel the service context to trigger graceful shutdown
  326. if shutdownCancel != nil {
  327. shutdownCancel()
  328. // Wait a bit for graceful shutdown
  329. time.Sleep(500 * time.Millisecond)
  330. }
  331. // Stop all services
  332. if globalIndexer != nil {
  333. if err := globalIndexer.Stop(); err != nil {
  334. logger.Errorf("Failed to stop indexer: %v", err)
  335. }
  336. globalIndexer = nil
  337. }
  338. if globalAnalytics != nil {
  339. if err := globalAnalytics.Stop(); err != nil {
  340. logger.Errorf("Failed to stop analytics service: %v", err)
  341. }
  342. globalAnalytics = nil
  343. }
  344. if globalSearcher != nil {
  345. if err := globalSearcher.Stop(); err != nil {
  346. logger.Errorf("Failed to stop searcher: %v", err)
  347. }
  348. globalSearcher = nil
  349. }
  350. // Reset state
  351. globalLogFileManager = nil
  352. servicesInitialized = false
  353. shutdownCancel = nil
  354. isShuttingDown = false
  355. logger.Info("Modern nginx log services stopped")
  356. }
  357. // DestroyAllIndexes completely removes all indexed data from disk.
  358. func DestroyAllIndexes(ctx context.Context) error {
  359. servicesMutex.RLock()
  360. defer servicesMutex.RUnlock()
  361. if !servicesInitialized || globalIndexer == nil {
  362. logger.Warn("Cannot destroy indexes, services not initialized.")
  363. return fmt.Errorf("services not initialized")
  364. }
  365. return globalIndexer.DestroyAllIndexes(ctx)
  366. }