index.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. package cache
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx"
  11. "github.com/fsnotify/fsnotify"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
// ScanCallback is the hook signature invoked for every config file the
// scanner reads. It receives the file's absolute path and its full contents.
// A non-nil return value is logged by the scanner and does not stop the scan
// or prevent the remaining callbacks from running.
type ScanCallback func(configPath string, content []byte) error
  17. // Scanner is responsible for scanning and watching nginx config files
  18. type Scanner struct {
  19. ctx context.Context // Context for the scanner
  20. watcher *fsnotify.Watcher // File system watcher
  21. scanTicker *time.Ticker // Ticker for periodic scanning
  22. initialized bool // Whether the scanner has been initialized
  23. scanning bool // Whether a scan is currently in progress
  24. scanMutex sync.RWMutex // Mutex for protecting the scanning state
  25. statusChan chan bool // Channel to broadcast scanning status changes
  26. subscribers map[chan bool]struct{} // Set of subscribers
  27. subscriberMux sync.RWMutex // Mutex for protecting the subscribers map
  28. }
  29. // Global variables
  30. var (
  31. // scanner is the singleton instance of Scanner
  32. scanner *Scanner
  33. configScannerInitMux sync.Mutex
  34. // This regex matches: include directives in nginx config files
  35. includeRegex = regexp.MustCompile(`include\s+([^;]+);`)
  36. // Global callbacks that will be executed during config file scanning
  37. scanCallbacks = make([]ScanCallback, 0)
  38. scanCallbacksMutex sync.RWMutex
  39. )
  40. // InitScanner initializes the config scanner
  41. func InitScanner(ctx context.Context) {
  42. if nginx.GetConfPath() == "" {
  43. logger.Error("Nginx config path is not set")
  44. return
  45. }
  46. s := GetScanner()
  47. err := s.Initialize(ctx)
  48. if err != nil {
  49. logger.Error("Failed to initialize config scanner:", err)
  50. }
  51. }
  52. // GetScanner returns the singleton instance of Scanner
  53. func GetScanner() *Scanner {
  54. configScannerInitMux.Lock()
  55. defer configScannerInitMux.Unlock()
  56. if scanner == nil {
  57. scanner = &Scanner{
  58. statusChan: make(chan bool, 10), // Buffer to prevent blocking
  59. subscribers: make(map[chan bool]struct{}),
  60. }
  61. // Start broadcaster goroutine
  62. go scanner.broadcastStatus()
  63. }
  64. return scanner
  65. }
  66. // RegisterCallback adds a callback function to be executed during scans
  67. // This function can be called before Scanner is initialized
  68. func RegisterCallback(callback ScanCallback) {
  69. scanCallbacksMutex.Lock()
  70. defer scanCallbacksMutex.Unlock()
  71. scanCallbacks = append(scanCallbacks, callback)
  72. }
  73. // broadcastStatus listens for status changes and broadcasts to all subscribers
  74. func (s *Scanner) broadcastStatus() {
  75. for status := range s.statusChan {
  76. s.subscriberMux.RLock()
  77. for ch := range s.subscribers {
  78. // Non-blocking send to prevent slow subscribers from blocking others
  79. select {
  80. case ch <- status:
  81. default:
  82. // Skip if channel buffer is full
  83. }
  84. }
  85. s.subscriberMux.RUnlock()
  86. }
  87. }
  88. // SubscribeScanningStatus allows a client to subscribe to scanning status changes
  89. func SubscribeScanningStatus() chan bool {
  90. s := GetScanner()
  91. ch := make(chan bool, 5) // Buffer to prevent blocking
  92. // Add to subscribers
  93. s.subscriberMux.Lock()
  94. s.subscribers[ch] = struct{}{}
  95. s.subscriberMux.Unlock()
  96. // Send current status immediately
  97. s.scanMutex.RLock()
  98. currentStatus := s.scanning
  99. s.scanMutex.RUnlock()
  100. // Non-blocking send
  101. select {
  102. case ch <- currentStatus:
  103. default:
  104. }
  105. return ch
  106. }
  107. // UnsubscribeScanningStatus removes a subscriber from receiving status updates
  108. func UnsubscribeScanningStatus(ch chan bool) {
  109. s := GetScanner()
  110. s.subscriberMux.Lock()
  111. delete(s.subscribers, ch)
  112. s.subscriberMux.Unlock()
  113. // Close the channel so the client knows it's unsubscribed
  114. close(ch)
  115. }
  116. // Initialize sets up the scanner and starts watching for file changes
  117. func (s *Scanner) Initialize(ctx context.Context) error {
  118. if s.initialized {
  119. return nil
  120. }
  121. // Create a new watcher
  122. watcher, err := fsnotify.NewWatcher()
  123. if err != nil {
  124. return err
  125. }
  126. s.watcher = watcher
  127. s.ctx = ctx
  128. // Scan for the first time
  129. err = s.ScanAllConfigs()
  130. if err != nil {
  131. return err
  132. }
  133. // Setup watcher for config directory
  134. configDir := filepath.Dir(nginx.GetConfPath())
  135. availableDir := nginx.GetConfPath("sites-available")
  136. enabledDir := nginx.GetConfPath("sites-enabled")
  137. streamAvailableDir := nginx.GetConfPath("streams-available")
  138. streamEnabledDir := nginx.GetConfPath("streams-enabled")
  139. // Watch the main directories
  140. err = s.watcher.Add(configDir)
  141. if err != nil {
  142. logger.Error("Failed to watch config directory:", err)
  143. }
  144. // Watch sites-available and sites-enabled if they exist
  145. if _, err := os.Stat(availableDir); err == nil {
  146. err = s.watcher.Add(availableDir)
  147. if err != nil {
  148. logger.Error("Failed to watch sites-available directory:", err)
  149. }
  150. }
  151. if _, err := os.Stat(enabledDir); err == nil {
  152. err = s.watcher.Add(enabledDir)
  153. if err != nil {
  154. logger.Error("Failed to watch sites-enabled directory:", err)
  155. }
  156. }
  157. // Watch streams-available and streams-enabled if they exist
  158. if _, err := os.Stat(streamAvailableDir); err == nil {
  159. err = s.watcher.Add(streamAvailableDir)
  160. if err != nil {
  161. logger.Error("Failed to watch streams-available directory:", err)
  162. }
  163. }
  164. if _, err := os.Stat(streamEnabledDir); err == nil {
  165. err = s.watcher.Add(streamEnabledDir)
  166. if err != nil {
  167. logger.Error("Failed to watch streams-enabled directory:", err)
  168. }
  169. }
  170. // Start the watcher goroutine
  171. go s.watchForChanges()
  172. // Setup a ticker for periodic scanning (every 5 minutes)
  173. s.scanTicker = time.NewTicker(5 * time.Minute)
  174. go func() {
  175. for {
  176. select {
  177. case <-s.ctx.Done():
  178. return
  179. case <-s.scanTicker.C:
  180. err := s.ScanAllConfigs()
  181. if err != nil {
  182. logger.Error("Periodic config scan failed:", err)
  183. }
  184. }
  185. }
  186. }()
  187. // Start a goroutine to listen for context cancellation
  188. go func() {
  189. <-s.ctx.Done()
  190. logger.Debug("Context cancelled, shutting down scanner")
  191. s.Shutdown()
  192. }()
  193. s.initialized = true
  194. return nil
  195. }
  196. // watchForChanges handles the fsnotify events and triggers rescans when necessary
  197. func (s *Scanner) watchForChanges() {
  198. for {
  199. select {
  200. case <-s.ctx.Done():
  201. return
  202. case event, ok := <-s.watcher.Events:
  203. if !ok {
  204. return
  205. }
  206. // Skip irrelevant events
  207. if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) &&
  208. !event.Has(fsnotify.Rename) && !event.Has(fsnotify.Remove) {
  209. continue
  210. }
  211. // Add newly created directories to the watch list
  212. if event.Has(fsnotify.Create) {
  213. if fi, err := os.Stat(event.Name); err == nil && fi.IsDir() {
  214. _ = s.watcher.Add(event.Name)
  215. }
  216. }
  217. // For remove events, perform a full scan
  218. if event.Has(fsnotify.Remove) {
  219. logger.Debug("Config item removed:", event.Name)
  220. if err := s.ScanAllConfigs(); err != nil {
  221. logger.Error("Failed to rescan configs after removal:", err)
  222. }
  223. continue
  224. }
  225. // Handle non-remove events
  226. fi, err := os.Stat(event.Name)
  227. if err != nil {
  228. logger.Error("Failed to stat changed path:", err)
  229. continue
  230. }
  231. if fi.IsDir() {
  232. // Directory change, perform full scan
  233. logger.Debug("Config directory changed:", event.Name)
  234. if err := s.ScanAllConfigs(); err != nil {
  235. logger.Error("Failed to rescan configs after directory change:", err)
  236. }
  237. } else {
  238. // File change, scan only the single file
  239. logger.Debug("Config file changed:", event.Name)
  240. // Give the system a moment to finish writing the file
  241. time.Sleep(100 * time.Millisecond)
  242. if err := s.scanSingleFile(event.Name); err != nil {
  243. logger.Error("Failed to scan changed file:", err)
  244. }
  245. }
  246. case err, ok := <-s.watcher.Errors:
  247. if !ok {
  248. return
  249. }
  250. logger.Error("Watcher error:", err)
  251. }
  252. }
  253. }
  254. // scanSingleFile scans a single file and executes all registered callbacks
  255. func (s *Scanner) scanSingleFile(filePath string) error {
  256. return s.scanSingleFileWithDepth(filePath, make(map[string]bool), 0)
  257. }
  258. // scanSingleFileWithDepth scans a single file with recursion protection
  259. func (s *Scanner) scanSingleFileWithDepth(filePath string, visited map[string]bool, depth int) error {
  260. // Maximum recursion depth to prevent infinite recursion
  261. const maxDepth = 10
  262. if depth > maxDepth {
  263. logger.Warn("Maximum recursion depth reached for file:", filePath)
  264. return nil
  265. }
  266. // Resolve the absolute path to handle symlinks properly
  267. absPath, err := filepath.Abs(filePath)
  268. if err != nil {
  269. logger.Error("Failed to resolve absolute path for:", filePath, err)
  270. return err
  271. }
  272. // Check for circular includes
  273. if visited[absPath] {
  274. // Circular include detected, skip this file
  275. return nil
  276. }
  277. // Mark this file as visited
  278. visited[absPath] = true
  279. // Set scanning state to true only for the root call (depth 0)
  280. var wasScanning bool
  281. if depth == 0 {
  282. s.scanMutex.Lock()
  283. wasScanning = s.scanning
  284. s.scanning = true
  285. if !wasScanning {
  286. // Only broadcast if status changed from not scanning to scanning
  287. s.statusChan <- true
  288. }
  289. s.scanMutex.Unlock()
  290. // Ensure we reset scanning state when done (only for root call)
  291. defer func() {
  292. s.scanMutex.Lock()
  293. s.scanning = false
  294. // Broadcast the completion
  295. s.statusChan <- false
  296. s.scanMutex.Unlock()
  297. }()
  298. }
  299. // Open the file
  300. file, err := os.Open(absPath)
  301. if err != nil {
  302. return err
  303. }
  304. defer file.Close()
  305. // Read the entire file content
  306. content, err := os.ReadFile(absPath)
  307. if err != nil {
  308. return err
  309. }
  310. // Execute all registered callbacks
  311. scanCallbacksMutex.RLock()
  312. for _, callback := range scanCallbacks {
  313. err := callback(absPath, content)
  314. if err != nil {
  315. logger.Error("Callback error for file", absPath, ":", err)
  316. }
  317. }
  318. scanCallbacksMutex.RUnlock()
  319. // Look for include directives to process included files
  320. includeMatches := includeRegex.FindAllSubmatch(content, -1)
  321. for _, match := range includeMatches {
  322. if len(match) >= 2 {
  323. includePath := string(match[1])
  324. // Handle glob patterns in include directives
  325. if strings.Contains(includePath, "*") {
  326. // If it's a relative path, make it absolute based on nginx config dir
  327. if !filepath.IsAbs(includePath) {
  328. configDir := filepath.Dir(nginx.GetConfPath())
  329. includePath = filepath.Join(configDir, includePath)
  330. }
  331. // Expand the glob pattern
  332. matchedFiles, err := filepath.Glob(includePath)
  333. if err != nil {
  334. logger.Error("Error expanding glob pattern:", includePath, err)
  335. continue
  336. }
  337. // Process each matched file
  338. for _, matchedFile := range matchedFiles {
  339. fileInfo, err := os.Stat(matchedFile)
  340. if err == nil && !fileInfo.IsDir() {
  341. err = s.scanSingleFileWithDepth(matchedFile, visited, depth+1)
  342. if err != nil {
  343. logger.Error("Failed to scan included file:", matchedFile, err)
  344. }
  345. }
  346. }
  347. } else {
  348. // Handle single file include
  349. // If it's a relative path, make it absolute based on nginx config dir
  350. if !filepath.IsAbs(includePath) {
  351. configDir := filepath.Dir(nginx.GetConfPath())
  352. includePath = filepath.Join(configDir, includePath)
  353. }
  354. fileInfo, err := os.Stat(includePath)
  355. if err == nil && !fileInfo.IsDir() {
  356. err = s.scanSingleFileWithDepth(includePath, visited, depth+1)
  357. if err != nil {
  358. logger.Error("Failed to scan included file:", includePath, err)
  359. }
  360. }
  361. }
  362. }
  363. }
  364. return nil
  365. }
  366. // ScanAllConfigs scans all nginx config files and executes all registered callbacks
  367. func (s *Scanner) ScanAllConfigs() error {
  368. // Set scanning state to true
  369. s.scanMutex.Lock()
  370. wasScanning := s.scanning
  371. s.scanning = true
  372. if !wasScanning {
  373. // Only broadcast if status changed from not scanning to scanning
  374. s.statusChan <- true
  375. }
  376. s.scanMutex.Unlock()
  377. // Ensure we reset scanning state when done
  378. defer func() {
  379. s.scanMutex.Lock()
  380. s.scanning = false
  381. // Broadcast the completion
  382. s.statusChan <- false
  383. s.scanMutex.Unlock()
  384. }()
  385. // Get the main config file
  386. mainConfigPath := nginx.GetConfEntryPath()
  387. err := s.scanSingleFile(mainConfigPath)
  388. if err != nil {
  389. logger.Error("Failed to scan main config:", err)
  390. }
  391. // Scan sites-available directory
  392. sitesAvailablePath := nginx.GetConfPath("sites-available", "")
  393. sitesAvailableFiles, err := os.ReadDir(sitesAvailablePath)
  394. if err == nil {
  395. for _, file := range sitesAvailableFiles {
  396. if !file.IsDir() {
  397. configPath := filepath.Join(sitesAvailablePath, file.Name())
  398. err := s.scanSingleFile(configPath)
  399. if err != nil {
  400. logger.Error("Failed to scan config:", configPath, err)
  401. }
  402. }
  403. }
  404. }
  405. // Scan streams-available directory if it exists
  406. streamAvailablePath := nginx.GetConfPath("streams-available", "")
  407. streamAvailableFiles, err := os.ReadDir(streamAvailablePath)
  408. if err == nil {
  409. for _, file := range streamAvailableFiles {
  410. if !file.IsDir() {
  411. configPath := filepath.Join(streamAvailablePath, file.Name())
  412. err := s.scanSingleFile(configPath)
  413. if err != nil {
  414. logger.Error("Failed to scan stream config:", configPath, err)
  415. }
  416. }
  417. }
  418. }
  419. return nil
  420. }
  421. // Shutdown cleans up resources used by the scanner
  422. func (s *Scanner) Shutdown() {
  423. if s.watcher != nil {
  424. s.watcher.Close()
  425. }
  426. if s.scanTicker != nil {
  427. s.scanTicker.Stop()
  428. }
  429. // Clean up subscriber resources
  430. s.subscriberMux.Lock()
  431. // Close all subscriber channels
  432. for ch := range s.subscribers {
  433. close(ch)
  434. }
  435. // Clear the map
  436. s.subscribers = make(map[chan bool]struct{})
  437. s.subscriberMux.Unlock()
  438. // Close the status channel
  439. close(s.statusChan)
  440. }
  441. // IsScanningInProgress returns whether a scan is currently in progress
  442. func IsScanningInProgress() bool {
  443. s := GetScanner()
  444. s.scanMutex.RLock()
  445. defer s.scanMutex.RUnlock()
  446. return s.scanning
  447. }
  448. // WithContext sets a context for the scanner that will be used to control its lifecycle
  449. func (s *Scanner) WithContext(ctx context.Context) *Scanner {
  450. // Create a context with cancel if not already done in Initialize
  451. if s.ctx == nil {
  452. s.ctx = ctx
  453. }
  454. return s
  455. }