1
0

service.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
  11. // TargetInfo contains proxy target information with source config
  12. type TargetInfo struct {
  13. ProxyTarget
  14. ConfigPath string `json:"config_path"`
  15. LastSeen time.Time `json:"last_seen"`
  16. }
  17. // UpstreamDefinition contains upstream block information
  18. type UpstreamDefinition struct {
  19. Name string `json:"name"`
  20. Servers []ProxyTarget `json:"servers"`
  21. ConfigPath string `json:"config_path"`
  22. LastSeen time.Time `json:"last_seen"`
  23. }
  24. // UpstreamService manages upstream availability testing
  25. type UpstreamService struct {
  26. targets map[string]*TargetInfo // key: host:port
  27. availabilityMap map[string]*Status // key: host:port
  28. configTargets map[string][]string // configPath -> []targetKeys
  29. // Public upstream definitions storage
  30. Upstreams map[string]*UpstreamDefinition // key: upstream name
  31. upstreamsMutex sync.RWMutex
  32. targetsMutex sync.RWMutex
  33. lastUpdateTime time.Time
  34. testInProgress bool
  35. testMutex sync.Mutex
  36. }
  37. var (
  38. upstreamService *UpstreamService
  39. serviceOnce sync.Once
  40. )
  41. // formatSocketAddress formats a host:port combination into a proper socket address
  42. // For IPv6 addresses, it adds brackets around the host if they're not already present
  43. func formatSocketAddress(host, port string) string {
  44. // Check if this is an IPv6 address by looking for colons
  45. if strings.Contains(host, ":") {
  46. // IPv6 address - check if it already has brackets
  47. if !strings.HasPrefix(host, "[") {
  48. return "[" + host + "]:" + port
  49. }
  50. // Already has brackets, just append port
  51. return host + ":" + port
  52. }
  53. // IPv4 address or hostname
  54. return host + ":" + port
  55. }
  56. // GetUpstreamService returns the singleton upstream service instance
  57. func GetUpstreamService() *UpstreamService {
  58. serviceOnce.Do(func() {
  59. upstreamService = &UpstreamService{
  60. targets: make(map[string]*TargetInfo),
  61. availabilityMap: make(map[string]*Status),
  62. configTargets: make(map[string][]string),
  63. Upstreams: make(map[string]*UpstreamDefinition),
  64. lastUpdateTime: time.Now(),
  65. }
  66. })
  67. return upstreamService
  68. }
  69. // init registers the ParseProxyTargetsFromRawContent callback
  70. func init() {
  71. cache.RegisterCallback(scanForProxyTargets)
  72. }
  73. // scanForProxyTargets is the callback function for cache scanner
  74. func scanForProxyTargets(configPath string, content []byte) error {
  75. // logger.Debug("scanForProxyTargets", configPath)
  76. // Parse proxy targets and upstream definitions from config content
  77. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  78. service := GetUpstreamService()
  79. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  80. // Update upstream definitions
  81. for upstreamName, servers := range result.Upstreams {
  82. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  83. }
  84. return nil
  85. }
  86. // updateTargetsFromConfig updates proxy targets from a specific config file
  87. func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  88. s.targetsMutex.Lock()
  89. defer s.targetsMutex.Unlock()
  90. now := time.Now()
  91. // Remove old targets from this config path
  92. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  93. for _, key := range oldTargetKeys {
  94. if _, exists := s.targets[key]; exists {
  95. // Only remove if this is the only config using this target
  96. isOnlyConfig := true
  97. for otherConfig, otherKeys := range s.configTargets {
  98. if otherConfig != configPath {
  99. if slices.Contains(otherKeys, key) {
  100. isOnlyConfig = false
  101. }
  102. if !isOnlyConfig {
  103. break
  104. }
  105. }
  106. }
  107. if isOnlyConfig {
  108. delete(s.targets, key)
  109. delete(s.availabilityMap, key)
  110. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  111. } else {
  112. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  113. }
  114. }
  115. }
  116. }
  117. // Add/update new targets
  118. newTargetKeys := make([]string, 0, len(targets))
  119. for _, target := range targets {
  120. key := formatSocketAddress(target.Host, target.Port)
  121. newTargetKeys = append(newTargetKeys, key)
  122. if existingTarget, exists := s.targets[key]; exists {
  123. // Update existing target with latest info
  124. existingTarget.LastSeen = now
  125. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  126. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  127. } else {
  128. // Add new target
  129. s.targets[key] = &TargetInfo{
  130. ProxyTarget: target,
  131. ConfigPath: configPath,
  132. LastSeen: now,
  133. }
  134. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  135. }
  136. }
  137. // Update config target mapping
  138. s.configTargets[configPath] = newTargetKeys
  139. s.lastUpdateTime = now
  140. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  141. }
  142. // GetTargets returns a copy of current proxy targets
  143. func (s *UpstreamService) GetTargets() []ProxyTarget {
  144. s.targetsMutex.RLock()
  145. defer s.targetsMutex.RUnlock()
  146. targets := make([]ProxyTarget, 0, len(s.targets))
  147. for _, targetInfo := range s.targets {
  148. targets = append(targets, targetInfo.ProxyTarget)
  149. }
  150. return targets
  151. }
  152. // GetTargetInfos returns a copy of current target infos
  153. func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
  154. s.targetsMutex.RLock()
  155. defer s.targetsMutex.RUnlock()
  156. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  157. for _, targetInfo := range s.targets {
  158. // Create a copy
  159. targetInfoCopy := &TargetInfo{
  160. ProxyTarget: targetInfo.ProxyTarget,
  161. ConfigPath: targetInfo.ConfigPath,
  162. LastSeen: targetInfo.LastSeen,
  163. }
  164. targetInfos = append(targetInfos, targetInfoCopy)
  165. }
  166. return targetInfos
  167. }
  168. // GetAvailabilityMap returns a copy of current availability results
  169. func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
  170. s.targetsMutex.RLock()
  171. defer s.targetsMutex.RUnlock()
  172. result := make(map[string]*Status)
  173. for k, v := range s.availabilityMap {
  174. // Create a copy of the status
  175. result[k] = &Status{
  176. Online: v.Online,
  177. Latency: v.Latency,
  178. }
  179. }
  180. return result
  181. }
  182. // PerformAvailabilityTest performs availability test for all targets
  183. func (s *UpstreamService) PerformAvailabilityTest() {
  184. // Prevent concurrent tests
  185. s.testMutex.Lock()
  186. if s.testInProgress {
  187. s.testMutex.Unlock()
  188. // logger.Debug("Availability test already in progress, skipping")
  189. return
  190. }
  191. s.testInProgress = true
  192. s.testMutex.Unlock()
  193. // Ensure we reset the flag when done
  194. defer func() {
  195. s.testMutex.Lock()
  196. s.testInProgress = false
  197. s.testMutex.Unlock()
  198. }()
  199. s.targetsMutex.RLock()
  200. targetCount := len(s.targets)
  201. s.targetsMutex.RUnlock()
  202. if targetCount == 0 {
  203. logger.Debug("No targets to test")
  204. return
  205. }
  206. logger.Debug("Performing availability test for", targetCount, "unique targets")
  207. // Separate targets into traditional and consul groups from the start
  208. s.targetsMutex.RLock()
  209. regularTargetKeys := make([]string, 0, len(s.targets))
  210. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  211. for _, targetInfo := range s.targets {
  212. if targetInfo.ProxyTarget.IsConsul {
  213. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  214. } else {
  215. // Traditional target - use properly formatted socket address
  216. key := formatSocketAddress(targetInfo.ProxyTarget.Host, targetInfo.ProxyTarget.Port)
  217. regularTargetKeys = append(regularTargetKeys, key)
  218. }
  219. }
  220. s.targetsMutex.RUnlock()
  221. // Initialize results map
  222. results := make(map[string]*Status)
  223. // Test traditional targets using the original AvailabilityTest
  224. if len(regularTargetKeys) > 0 {
  225. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  226. regularResults := AvailabilityTest(regularTargetKeys)
  227. maps.Copy(results, regularResults)
  228. }
  229. // Test consul targets using consul-specific logic
  230. if len(consulTargets) > 0 {
  231. // logger.Debug("Testing", len(consulTargets), "consul targets")
  232. consulResults := TestDynamicTargets(consulTargets)
  233. maps.Copy(results, consulResults)
  234. }
  235. // Update availability map
  236. s.targetsMutex.Lock()
  237. s.availabilityMap = results
  238. s.targetsMutex.Unlock()
  239. // logger.Debug("Availability test completed for", len(results), "targets")
  240. }
  241. // ClearTargets clears all targets (useful for testing or reloading)
  242. func (s *UpstreamService) ClearTargets() {
  243. s.targetsMutex.Lock()
  244. s.upstreamsMutex.Lock()
  245. defer s.targetsMutex.Unlock()
  246. defer s.upstreamsMutex.Unlock()
  247. s.targets = make(map[string]*TargetInfo)
  248. s.availabilityMap = make(map[string]*Status)
  249. s.configTargets = make(map[string][]string)
  250. s.Upstreams = make(map[string]*UpstreamDefinition)
  251. s.lastUpdateTime = time.Now()
  252. // logger.Debug("Cleared all proxy targets and upstream definitions")
  253. }
  254. // GetLastUpdateTime returns the last time targets were updated
  255. func (s *UpstreamService) GetLastUpdateTime() time.Time {
  256. s.targetsMutex.RLock()
  257. defer s.targetsMutex.RUnlock()
  258. return s.lastUpdateTime
  259. }
  260. // GetTargetCount returns the number of unique targets
  261. func (s *UpstreamService) GetTargetCount() int {
  262. s.targetsMutex.RLock()
  263. defer s.targetsMutex.RUnlock()
  264. return len(s.targets)
  265. }
  266. // UpdateUpstreamDefinition updates or adds an upstream definition
  267. func (s *UpstreamService) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  268. s.upstreamsMutex.Lock()
  269. defer s.upstreamsMutex.Unlock()
  270. s.Upstreams[name] = &UpstreamDefinition{
  271. Name: name,
  272. Servers: servers,
  273. ConfigPath: configPath,
  274. LastSeen: time.Now(),
  275. }
  276. }
  277. // GetUpstreamDefinition returns an upstream definition by name
  278. func (s *UpstreamService) GetUpstreamDefinition(name string) (*UpstreamDefinition, bool) {
  279. s.upstreamsMutex.RLock()
  280. defer s.upstreamsMutex.RUnlock()
  281. upstream, exists := s.Upstreams[name]
  282. if !exists {
  283. return nil, false
  284. }
  285. // Return a copy to avoid race conditions
  286. return &UpstreamDefinition{
  287. Name: upstream.Name,
  288. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  289. ConfigPath: upstream.ConfigPath,
  290. LastSeen: upstream.LastSeen,
  291. }, true
  292. }
  293. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  294. func (s *UpstreamService) GetAllUpstreamDefinitions() map[string]*UpstreamDefinition {
  295. s.upstreamsMutex.RLock()
  296. defer s.upstreamsMutex.RUnlock()
  297. result := make(map[string]*UpstreamDefinition)
  298. for name, upstream := range s.Upstreams {
  299. result[name] = &UpstreamDefinition{
  300. Name: upstream.Name,
  301. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  302. ConfigPath: upstream.ConfigPath,
  303. LastSeen: upstream.LastSeen,
  304. }
  305. }
  306. return result
  307. }
  308. // IsUpstreamName checks if a given name is a known upstream
  309. func (s *UpstreamService) IsUpstreamName(name string) bool {
  310. s.upstreamsMutex.RLock()
  311. defer s.upstreamsMutex.RUnlock()
  312. _, exists := s.Upstreams[name]
  313. return exists
  314. }
  315. // RemoveConfigTargets removes all targets associated with a specific config file
  316. func (s *UpstreamService) RemoveConfigTargets(configPath string) {
  317. s.targetsMutex.Lock()
  318. defer s.targetsMutex.Unlock()
  319. if targetKeys, exists := s.configTargets[configPath]; exists {
  320. for _, key := range targetKeys {
  321. // Check if this target is used by other configs
  322. isUsedByOthers := false
  323. for otherConfig, otherKeys := range s.configTargets {
  324. if otherConfig != configPath {
  325. if slices.Contains(otherKeys, key) {
  326. isUsedByOthers = true
  327. }
  328. if isUsedByOthers {
  329. break
  330. }
  331. }
  332. }
  333. if !isUsedByOthers {
  334. delete(s.targets, key)
  335. delete(s.availabilityMap, key)
  336. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  337. }
  338. }
  339. delete(s.configTargets, configPath)
  340. s.lastUpdateTime = time.Now()
  341. // logger.Debug("Removed config targets for:", configPath)
  342. }
  343. }