service.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // TargetInfo contains proxy target information with source config
  13. type TargetInfo struct {
  14. ProxyTarget
  15. ConfigPath string `json:"config_path"`
  16. LastSeen time.Time `json:"last_seen"`
  17. }
  18. // Definition contains upstream block information
  19. type Definition struct {
  20. Name string `json:"name"`
  21. Servers []ProxyTarget `json:"servers"`
  22. ConfigPath string `json:"config_path"`
  23. LastSeen time.Time `json:"last_seen"`
  24. }
  25. // Service manages upstream availability testing
  26. type Service struct {
  27. targets map[string]*TargetInfo // key: host:port
  28. availabilityMap map[string]*Status // key: host:port
  29. configTargets map[string][]string // configPath -> []targetKeys
  30. // Public upstream definitions storage
  31. Upstreams map[string]*Definition // key: upstream name
  32. upstreamsMutex sync.RWMutex
  33. targetsMutex sync.RWMutex
  34. lastUpdateTime time.Time
  35. testInProgress bool
  36. testMutex sync.Mutex
  37. cachedDisabledSockets map[string]bool
  38. disabledSocketsCacheMutex sync.RWMutex
  39. disabledSocketsCacheExpiry time.Time
  40. disabledSocketsCacheDuration time.Duration
  41. }
  42. var (
  43. upstreamService *Service
  44. serviceOnce sync.Once
  45. )
  46. // formatSocketAddress formats a host:port combination into a proper socket address
  47. // For IPv6 addresses, it adds brackets around the host if they're not already present
  48. func formatSocketAddress(host, port string) string {
  49. // Check if this is an IPv6 address by looking for colons
  50. if strings.Contains(host, ":") {
  51. // IPv6 address - check if it already has brackets
  52. if !strings.HasPrefix(host, "[") {
  53. return "[" + host + "]:" + port
  54. }
  55. // Already has brackets, just append port
  56. return host + ":" + port
  57. }
  58. // IPv4 address or hostname
  59. return host + ":" + port
  60. }
  61. // GetUpstreamService returns the singleton upstream service instance
  62. func GetUpstreamService() *Service {
  63. serviceOnce.Do(func() {
  64. upstreamService = &Service{
  65. targets: make(map[string]*TargetInfo),
  66. availabilityMap: make(map[string]*Status),
  67. configTargets: make(map[string][]string),
  68. Upstreams: make(map[string]*Definition),
  69. lastUpdateTime: time.Now(),
  70. disabledSocketsCacheDuration: 30 * time.Second,
  71. }
  72. })
  73. return upstreamService
  74. }
  75. // init registers the ParseProxyTargetsFromRawContent callback
  76. func init() {
  77. cache.RegisterCallback("upstream.scanForProxyTargets", scanForProxyTargets)
  78. }
  79. // scanForProxyTargets is the callback function for cache scanner
  80. func scanForProxyTargets(configPath string, content []byte) error {
  81. // logger.Debug("scanForProxyTargets", configPath)
  82. // Parse proxy targets and upstream definitions from config content
  83. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  84. service := GetUpstreamService()
  85. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  86. // Update upstream definitions
  87. for upstreamName, servers := range result.Upstreams {
  88. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  89. }
  90. return nil
  91. }
  92. // updateTargetsFromConfig updates proxy targets from a specific config file
  93. func (s *Service) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  94. s.targetsMutex.Lock()
  95. defer s.targetsMutex.Unlock()
  96. now := time.Now()
  97. // Remove old targets from this config path
  98. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  99. for _, key := range oldTargetKeys {
  100. if _, exists := s.targets[key]; exists {
  101. // Only remove if this is the only config using this target
  102. isOnlyConfig := true
  103. for otherConfig, otherKeys := range s.configTargets {
  104. if otherConfig != configPath {
  105. if slices.Contains(otherKeys, key) {
  106. isOnlyConfig = false
  107. }
  108. if !isOnlyConfig {
  109. break
  110. }
  111. }
  112. }
  113. if isOnlyConfig {
  114. delete(s.targets, key)
  115. delete(s.availabilityMap, key)
  116. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  117. } else {
  118. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  119. }
  120. }
  121. }
  122. }
  123. // Add/update new targets
  124. newTargetKeys := make([]string, 0, len(targets))
  125. for _, target := range targets {
  126. key := formatSocketAddress(target.Host, target.Port)
  127. newTargetKeys = append(newTargetKeys, key)
  128. if existingTarget, exists := s.targets[key]; exists {
  129. // Update existing target with latest info
  130. existingTarget.LastSeen = now
  131. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  132. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  133. } else {
  134. // Add new target
  135. s.targets[key] = &TargetInfo{
  136. ProxyTarget: target,
  137. ConfigPath: configPath,
  138. LastSeen: now,
  139. }
  140. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  141. }
  142. }
  143. // Update config target mapping
  144. s.configTargets[configPath] = newTargetKeys
  145. s.lastUpdateTime = now
  146. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  147. }
  148. // GetTargets returns a copy of current proxy targets
  149. func (s *Service) GetTargets() []ProxyTarget {
  150. s.targetsMutex.RLock()
  151. defer s.targetsMutex.RUnlock()
  152. targets := make([]ProxyTarget, 0, len(s.targets))
  153. for _, targetInfo := range s.targets {
  154. targets = append(targets, targetInfo.ProxyTarget)
  155. }
  156. return targets
  157. }
  158. // GetTargetInfos returns a copy of current target infos
  159. func (s *Service) GetTargetInfos() []*TargetInfo {
  160. s.targetsMutex.RLock()
  161. defer s.targetsMutex.RUnlock()
  162. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  163. for _, targetInfo := range s.targets {
  164. // Create a copy
  165. targetInfoCopy := &TargetInfo{
  166. ProxyTarget: targetInfo.ProxyTarget,
  167. ConfigPath: targetInfo.ConfigPath,
  168. LastSeen: targetInfo.LastSeen,
  169. }
  170. targetInfos = append(targetInfos, targetInfoCopy)
  171. }
  172. return targetInfos
  173. }
  174. // GetAvailabilityMap returns a copy of current availability results
  175. func (s *Service) GetAvailabilityMap() map[string]*Status {
  176. s.targetsMutex.RLock()
  177. defer s.targetsMutex.RUnlock()
  178. result := make(map[string]*Status)
  179. for k, v := range s.availabilityMap {
  180. // Create a copy of the status
  181. result[k] = &Status{
  182. Online: v.Online,
  183. Latency: v.Latency,
  184. }
  185. }
  186. return result
  187. }
  188. // PerformAvailabilityTest performs availability test for all targets
  189. func (s *Service) PerformAvailabilityTest() {
  190. // Prevent concurrent tests
  191. s.testMutex.Lock()
  192. if s.testInProgress {
  193. s.testMutex.Unlock()
  194. // logger.Debug("Availability test already in progress, skipping")
  195. return
  196. }
  197. s.testInProgress = true
  198. s.testMutex.Unlock()
  199. // Ensure we reset the flag when done
  200. defer func() {
  201. s.testMutex.Lock()
  202. s.testInProgress = false
  203. s.testMutex.Unlock()
  204. }()
  205. s.targetsMutex.RLock()
  206. targetCount := len(s.targets)
  207. s.targetsMutex.RUnlock()
  208. if targetCount == 0 {
  209. logger.Debug("No targets to test")
  210. return
  211. }
  212. // logger.Debug("Performing availability test for", targetCount, "unique targets")
  213. // Get disabled sockets from cache or database
  214. disabledSockets := s.getDisabledSockets()
  215. // Separate targets into traditional and consul groups from the start
  216. s.targetsMutex.RLock()
  217. regularTargetKeys := make([]string, 0, len(s.targets))
  218. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  219. for _, targetInfo := range s.targets {
  220. // Check if this socket is disabled
  221. socketAddr := formatSocketAddress(targetInfo.ProxyTarget.Host, targetInfo.ProxyTarget.Port)
  222. if disabledSockets[socketAddr] {
  223. // logger.Debug("Skipping disabled socket:", socketAddr)
  224. continue
  225. }
  226. if targetInfo.ProxyTarget.IsConsul {
  227. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  228. } else {
  229. // Traditional target - use properly formatted socket address
  230. regularTargetKeys = append(regularTargetKeys, socketAddr)
  231. }
  232. }
  233. s.targetsMutex.RUnlock()
  234. // Initialize results map
  235. results := make(map[string]*Status)
  236. // Test traditional targets using the original AvailabilityTest
  237. if len(regularTargetKeys) > 0 {
  238. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  239. regularResults := AvailabilityTest(regularTargetKeys)
  240. maps.Copy(results, regularResults)
  241. }
  242. // Test consul targets using consul-specific logic
  243. if len(consulTargets) > 0 {
  244. // logger.Debug("Testing", len(consulTargets), "consul targets")
  245. consulResults := TestDynamicTargets(consulTargets)
  246. maps.Copy(results, consulResults)
  247. }
  248. // Update availability map
  249. s.targetsMutex.Lock()
  250. s.availabilityMap = results
  251. s.targetsMutex.Unlock()
  252. // logger.Debug("Availability test completed for", len(results), "targets")
  253. }
  254. // findUpstreamNameForTarget finds which upstream a target belongs to
  255. func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
  256. s.upstreamsMutex.RLock()
  257. defer s.upstreamsMutex.RUnlock()
  258. targetKey := formatSocketAddress(target.Host, target.Port)
  259. for name, upstream := range s.Upstreams {
  260. for _, server := range upstream.Servers {
  261. serverKey := formatSocketAddress(server.Host, server.Port)
  262. if serverKey == targetKey {
  263. return name
  264. }
  265. }
  266. }
  267. return ""
  268. }
  269. // getDisabledSockets queries the database for disabled sockets with caching
  270. func (s *Service) getDisabledSockets() map[string]bool {
  271. // Check if cache is still valid
  272. s.disabledSocketsCacheMutex.RLock()
  273. if time.Now().Before(s.disabledSocketsCacheExpiry) && s.cachedDisabledSockets != nil {
  274. // Return a copy of the cached data
  275. result := make(map[string]bool, len(s.cachedDisabledSockets))
  276. for k, v := range s.cachedDisabledSockets {
  277. result[k] = v
  278. }
  279. s.disabledSocketsCacheMutex.RUnlock()
  280. return result
  281. }
  282. s.disabledSocketsCacheMutex.RUnlock()
  283. // Cache expired or not initialized, refresh from database
  284. disabled := make(map[string]bool)
  285. db := model.UseDB()
  286. if db == nil {
  287. return disabled
  288. }
  289. var configs []model.UpstreamConfig
  290. if err := db.Where("enabled = ?", false).Find(&configs).Error; err != nil {
  291. logger.Error("Failed to query disabled sockets:", err)
  292. return disabled
  293. }
  294. for _, config := range configs {
  295. disabled[config.Socket] = true
  296. }
  297. // Update cache
  298. s.disabledSocketsCacheMutex.Lock()
  299. s.cachedDisabledSockets = disabled
  300. s.disabledSocketsCacheExpiry = time.Now().Add(s.disabledSocketsCacheDuration)
  301. s.disabledSocketsCacheMutex.Unlock()
  302. // Return a copy to prevent external modification
  303. result := make(map[string]bool, len(disabled))
  304. for k, v := range disabled {
  305. result[k] = v
  306. }
  307. return result
  308. }
  309. // InvalidateDisabledSocketsCache invalidates the cache, forcing next read to refresh from database
  310. func (s *Service) InvalidateDisabledSocketsCache() {
  311. s.disabledSocketsCacheMutex.Lock()
  312. defer s.disabledSocketsCacheMutex.Unlock()
  313. s.disabledSocketsCacheExpiry = time.Time{} // Set to zero time to force refresh
  314. logger.Debug("Disabled sockets cache invalidated")
  315. }
  316. // ClearTargets clears all targets (useful for testing or reloading)
  317. func (s *Service) ClearTargets() {
  318. s.targetsMutex.Lock()
  319. s.upstreamsMutex.Lock()
  320. defer s.targetsMutex.Unlock()
  321. defer s.upstreamsMutex.Unlock()
  322. s.targets = make(map[string]*TargetInfo)
  323. s.availabilityMap = make(map[string]*Status)
  324. s.configTargets = make(map[string][]string)
  325. s.Upstreams = make(map[string]*Definition)
  326. s.lastUpdateTime = time.Now()
  327. // logger.Debug("Cleared all proxy targets and upstream definitions")
  328. }
  329. // GetLastUpdateTime returns the last time targets were updated
  330. func (s *Service) GetLastUpdateTime() time.Time {
  331. s.targetsMutex.RLock()
  332. defer s.targetsMutex.RUnlock()
  333. return s.lastUpdateTime
  334. }
  335. // GetTargetCount returns the number of unique targets
  336. func (s *Service) GetTargetCount() int {
  337. s.targetsMutex.RLock()
  338. defer s.targetsMutex.RUnlock()
  339. return len(s.targets)
  340. }
  341. // UpdateUpstreamDefinition updates or adds an upstream definition
  342. func (s *Service) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  343. s.upstreamsMutex.Lock()
  344. defer s.upstreamsMutex.Unlock()
  345. s.Upstreams[name] = &Definition{
  346. Name: name,
  347. Servers: servers,
  348. ConfigPath: configPath,
  349. LastSeen: time.Now(),
  350. }
  351. }
  352. // GetUpstreamDefinition returns an upstream definition by name
  353. func (s *Service) GetUpstreamDefinition(name string) (*Definition, bool) {
  354. s.upstreamsMutex.RLock()
  355. defer s.upstreamsMutex.RUnlock()
  356. upstream, exists := s.Upstreams[name]
  357. if !exists {
  358. return nil, false
  359. }
  360. // Return a copy to avoid race conditions
  361. return &Definition{
  362. Name: upstream.Name,
  363. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  364. ConfigPath: upstream.ConfigPath,
  365. LastSeen: upstream.LastSeen,
  366. }, true
  367. }
  368. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  369. func (s *Service) GetAllUpstreamDefinitions() map[string]*Definition {
  370. s.upstreamsMutex.RLock()
  371. defer s.upstreamsMutex.RUnlock()
  372. result := make(map[string]*Definition)
  373. for name, upstream := range s.Upstreams {
  374. result[name] = &Definition{
  375. Name: upstream.Name,
  376. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  377. ConfigPath: upstream.ConfigPath,
  378. LastSeen: upstream.LastSeen,
  379. }
  380. }
  381. return result
  382. }
  383. // IsUpstreamName checks if a given name is a known upstream
  384. func (s *Service) IsUpstreamName(name string) bool {
  385. s.upstreamsMutex.RLock()
  386. defer s.upstreamsMutex.RUnlock()
  387. _, exists := s.Upstreams[name]
  388. return exists
  389. }
  390. // RemoveConfigTargets removes all targets associated with a specific config file
  391. func (s *Service) RemoveConfigTargets(configPath string) {
  392. s.targetsMutex.Lock()
  393. defer s.targetsMutex.Unlock()
  394. if targetKeys, exists := s.configTargets[configPath]; exists {
  395. for _, key := range targetKeys {
  396. // Check if this target is used by other configs
  397. isUsedByOthers := false
  398. for otherConfig, otherKeys := range s.configTargets {
  399. if otherConfig != configPath {
  400. if slices.Contains(otherKeys, key) {
  401. isUsedByOthers = true
  402. }
  403. if isUsedByOthers {
  404. break
  405. }
  406. }
  407. }
  408. if !isUsedByOthers {
  409. delete(s.targets, key)
  410. delete(s.availabilityMap, key)
  411. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  412. }
  413. }
  414. delete(s.configTargets, configPath)
  415. s.lastUpdateTime = time.Now()
  416. // logger.Debug("Removed config targets for:", configPath)
  417. }
  418. }