index.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package cache
  2. import (
  3. "context"
  4. "os"
  5. "path/filepath"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/0xJacky/Nginx-UI/internal/nginx"
  11. "github.com/fsnotify/fsnotify"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
  // ScanCallback is the hook signature invoked for every config file visited
  // during a scan. configPath is the path of the file that was read and content
  // holds the raw bytes of that file. A non-nil error is logged by the scanner;
  // it does not stop the scan.
  type ScanCallback func(configPath string, content []byte) error
  17. // Scanner is responsible for scanning and watching nginx config files
  18. type Scanner struct {
  19. ctx context.Context // Context for the scanner
  20. watcher *fsnotify.Watcher // File system watcher
  21. scanTicker *time.Ticker // Ticker for periodic scanning
  22. initialized bool // Whether the scanner has been initialized
  23. scanning bool // Whether a scan is currently in progress
  24. scanMutex sync.RWMutex // Mutex for protecting the scanning state
  25. statusChan chan bool // Channel to broadcast scanning status changes
  26. subscribers map[chan bool]struct{} // Set of subscribers
  27. subscriberMux sync.RWMutex // Mutex for protecting the subscribers map
  28. }
  29. // Global variables
  30. var (
  31. // scanner is the singleton instance of Scanner
  32. scanner *Scanner
  33. configScannerInitMux sync.Mutex
  34. // This regex matches: include directives in nginx config files
  35. includeRegex = regexp.MustCompile(`include\s+([^;]+);`)
  36. // Global callbacks that will be executed during config file scanning
  37. scanCallbacks = make([]ScanCallback, 0)
  38. scanCallbacksMutex sync.RWMutex
  39. )
  40. // InitScanner initializes the config scanner
  41. func InitScanner(ctx context.Context) {
  42. if nginx.GetConfPath() == "" {
  43. logger.Error("Nginx config path is not set")
  44. return
  45. }
  46. s := GetScanner()
  47. err := s.Initialize(ctx)
  48. if err != nil {
  49. logger.Error("Failed to initialize config scanner:", err)
  50. }
  51. }
  52. // GetScanner returns the singleton instance of Scanner
  53. func GetScanner() *Scanner {
  54. configScannerInitMux.Lock()
  55. defer configScannerInitMux.Unlock()
  56. if scanner == nil {
  57. scanner = &Scanner{
  58. statusChan: make(chan bool, 10), // Buffer to prevent blocking
  59. subscribers: make(map[chan bool]struct{}),
  60. }
  61. // Start broadcaster goroutine
  62. go scanner.broadcastStatus()
  63. }
  64. return scanner
  65. }
  66. // RegisterCallback adds a callback function to be executed during scans
  67. // This function can be called before Scanner is initialized
  68. func RegisterCallback(callback ScanCallback) {
  69. scanCallbacksMutex.Lock()
  70. defer scanCallbacksMutex.Unlock()
  71. scanCallbacks = append(scanCallbacks, callback)
  72. }
  73. // broadcastStatus listens for status changes and broadcasts to all subscribers
  74. func (s *Scanner) broadcastStatus() {
  75. for status := range s.statusChan {
  76. s.subscriberMux.RLock()
  77. for ch := range s.subscribers {
  78. // Non-blocking send to prevent slow subscribers from blocking others
  79. select {
  80. case ch <- status:
  81. default:
  82. // Skip if channel buffer is full
  83. }
  84. }
  85. s.subscriberMux.RUnlock()
  86. }
  87. }
  88. // SubscribeScanningStatus allows a client to subscribe to scanning status changes
  89. func SubscribeScanningStatus() chan bool {
  90. s := GetScanner()
  91. ch := make(chan bool, 5) // Buffer to prevent blocking
  92. // Add to subscribers
  93. s.subscriberMux.Lock()
  94. s.subscribers[ch] = struct{}{}
  95. s.subscriberMux.Unlock()
  96. // Send current status immediately
  97. s.scanMutex.RLock()
  98. currentStatus := s.scanning
  99. s.scanMutex.RUnlock()
  100. // Non-blocking send
  101. select {
  102. case ch <- currentStatus:
  103. default:
  104. }
  105. return ch
  106. }
  107. // UnsubscribeScanningStatus removes a subscriber from receiving status updates
  108. func UnsubscribeScanningStatus(ch chan bool) {
  109. s := GetScanner()
  110. s.subscriberMux.Lock()
  111. delete(s.subscribers, ch)
  112. s.subscriberMux.Unlock()
  113. // Close the channel so the client knows it's unsubscribed
  114. close(ch)
  115. }
  116. // Initialize sets up the scanner and starts watching for file changes
  117. func (s *Scanner) Initialize(ctx context.Context) error {
  118. if s.initialized {
  119. return nil
  120. }
  121. // Create a new watcher
  122. watcher, err := fsnotify.NewWatcher()
  123. if err != nil {
  124. return err
  125. }
  126. s.watcher = watcher
  127. s.ctx = ctx
  128. // Scan for the first time
  129. err = s.ScanAllConfigs()
  130. if err != nil {
  131. return err
  132. }
  133. // Setup watcher for config directory
  134. configDir := filepath.Dir(nginx.GetConfPath())
  135. availableDir := nginx.GetConfPath("sites-available")
  136. enabledDir := nginx.GetConfPath("sites-enabled")
  137. streamAvailableDir := nginx.GetConfPath("stream-available")
  138. streamEnabledDir := nginx.GetConfPath("stream-enabled")
  139. // Watch the main directories
  140. err = s.watcher.Add(configDir)
  141. if err != nil {
  142. logger.Error("Failed to watch config directory:", err)
  143. }
  144. // Watch sites-available and sites-enabled if they exist
  145. if _, err := os.Stat(availableDir); err == nil {
  146. err = s.watcher.Add(availableDir)
  147. if err != nil {
  148. logger.Error("Failed to watch sites-available directory:", err)
  149. }
  150. }
  151. if _, err := os.Stat(enabledDir); err == nil {
  152. err = s.watcher.Add(enabledDir)
  153. if err != nil {
  154. logger.Error("Failed to watch sites-enabled directory:", err)
  155. }
  156. }
  157. // Watch stream-available and stream-enabled if they exist
  158. if _, err := os.Stat(streamAvailableDir); err == nil {
  159. err = s.watcher.Add(streamAvailableDir)
  160. if err != nil {
  161. logger.Error("Failed to watch stream-available directory:", err)
  162. }
  163. }
  164. if _, err := os.Stat(streamEnabledDir); err == nil {
  165. err = s.watcher.Add(streamEnabledDir)
  166. if err != nil {
  167. logger.Error("Failed to watch stream-enabled directory:", err)
  168. }
  169. }
  170. // Start the watcher goroutine
  171. go s.watchForChanges()
  172. // Setup a ticker for periodic scanning (every 5 minutes)
  173. s.scanTicker = time.NewTicker(5 * time.Minute)
  174. go func() {
  175. for {
  176. select {
  177. case <-s.ctx.Done():
  178. return
  179. case <-s.scanTicker.C:
  180. err := s.ScanAllConfigs()
  181. if err != nil {
  182. logger.Error("Periodic config scan failed:", err)
  183. }
  184. }
  185. }
  186. }()
  187. // Start a goroutine to listen for context cancellation
  188. go func() {
  189. <-s.ctx.Done()
  190. logger.Debug("Context cancelled, shutting down scanner")
  191. s.Shutdown()
  192. }()
  193. s.initialized = true
  194. return nil
  195. }
  196. // watchForChanges handles the fsnotify events and triggers rescans when necessary
  197. func (s *Scanner) watchForChanges() {
  198. for {
  199. select {
  200. case <-s.ctx.Done():
  201. return
  202. case event, ok := <-s.watcher.Events:
  203. if !ok {
  204. return
  205. }
  206. // Skip irrelevant events
  207. if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) &&
  208. !event.Has(fsnotify.Rename) && !event.Has(fsnotify.Remove) {
  209. continue
  210. }
  211. // Add newly created directories to the watch list
  212. if event.Has(fsnotify.Create) {
  213. if fi, err := os.Stat(event.Name); err == nil && fi.IsDir() {
  214. _ = s.watcher.Add(event.Name)
  215. }
  216. }
  217. // For remove events, perform a full scan
  218. if event.Has(fsnotify.Remove) {
  219. logger.Debug("Config item removed:", event.Name)
  220. if err := s.ScanAllConfigs(); err != nil {
  221. logger.Error("Failed to rescan configs after removal:", err)
  222. }
  223. continue
  224. }
  225. // Handle non-remove events
  226. fi, err := os.Stat(event.Name)
  227. if err != nil {
  228. logger.Error("Failed to stat changed path:", err)
  229. continue
  230. }
  231. if fi.IsDir() {
  232. // Directory change, perform full scan
  233. logger.Debug("Config directory changed:", event.Name)
  234. if err := s.ScanAllConfigs(); err != nil {
  235. logger.Error("Failed to rescan configs after directory change:", err)
  236. }
  237. } else {
  238. // File change, scan only the single file
  239. logger.Debug("Config file changed:", event.Name)
  240. // Give the system a moment to finish writing the file
  241. time.Sleep(100 * time.Millisecond)
  242. if err := s.scanSingleFile(event.Name); err != nil {
  243. logger.Error("Failed to scan changed file:", err)
  244. }
  245. }
  246. case err, ok := <-s.watcher.Errors:
  247. if !ok {
  248. return
  249. }
  250. logger.Error("Watcher error:", err)
  251. }
  252. }
  253. }
  254. // scanSingleFile scans a single file and executes all registered callbacks
  255. func (s *Scanner) scanSingleFile(filePath string) error {
  256. // Set scanning state to true
  257. s.scanMutex.Lock()
  258. wasScanning := s.scanning
  259. s.scanning = true
  260. if !wasScanning {
  261. // Only broadcast if status changed from not scanning to scanning
  262. s.statusChan <- true
  263. }
  264. s.scanMutex.Unlock()
  265. // Ensure we reset scanning state when done
  266. defer func() {
  267. s.scanMutex.Lock()
  268. s.scanning = false
  269. // Broadcast the completion
  270. s.statusChan <- false
  271. s.scanMutex.Unlock()
  272. }()
  273. // Open the file
  274. file, err := os.Open(filePath)
  275. if err != nil {
  276. return err
  277. }
  278. defer file.Close()
  279. // Read the entire file content
  280. content, err := os.ReadFile(filePath)
  281. if err != nil {
  282. return err
  283. }
  284. // Execute all registered callbacks
  285. scanCallbacksMutex.RLock()
  286. for _, callback := range scanCallbacks {
  287. err := callback(filePath, content)
  288. if err != nil {
  289. logger.Error("Callback error for file", filePath, ":", err)
  290. }
  291. }
  292. scanCallbacksMutex.RUnlock()
  293. // Look for include directives to process included files
  294. includeMatches := includeRegex.FindAllSubmatch(content, -1)
  295. for _, match := range includeMatches {
  296. if len(match) >= 2 {
  297. includePath := string(match[1])
  298. // Handle glob patterns in include directives
  299. if strings.Contains(includePath, "*") {
  300. // If it's a relative path, make it absolute based on nginx config dir
  301. if !filepath.IsAbs(includePath) {
  302. configDir := filepath.Dir(nginx.GetConfPath())
  303. includePath = filepath.Join(configDir, includePath)
  304. }
  305. // Expand the glob pattern
  306. matchedFiles, err := filepath.Glob(includePath)
  307. if err != nil {
  308. logger.Error("Error expanding glob pattern:", includePath, err)
  309. continue
  310. }
  311. // Process each matched file
  312. for _, matchedFile := range matchedFiles {
  313. fileInfo, err := os.Stat(matchedFile)
  314. if err == nil && !fileInfo.IsDir() {
  315. err = s.scanSingleFile(matchedFile)
  316. if err != nil {
  317. logger.Error("Failed to scan included file:", matchedFile, err)
  318. }
  319. }
  320. }
  321. } else {
  322. // Handle single file include
  323. // If it's a relative path, make it absolute based on nginx config dir
  324. if !filepath.IsAbs(includePath) {
  325. configDir := filepath.Dir(nginx.GetConfPath())
  326. includePath = filepath.Join(configDir, includePath)
  327. }
  328. fileInfo, err := os.Stat(includePath)
  329. if err == nil && !fileInfo.IsDir() {
  330. err = s.scanSingleFile(includePath)
  331. if err != nil {
  332. logger.Error("Failed to scan included file:", includePath, err)
  333. }
  334. }
  335. }
  336. }
  337. }
  338. return nil
  339. }
  340. // ScanAllConfigs scans all nginx config files and executes all registered callbacks
  341. func (s *Scanner) ScanAllConfigs() error {
  342. // Set scanning state to true
  343. s.scanMutex.Lock()
  344. wasScanning := s.scanning
  345. s.scanning = true
  346. if !wasScanning {
  347. // Only broadcast if status changed from not scanning to scanning
  348. s.statusChan <- true
  349. }
  350. s.scanMutex.Unlock()
  351. // Ensure we reset scanning state when done
  352. defer func() {
  353. s.scanMutex.Lock()
  354. s.scanning = false
  355. // Broadcast the completion
  356. s.statusChan <- false
  357. s.scanMutex.Unlock()
  358. }()
  359. // Get the main config file
  360. mainConfigPath := nginx.GetConfEntryPath()
  361. err := s.scanSingleFile(mainConfigPath)
  362. if err != nil {
  363. logger.Error("Failed to scan main config:", err)
  364. }
  365. // Scan sites-available directory
  366. sitesAvailablePath := nginx.GetConfPath("sites-available", "")
  367. sitesAvailableFiles, err := os.ReadDir(sitesAvailablePath)
  368. if err == nil {
  369. for _, file := range sitesAvailableFiles {
  370. if !file.IsDir() {
  371. configPath := filepath.Join(sitesAvailablePath, file.Name())
  372. err := s.scanSingleFile(configPath)
  373. if err != nil {
  374. logger.Error("Failed to scan config:", configPath, err)
  375. }
  376. }
  377. }
  378. }
  379. // Scan stream-available directory if it exists
  380. streamAvailablePath := nginx.GetConfPath("stream-available", "")
  381. streamAvailableFiles, err := os.ReadDir(streamAvailablePath)
  382. if err == nil {
  383. for _, file := range streamAvailableFiles {
  384. if !file.IsDir() {
  385. configPath := filepath.Join(streamAvailablePath, file.Name())
  386. err := s.scanSingleFile(configPath)
  387. if err != nil {
  388. logger.Error("Failed to scan stream config:", configPath, err)
  389. }
  390. }
  391. }
  392. }
  393. return nil
  394. }
  395. // Shutdown cleans up resources used by the scanner
  396. func (s *Scanner) Shutdown() {
  397. if s.watcher != nil {
  398. s.watcher.Close()
  399. }
  400. if s.scanTicker != nil {
  401. s.scanTicker.Stop()
  402. }
  403. // Clean up subscriber resources
  404. s.subscriberMux.Lock()
  405. // Close all subscriber channels
  406. for ch := range s.subscribers {
  407. close(ch)
  408. }
  409. // Clear the map
  410. s.subscribers = make(map[chan bool]struct{})
  411. s.subscriberMux.Unlock()
  412. // Close the status channel
  413. close(s.statusChan)
  414. }
  415. // IsScanningInProgress returns whether a scan is currently in progress
  416. func IsScanningInProgress() bool {
  417. s := GetScanner()
  418. s.scanMutex.RLock()
  419. defer s.scanMutex.RUnlock()
  420. return s.scanning
  421. }
  422. // WithContext sets a context for the scanner that will be used to control its lifecycle
  423. func (s *Scanner) WithContext(ctx context.Context) *Scanner {
  424. // Create a context with cancel if not already done in Initialize
  425. if s.ctx == nil {
  426. s.ctx = ctx
  427. }
  428. return s
  429. }