service.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // TargetInfo contains proxy target information with source config
  13. type TargetInfo struct {
  14. ProxyTarget
  15. ConfigPath string `json:"config_path"`
  16. LastSeen time.Time `json:"last_seen"`
  17. }
  18. // Definition contains upstream block information
  19. type Definition struct {
  20. Name string `json:"name"`
  21. Servers []ProxyTarget `json:"servers"`
  22. ConfigPath string `json:"config_path"`
  23. LastSeen time.Time `json:"last_seen"`
  24. }
  25. // Service manages upstream availability testing
  26. type Service struct {
  27. targets map[string]*TargetInfo // key: host:port
  28. availabilityMap map[string]*Status // key: host:port
  29. configTargets map[string][]string // configPath -> []targetKeys
  30. // Public upstream definitions storage
  31. Upstreams map[string]*Definition // key: upstream name
  32. upstreamsMutex sync.RWMutex
  33. targetsMutex sync.RWMutex
  34. lastUpdateTime time.Time
  35. testInProgress bool
  36. testMutex sync.Mutex
  37. cachedDisabledSockets map[string]bool
  38. disabledSocketsCacheMutex sync.RWMutex
  39. disabledSocketsCacheValid bool // true if cache is valid, false if needs refresh
  40. }
  41. var (
  42. upstreamService *Service
  43. serviceOnce sync.Once
  44. )
  45. // formatSocketAddress formats a host:port combination into a proper socket address
  46. // For IPv6 addresses, it adds brackets around the host if they're not already present
  47. func formatSocketAddress(host, port string) string {
  48. // Check if this is an IPv6 address by looking for colons
  49. if strings.Contains(host, ":") {
  50. // IPv6 address - check if it already has brackets
  51. if !strings.HasPrefix(host, "[") {
  52. return "[" + host + "]:" + port
  53. }
  54. // Already has brackets, just append port
  55. return host + ":" + port
  56. }
  57. // IPv4 address or hostname
  58. return host + ":" + port
  59. }
  60. // GetUpstreamService returns the singleton upstream service instance
  61. func GetUpstreamService() *Service {
  62. serviceOnce.Do(func() {
  63. upstreamService = &Service{
  64. targets: make(map[string]*TargetInfo),
  65. availabilityMap: make(map[string]*Status),
  66. configTargets: make(map[string][]string),
  67. Upstreams: make(map[string]*Definition),
  68. lastUpdateTime: time.Now(),
  69. disabledSocketsCacheValid: false, // Initialize as invalid to force first load
  70. }
  71. })
  72. return upstreamService
  73. }
  74. // init registers the ParseProxyTargetsFromRawContent callback
  75. func init() {
  76. cache.RegisterCallback("upstream.scanForProxyTargets", scanForProxyTargets)
  77. }
  78. // scanForProxyTargets is the callback function for cache scanner
  79. func scanForProxyTargets(configPath string, content []byte) error {
  80. // Handle file removal - clean up targets from this config
  81. if len(content) == 0 {
  82. service := GetUpstreamService()
  83. service.RemoveConfigTargets(configPath)
  84. return nil
  85. }
  86. // logger.Debug("scanForProxyTargets", configPath)
  87. // Parse proxy targets and upstream definitions from config content
  88. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  89. service := GetUpstreamService()
  90. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  91. // Update upstream definitions
  92. for upstreamName, servers := range result.Upstreams {
  93. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  94. }
  95. return nil
  96. }
  97. // updateTargetsFromConfig updates proxy targets from a specific config file
  98. func (s *Service) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  99. s.targetsMutex.Lock()
  100. defer s.targetsMutex.Unlock()
  101. now := time.Now()
  102. // Remove old targets from this config path
  103. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  104. for _, key := range oldTargetKeys {
  105. if _, exists := s.targets[key]; exists {
  106. // Only remove if this is the only config using this target
  107. isOnlyConfig := true
  108. for otherConfig, otherKeys := range s.configTargets {
  109. if otherConfig != configPath {
  110. if slices.Contains(otherKeys, key) {
  111. isOnlyConfig = false
  112. }
  113. if !isOnlyConfig {
  114. break
  115. }
  116. }
  117. }
  118. if isOnlyConfig {
  119. delete(s.targets, key)
  120. delete(s.availabilityMap, key)
  121. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  122. } else {
  123. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  124. }
  125. }
  126. }
  127. }
  128. // Add/update new targets
  129. newTargetKeys := make([]string, 0, len(targets))
  130. for _, target := range targets {
  131. key := formatSocketAddress(target.Host, target.Port)
  132. newTargetKeys = append(newTargetKeys, key)
  133. if existingTarget, exists := s.targets[key]; exists {
  134. // Update existing target with latest info
  135. existingTarget.LastSeen = now
  136. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  137. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  138. } else {
  139. // Add new target
  140. s.targets[key] = &TargetInfo{
  141. ProxyTarget: target,
  142. ConfigPath: configPath,
  143. LastSeen: now,
  144. }
  145. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  146. }
  147. }
  148. // Update config target mapping
  149. s.configTargets[configPath] = newTargetKeys
  150. s.lastUpdateTime = now
  151. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  152. }
  153. // GetTargets returns a copy of current proxy targets
  154. func (s *Service) GetTargets() []ProxyTarget {
  155. s.targetsMutex.RLock()
  156. defer s.targetsMutex.RUnlock()
  157. targets := make([]ProxyTarget, 0, len(s.targets))
  158. for _, targetInfo := range s.targets {
  159. targets = append(targets, targetInfo.ProxyTarget)
  160. }
  161. return targets
  162. }
  163. // GetTargetInfos returns a copy of current target infos
  164. func (s *Service) GetTargetInfos() []*TargetInfo {
  165. s.targetsMutex.RLock()
  166. defer s.targetsMutex.RUnlock()
  167. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  168. for _, targetInfo := range s.targets {
  169. // Create a copy
  170. targetInfoCopy := &TargetInfo{
  171. ProxyTarget: targetInfo.ProxyTarget,
  172. ConfigPath: targetInfo.ConfigPath,
  173. LastSeen: targetInfo.LastSeen,
  174. }
  175. targetInfos = append(targetInfos, targetInfoCopy)
  176. }
  177. return targetInfos
  178. }
  179. // GetAvailabilityMap returns a copy of current availability results
  180. func (s *Service) GetAvailabilityMap() map[string]*Status {
  181. s.targetsMutex.RLock()
  182. defer s.targetsMutex.RUnlock()
  183. result := make(map[string]*Status)
  184. for k, v := range s.availabilityMap {
  185. // Create a copy of the status
  186. result[k] = &Status{
  187. Online: v.Online,
  188. Latency: v.Latency,
  189. }
  190. }
  191. return result
  192. }
  193. // PerformAvailabilityTest performs availability test for all targets
  194. func (s *Service) PerformAvailabilityTest() {
  195. // Prevent concurrent tests
  196. s.testMutex.Lock()
  197. if s.testInProgress {
  198. s.testMutex.Unlock()
  199. // logger.Debug("Availability test already in progress, skipping")
  200. return
  201. }
  202. s.testInProgress = true
  203. s.testMutex.Unlock()
  204. // Ensure we reset the flag when done
  205. defer func() {
  206. s.testMutex.Lock()
  207. s.testInProgress = false
  208. s.testMutex.Unlock()
  209. }()
  210. s.targetsMutex.RLock()
  211. targetCount := len(s.targets)
  212. s.targetsMutex.RUnlock()
  213. if targetCount == 0 {
  214. logger.Debug("No targets to test")
  215. return
  216. }
  217. // logger.Debug("Performing availability test for", targetCount, "unique targets")
  218. // Get disabled sockets from cache or database
  219. disabledSockets := s.getDisabledSockets()
  220. // Separate targets into traditional and consul groups from the start
  221. s.targetsMutex.RLock()
  222. regularTargetKeys := make([]string, 0, len(s.targets))
  223. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  224. for _, targetInfo := range s.targets {
  225. // Check if this socket is disabled
  226. socketAddr := formatSocketAddress(targetInfo.ProxyTarget.Host, targetInfo.ProxyTarget.Port)
  227. if disabledSockets[socketAddr] {
  228. // logger.Debug("Skipping disabled socket:", socketAddr)
  229. continue
  230. }
  231. if targetInfo.ProxyTarget.IsConsul {
  232. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  233. } else {
  234. // Traditional target - use properly formatted socket address
  235. regularTargetKeys = append(regularTargetKeys, socketAddr)
  236. }
  237. }
  238. s.targetsMutex.RUnlock()
  239. // Initialize results map
  240. results := make(map[string]*Status)
  241. // Test traditional targets using the original AvailabilityTest
  242. if len(regularTargetKeys) > 0 {
  243. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  244. regularResults := AvailabilityTest(regularTargetKeys)
  245. maps.Copy(results, regularResults)
  246. }
  247. // Test consul targets using consul-specific logic
  248. if len(consulTargets) > 0 {
  249. // logger.Debug("Testing", len(consulTargets), "consul targets")
  250. consulResults := TestDynamicTargets(consulTargets)
  251. maps.Copy(results, consulResults)
  252. }
  253. // Update availability map
  254. s.targetsMutex.Lock()
  255. s.availabilityMap = results
  256. s.targetsMutex.Unlock()
  257. // logger.Debug("Availability test completed for", len(results), "targets")
  258. }
  259. // findUpstreamNameForTarget finds which upstream a target belongs to
  260. func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
  261. s.upstreamsMutex.RLock()
  262. defer s.upstreamsMutex.RUnlock()
  263. targetKey := formatSocketAddress(target.Host, target.Port)
  264. for name, upstream := range s.Upstreams {
  265. for _, server := range upstream.Servers {
  266. serverKey := formatSocketAddress(server.Host, server.Port)
  267. if serverKey == targetKey {
  268. return name
  269. }
  270. }
  271. }
  272. return ""
  273. }
  274. // getDisabledSockets queries the database for disabled sockets with event-driven caching
  275. func (s *Service) getDisabledSockets() map[string]bool {
  276. // Check if cache is valid
  277. s.disabledSocketsCacheMutex.RLock()
  278. if s.disabledSocketsCacheValid && s.cachedDisabledSockets != nil {
  279. // Return a copy of the cached data
  280. result := make(map[string]bool, len(s.cachedDisabledSockets))
  281. for k, v := range s.cachedDisabledSockets {
  282. result[k] = v
  283. }
  284. s.disabledSocketsCacheMutex.RUnlock()
  285. return result
  286. }
  287. s.disabledSocketsCacheMutex.RUnlock()
  288. // Cache invalid or not initialized, refresh from database
  289. disabled := make(map[string]bool)
  290. db := model.UseDB()
  291. if db == nil {
  292. return disabled
  293. }
  294. var configs []model.UpstreamConfig
  295. if err := db.Where("enabled = ?", false).Find(&configs).Error; err != nil {
  296. logger.Error("Failed to query disabled sockets:", err)
  297. return disabled
  298. }
  299. for _, config := range configs {
  300. disabled[config.Socket] = true
  301. }
  302. // Update cache and mark as valid
  303. s.disabledSocketsCacheMutex.Lock()
  304. s.cachedDisabledSockets = disabled
  305. s.disabledSocketsCacheValid = true
  306. s.disabledSocketsCacheMutex.Unlock()
  307. logger.Debug("Disabled sockets cache refreshed from database, found", len(disabled), "disabled sockets")
  308. // Return a copy to prevent external modification
  309. result := make(map[string]bool, len(disabled))
  310. for k, v := range disabled {
  311. result[k] = v
  312. }
  313. return result
  314. }
  315. // InvalidateDisabledSocketsCache invalidates the cache, forcing next read to refresh from database
  316. func (s *Service) InvalidateDisabledSocketsCache() {
  317. s.disabledSocketsCacheMutex.Lock()
  318. defer s.disabledSocketsCacheMutex.Unlock()
  319. s.disabledSocketsCacheValid = false
  320. logger.Debug("Disabled sockets cache invalidated")
  321. }
  322. // ClearTargets clears all targets (useful for testing or reloading)
  323. func (s *Service) ClearTargets() {
  324. s.targetsMutex.Lock()
  325. s.upstreamsMutex.Lock()
  326. defer s.targetsMutex.Unlock()
  327. defer s.upstreamsMutex.Unlock()
  328. s.targets = make(map[string]*TargetInfo)
  329. s.availabilityMap = make(map[string]*Status)
  330. s.configTargets = make(map[string][]string)
  331. s.Upstreams = make(map[string]*Definition)
  332. s.lastUpdateTime = time.Now()
  333. // logger.Debug("Cleared all proxy targets and upstream definitions")
  334. }
  335. // GetLastUpdateTime returns the last time targets were updated
  336. func (s *Service) GetLastUpdateTime() time.Time {
  337. s.targetsMutex.RLock()
  338. defer s.targetsMutex.RUnlock()
  339. return s.lastUpdateTime
  340. }
  341. // GetTargetCount returns the number of unique targets
  342. func (s *Service) GetTargetCount() int {
  343. s.targetsMutex.RLock()
  344. defer s.targetsMutex.RUnlock()
  345. return len(s.targets)
  346. }
  347. // UpdateUpstreamDefinition updates or adds an upstream definition
  348. func (s *Service) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  349. s.upstreamsMutex.Lock()
  350. defer s.upstreamsMutex.Unlock()
  351. s.Upstreams[name] = &Definition{
  352. Name: name,
  353. Servers: servers,
  354. ConfigPath: configPath,
  355. LastSeen: time.Now(),
  356. }
  357. }
  358. // GetUpstreamDefinition returns an upstream definition by name
  359. func (s *Service) GetUpstreamDefinition(name string) (*Definition, bool) {
  360. s.upstreamsMutex.RLock()
  361. defer s.upstreamsMutex.RUnlock()
  362. upstream, exists := s.Upstreams[name]
  363. if !exists {
  364. return nil, false
  365. }
  366. // Return a copy to avoid race conditions
  367. return &Definition{
  368. Name: upstream.Name,
  369. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  370. ConfigPath: upstream.ConfigPath,
  371. LastSeen: upstream.LastSeen,
  372. }, true
  373. }
  374. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  375. func (s *Service) GetAllUpstreamDefinitions() map[string]*Definition {
  376. s.upstreamsMutex.RLock()
  377. defer s.upstreamsMutex.RUnlock()
  378. result := make(map[string]*Definition)
  379. for name, upstream := range s.Upstreams {
  380. result[name] = &Definition{
  381. Name: upstream.Name,
  382. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  383. ConfigPath: upstream.ConfigPath,
  384. LastSeen: upstream.LastSeen,
  385. }
  386. }
  387. return result
  388. }
  389. // IsUpstreamName checks if a given name is a known upstream
  390. func (s *Service) IsUpstreamName(name string) bool {
  391. s.upstreamsMutex.RLock()
  392. defer s.upstreamsMutex.RUnlock()
  393. _, exists := s.Upstreams[name]
  394. return exists
  395. }
  396. // RemoveConfigTargets removes all targets associated with a specific config file
  397. func (s *Service) RemoveConfigTargets(configPath string) {
  398. s.targetsMutex.Lock()
  399. defer s.targetsMutex.Unlock()
  400. if targetKeys, exists := s.configTargets[configPath]; exists {
  401. for _, key := range targetKeys {
  402. // Check if this target is used by other configs
  403. isUsedByOthers := false
  404. for otherConfig, otherKeys := range s.configTargets {
  405. if otherConfig != configPath {
  406. if slices.Contains(otherKeys, key) {
  407. isUsedByOthers = true
  408. }
  409. if isUsedByOthers {
  410. break
  411. }
  412. }
  413. }
  414. if !isUsedByOthers {
  415. delete(s.targets, key)
  416. delete(s.availabilityMap, key)
  417. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  418. }
  419. }
  420. delete(s.configTargets, configPath)
  421. s.lastUpdateTime = time.Now()
  422. // logger.Debug("Removed config targets for:", configPath)
  423. }
  424. }