service.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "sync"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/cache"
  8. "github.com/uozi-tech/cosy/logger"
  9. )
  10. // TargetInfo contains proxy target information with source config
  11. type TargetInfo struct {
  12. ProxyTarget
  13. ConfigPath string `json:"config_path"`
  14. LastSeen time.Time `json:"last_seen"`
  15. }
  16. // UpstreamDefinition contains upstream block information
  17. type UpstreamDefinition struct {
  18. Name string `json:"name"`
  19. Servers []ProxyTarget `json:"servers"`
  20. ConfigPath string `json:"config_path"`
  21. LastSeen time.Time `json:"last_seen"`
  22. }
  23. // UpstreamService manages upstream availability testing
  24. type UpstreamService struct {
  25. targets map[string]*TargetInfo // key: host:port
  26. availabilityMap map[string]*Status // key: host:port
  27. configTargets map[string][]string // configPath -> []targetKeys
  28. // Public upstream definitions storage
  29. Upstreams map[string]*UpstreamDefinition // key: upstream name
  30. upstreamsMutex sync.RWMutex
  31. targetsMutex sync.RWMutex
  32. lastUpdateTime time.Time
  33. testInProgress bool
  34. testMutex sync.Mutex
  35. }
  36. var (
  37. upstreamService *UpstreamService
  38. serviceOnce sync.Once
  39. )
  40. // GetUpstreamService returns the singleton upstream service instance
  41. func GetUpstreamService() *UpstreamService {
  42. serviceOnce.Do(func() {
  43. upstreamService = &UpstreamService{
  44. targets: make(map[string]*TargetInfo),
  45. availabilityMap: make(map[string]*Status),
  46. configTargets: make(map[string][]string),
  47. Upstreams: make(map[string]*UpstreamDefinition),
  48. lastUpdateTime: time.Now(),
  49. }
  50. })
  51. return upstreamService
  52. }
  53. // init registers the ParseProxyTargetsFromRawContent callback
  54. func init() {
  55. cache.RegisterCallback(scanForProxyTargets)
  56. }
  57. // scanForProxyTargets is the callback function for cache scanner
  58. func scanForProxyTargets(configPath string, content []byte) error {
  59. logger.Debug("scanForProxyTargets", configPath)
  60. // Parse proxy targets and upstream definitions from config content
  61. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  62. service := GetUpstreamService()
  63. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  64. // Update upstream definitions
  65. for upstreamName, servers := range result.Upstreams {
  66. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  67. }
  68. return nil
  69. }
  70. // updateTargetsFromConfig updates proxy targets from a specific config file
  71. func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  72. s.targetsMutex.Lock()
  73. defer s.targetsMutex.Unlock()
  74. now := time.Now()
  75. // Remove old targets from this config path
  76. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  77. for _, key := range oldTargetKeys {
  78. if _, exists := s.targets[key]; exists {
  79. // Only remove if this is the only config using this target
  80. isOnlyConfig := true
  81. for otherConfig, otherKeys := range s.configTargets {
  82. if otherConfig != configPath {
  83. if slices.Contains(otherKeys, key) {
  84. isOnlyConfig = false
  85. }
  86. if !isOnlyConfig {
  87. break
  88. }
  89. }
  90. }
  91. if isOnlyConfig {
  92. delete(s.targets, key)
  93. delete(s.availabilityMap, key)
  94. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  95. } else {
  96. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  97. }
  98. }
  99. }
  100. }
  101. // Add/update new targets
  102. newTargetKeys := make([]string, 0, len(targets))
  103. for _, target := range targets {
  104. key := target.Host + ":" + target.Port
  105. newTargetKeys = append(newTargetKeys, key)
  106. if existingTarget, exists := s.targets[key]; exists {
  107. // Update existing target with latest info
  108. existingTarget.LastSeen = now
  109. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  110. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  111. } else {
  112. // Add new target
  113. s.targets[key] = &TargetInfo{
  114. ProxyTarget: target,
  115. ConfigPath: configPath,
  116. LastSeen: now,
  117. }
  118. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  119. }
  120. }
  121. // Update config target mapping
  122. s.configTargets[configPath] = newTargetKeys
  123. s.lastUpdateTime = now
  124. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  125. }
  126. // GetTargets returns a copy of current proxy targets
  127. func (s *UpstreamService) GetTargets() []ProxyTarget {
  128. s.targetsMutex.RLock()
  129. defer s.targetsMutex.RUnlock()
  130. targets := make([]ProxyTarget, 0, len(s.targets))
  131. for _, targetInfo := range s.targets {
  132. targets = append(targets, targetInfo.ProxyTarget)
  133. }
  134. return targets
  135. }
  136. // GetTargetInfos returns a copy of current target infos
  137. func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
  138. s.targetsMutex.RLock()
  139. defer s.targetsMutex.RUnlock()
  140. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  141. for _, targetInfo := range s.targets {
  142. // Create a copy
  143. targetInfoCopy := &TargetInfo{
  144. ProxyTarget: targetInfo.ProxyTarget,
  145. ConfigPath: targetInfo.ConfigPath,
  146. LastSeen: targetInfo.LastSeen,
  147. }
  148. targetInfos = append(targetInfos, targetInfoCopy)
  149. }
  150. return targetInfos
  151. }
  152. // GetAvailabilityMap returns a copy of current availability results
  153. func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
  154. s.targetsMutex.RLock()
  155. defer s.targetsMutex.RUnlock()
  156. result := make(map[string]*Status)
  157. for k, v := range s.availabilityMap {
  158. // Create a copy of the status
  159. result[k] = &Status{
  160. Online: v.Online,
  161. Latency: v.Latency,
  162. }
  163. }
  164. return result
  165. }
  166. // PerformAvailabilityTest performs availability test for all targets
  167. func (s *UpstreamService) PerformAvailabilityTest() {
  168. // Prevent concurrent tests
  169. s.testMutex.Lock()
  170. if s.testInProgress {
  171. s.testMutex.Unlock()
  172. // logger.Debug("Availability test already in progress, skipping")
  173. return
  174. }
  175. s.testInProgress = true
  176. s.testMutex.Unlock()
  177. // Ensure we reset the flag when done
  178. defer func() {
  179. s.testMutex.Lock()
  180. s.testInProgress = false
  181. s.testMutex.Unlock()
  182. }()
  183. s.targetsMutex.RLock()
  184. targetCount := len(s.targets)
  185. s.targetsMutex.RUnlock()
  186. if targetCount == 0 {
  187. logger.Debug("No targets to test")
  188. return
  189. }
  190. logger.Debug("Performing availability test for", targetCount, "unique targets")
  191. // Separate targets into traditional and consul groups from the start
  192. s.targetsMutex.RLock()
  193. regularTargetKeys := make([]string, 0, len(s.targets))
  194. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  195. for _, targetInfo := range s.targets {
  196. if targetInfo.ProxyTarget.IsConsul {
  197. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  198. } else {
  199. // Traditional target - use host:port key format
  200. key := targetInfo.ProxyTarget.Host + ":" + targetInfo.ProxyTarget.Port
  201. regularTargetKeys = append(regularTargetKeys, key)
  202. }
  203. }
  204. s.targetsMutex.RUnlock()
  205. // Initialize results map
  206. results := make(map[string]*Status)
  207. // Test traditional targets using the original AvailabilityTest
  208. if len(regularTargetKeys) > 0 {
  209. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  210. regularResults := AvailabilityTest(regularTargetKeys)
  211. maps.Copy(results, regularResults)
  212. }
  213. // Test consul targets using consul-specific logic
  214. if len(consulTargets) > 0 {
  215. // logger.Debug("Testing", len(consulTargets), "consul targets")
  216. consulResults := TestDynamicTargets(consulTargets)
  217. maps.Copy(results, consulResults)
  218. }
  219. // Update availability map
  220. s.targetsMutex.Lock()
  221. s.availabilityMap = results
  222. s.targetsMutex.Unlock()
  223. // logger.Debug("Availability test completed for", len(results), "targets")
  224. }
  225. // ClearTargets clears all targets (useful for testing or reloading)
  226. func (s *UpstreamService) ClearTargets() {
  227. s.targetsMutex.Lock()
  228. s.upstreamsMutex.Lock()
  229. defer s.targetsMutex.Unlock()
  230. defer s.upstreamsMutex.Unlock()
  231. s.targets = make(map[string]*TargetInfo)
  232. s.availabilityMap = make(map[string]*Status)
  233. s.configTargets = make(map[string][]string)
  234. s.Upstreams = make(map[string]*UpstreamDefinition)
  235. s.lastUpdateTime = time.Now()
  236. // logger.Debug("Cleared all proxy targets and upstream definitions")
  237. }
  238. // GetLastUpdateTime returns the last time targets were updated
  239. func (s *UpstreamService) GetLastUpdateTime() time.Time {
  240. s.targetsMutex.RLock()
  241. defer s.targetsMutex.RUnlock()
  242. return s.lastUpdateTime
  243. }
  244. // GetTargetCount returns the number of unique targets
  245. func (s *UpstreamService) GetTargetCount() int {
  246. s.targetsMutex.RLock()
  247. defer s.targetsMutex.RUnlock()
  248. return len(s.targets)
  249. }
  250. // UpdateUpstreamDefinition updates or adds an upstream definition
  251. func (s *UpstreamService) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  252. s.upstreamsMutex.Lock()
  253. defer s.upstreamsMutex.Unlock()
  254. s.Upstreams[name] = &UpstreamDefinition{
  255. Name: name,
  256. Servers: servers,
  257. ConfigPath: configPath,
  258. LastSeen: time.Now(),
  259. }
  260. }
  261. // GetUpstreamDefinition returns an upstream definition by name
  262. func (s *UpstreamService) GetUpstreamDefinition(name string) (*UpstreamDefinition, bool) {
  263. s.upstreamsMutex.RLock()
  264. defer s.upstreamsMutex.RUnlock()
  265. upstream, exists := s.Upstreams[name]
  266. if !exists {
  267. return nil, false
  268. }
  269. // Return a copy to avoid race conditions
  270. return &UpstreamDefinition{
  271. Name: upstream.Name,
  272. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  273. ConfigPath: upstream.ConfigPath,
  274. LastSeen: upstream.LastSeen,
  275. }, true
  276. }
  277. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  278. func (s *UpstreamService) GetAllUpstreamDefinitions() map[string]*UpstreamDefinition {
  279. s.upstreamsMutex.RLock()
  280. defer s.upstreamsMutex.RUnlock()
  281. result := make(map[string]*UpstreamDefinition)
  282. for name, upstream := range s.Upstreams {
  283. result[name] = &UpstreamDefinition{
  284. Name: upstream.Name,
  285. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  286. ConfigPath: upstream.ConfigPath,
  287. LastSeen: upstream.LastSeen,
  288. }
  289. }
  290. return result
  291. }
  292. // IsUpstreamName checks if a given name is a known upstream
  293. func (s *UpstreamService) IsUpstreamName(name string) bool {
  294. s.upstreamsMutex.RLock()
  295. defer s.upstreamsMutex.RUnlock()
  296. _, exists := s.Upstreams[name]
  297. return exists
  298. }
  299. // RemoveConfigTargets removes all targets associated with a specific config file
  300. func (s *UpstreamService) RemoveConfigTargets(configPath string) {
  301. s.targetsMutex.Lock()
  302. defer s.targetsMutex.Unlock()
  303. if targetKeys, exists := s.configTargets[configPath]; exists {
  304. for _, key := range targetKeys {
  305. // Check if this target is used by other configs
  306. isUsedByOthers := false
  307. for otherConfig, otherKeys := range s.configTargets {
  308. if otherConfig != configPath {
  309. if slices.Contains(otherKeys, key) {
  310. isUsedByOthers = true
  311. }
  312. if isUsedByOthers {
  313. break
  314. }
  315. }
  316. }
  317. if !isUsedByOthers {
  318. delete(s.targets, key)
  319. delete(s.availabilityMap, key)
  320. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  321. }
  322. }
  323. delete(s.configTargets, configPath)
  324. s.lastUpdateTime = time.Now()
  325. // logger.Debug("Removed config targets for:", configPath)
  326. }
  327. }