service.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/uozi-tech/cosy/logger"
  10. )
// TargetInfo contains proxy target information with source config.
// It pairs a ProxyTarget with bookkeeping about where and when the target
// was last reported by a config scan.
type TargetInfo struct {
	// ProxyTarget is embedded, so its fields are promoted onto TargetInfo.
	ProxyTarget

	// ConfigPath is the config file that most recently referenced this target.
	ConfigPath string `json:"config_path"`

	// LastSeen is the time of the most recent update that listed this target.
	LastSeen time.Time `json:"last_seen"`
}
// Definition contains upstream block information.
type Definition struct {
	// Name is the upstream name; it is also the key under which the
	// definition is stored in Service.Upstreams.
	Name string `json:"name"`

	// Servers lists the proxy targets recorded for this upstream.
	Servers []ProxyTarget `json:"servers"`

	// ConfigPath is the config file passed in when the definition was stored.
	ConfigPath string `json:"config_path"`

	// LastSeen is the time at which the definition was last stored.
	LastSeen time.Time `json:"last_seen"`
}
// Service manages upstream availability testing.
//
// In this file, targets, availabilityMap, configTargets and lastUpdateTime are
// accessed under targetsMutex; Upstreams is accessed under upstreamsMutex;
// testInProgress is accessed under testMutex.
type Service struct {
	// targets holds every known proxy target.
	targets map[string]*TargetInfo // key: host:port

	// availabilityMap holds the results of the latest availability test.
	availabilityMap map[string]*Status // key: host:port

	// configTargets records which target keys each config file contributed.
	configTargets map[string][]string // configPath -> []targetKeys

	// Public upstream definitions storage
	Upstreams map[string]*Definition // key: upstream name

	upstreamsMutex sync.RWMutex

	targetsMutex sync.RWMutex

	// lastUpdateTime is refreshed whenever the target set is modified or cleared.
	lastUpdateTime time.Time

	// testInProgress is true while PerformAvailabilityTest is running, so
	// overlapping invocations return immediately.
	testInProgress bool

	testMutex sync.Mutex

	// disabledSocketsChecker, when non-nil, is called at the start of a test
	// run; socket addresses mapped to true in its result are skipped.
	// NOTE(review): this field is read and written without holding any mutex.
	disabledSocketsChecker func() map[string]bool
}
var (
	// upstreamService is the package-wide Service instance; it is assigned
	// inside GetUpstreamService.
	upstreamService *Service

	// serviceOnce guards the one-time construction of upstreamService.
	serviceOnce sync.Once
)
  42. // formatSocketAddress formats a host:port combination into a proper socket address
  43. // For IPv6 addresses, it adds brackets around the host if they're not already present
  44. func formatSocketAddress(host, port string) string {
  45. // Check if this is an IPv6 address by looking for colons
  46. if strings.Contains(host, ":") {
  47. // IPv6 address - check if it already has brackets
  48. if !strings.HasPrefix(host, "[") {
  49. return "[" + host + "]:" + port
  50. }
  51. // Already has brackets, just append port
  52. return host + ":" + port
  53. }
  54. // IPv4 address or hostname
  55. return host + ":" + port
  56. }
  57. // GetUpstreamService returns the singleton upstream service instance
  58. func GetUpstreamService() *Service {
  59. serviceOnce.Do(func() {
  60. upstreamService = &Service{
  61. targets: make(map[string]*TargetInfo),
  62. availabilityMap: make(map[string]*Status),
  63. configTargets: make(map[string][]string),
  64. Upstreams: make(map[string]*Definition),
  65. lastUpdateTime: time.Now(),
  66. }
  67. })
  68. return upstreamService
  69. }
  70. // init registers the ParseProxyTargetsFromRawContent callback
  71. func init() {
  72. cache.RegisterCallback("upstream.scanForProxyTargets", scanForProxyTargets)
  73. }
  74. // scanForProxyTargets is the callback function for cache scanner
  75. func scanForProxyTargets(configPath string, content []byte) error {
  76. // logger.Debug("scanForProxyTargets", configPath)
  77. // Parse proxy targets and upstream definitions from config content
  78. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  79. service := GetUpstreamService()
  80. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  81. // Update upstream definitions
  82. for upstreamName, servers := range result.Upstreams {
  83. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  84. }
  85. return nil
  86. }
  87. // updateTargetsFromConfig updates proxy targets from a specific config file
  88. func (s *Service) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  89. s.targetsMutex.Lock()
  90. defer s.targetsMutex.Unlock()
  91. now := time.Now()
  92. // Remove old targets from this config path
  93. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  94. for _, key := range oldTargetKeys {
  95. if _, exists := s.targets[key]; exists {
  96. // Only remove if this is the only config using this target
  97. isOnlyConfig := true
  98. for otherConfig, otherKeys := range s.configTargets {
  99. if otherConfig != configPath {
  100. if slices.Contains(otherKeys, key) {
  101. isOnlyConfig = false
  102. }
  103. if !isOnlyConfig {
  104. break
  105. }
  106. }
  107. }
  108. if isOnlyConfig {
  109. delete(s.targets, key)
  110. delete(s.availabilityMap, key)
  111. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  112. } else {
  113. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  114. }
  115. }
  116. }
  117. }
  118. // Add/update new targets
  119. newTargetKeys := make([]string, 0, len(targets))
  120. for _, target := range targets {
  121. key := formatSocketAddress(target.Host, target.Port)
  122. newTargetKeys = append(newTargetKeys, key)
  123. if existingTarget, exists := s.targets[key]; exists {
  124. // Update existing target with latest info
  125. existingTarget.LastSeen = now
  126. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  127. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  128. } else {
  129. // Add new target
  130. s.targets[key] = &TargetInfo{
  131. ProxyTarget: target,
  132. ConfigPath: configPath,
  133. LastSeen: now,
  134. }
  135. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  136. }
  137. }
  138. // Update config target mapping
  139. s.configTargets[configPath] = newTargetKeys
  140. s.lastUpdateTime = now
  141. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  142. }
  143. // GetTargets returns a copy of current proxy targets
  144. func (s *Service) GetTargets() []ProxyTarget {
  145. s.targetsMutex.RLock()
  146. defer s.targetsMutex.RUnlock()
  147. targets := make([]ProxyTarget, 0, len(s.targets))
  148. for _, targetInfo := range s.targets {
  149. targets = append(targets, targetInfo.ProxyTarget)
  150. }
  151. return targets
  152. }
  153. // GetTargetInfos returns a copy of current target infos
  154. func (s *Service) GetTargetInfos() []*TargetInfo {
  155. s.targetsMutex.RLock()
  156. defer s.targetsMutex.RUnlock()
  157. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  158. for _, targetInfo := range s.targets {
  159. // Create a copy
  160. targetInfoCopy := &TargetInfo{
  161. ProxyTarget: targetInfo.ProxyTarget,
  162. ConfigPath: targetInfo.ConfigPath,
  163. LastSeen: targetInfo.LastSeen,
  164. }
  165. targetInfos = append(targetInfos, targetInfoCopy)
  166. }
  167. return targetInfos
  168. }
  169. // GetAvailabilityMap returns a copy of current availability results
  170. func (s *Service) GetAvailabilityMap() map[string]*Status {
  171. s.targetsMutex.RLock()
  172. defer s.targetsMutex.RUnlock()
  173. result := make(map[string]*Status)
  174. for k, v := range s.availabilityMap {
  175. // Create a copy of the status
  176. result[k] = &Status{
  177. Online: v.Online,
  178. Latency: v.Latency,
  179. }
  180. }
  181. return result
  182. }
  183. // PerformAvailabilityTest performs availability test for all targets
  184. func (s *Service) PerformAvailabilityTest() {
  185. // Prevent concurrent tests
  186. s.testMutex.Lock()
  187. if s.testInProgress {
  188. s.testMutex.Unlock()
  189. // logger.Debug("Availability test already in progress, skipping")
  190. return
  191. }
  192. s.testInProgress = true
  193. s.testMutex.Unlock()
  194. // Ensure we reset the flag when done
  195. defer func() {
  196. s.testMutex.Lock()
  197. s.testInProgress = false
  198. s.testMutex.Unlock()
  199. }()
  200. s.targetsMutex.RLock()
  201. targetCount := len(s.targets)
  202. s.targetsMutex.RUnlock()
  203. if targetCount == 0 {
  204. logger.Debug("No targets to test")
  205. return
  206. }
  207. // logger.Debug("Performing availability test for", targetCount, "unique targets")
  208. // Get disabled sockets from database
  209. disabledSockets := make(map[string]bool)
  210. if s.disabledSocketsChecker != nil {
  211. disabledSockets = s.disabledSocketsChecker()
  212. }
  213. // Separate targets into traditional and consul groups from the start
  214. s.targetsMutex.RLock()
  215. regularTargetKeys := make([]string, 0, len(s.targets))
  216. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  217. for _, targetInfo := range s.targets {
  218. // Check if this socket is disabled
  219. socketAddr := formatSocketAddress(targetInfo.ProxyTarget.Host, targetInfo.ProxyTarget.Port)
  220. if disabledSockets[socketAddr] {
  221. // logger.Debug("Skipping disabled socket:", socketAddr)
  222. continue
  223. }
  224. if targetInfo.ProxyTarget.IsConsul {
  225. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  226. } else {
  227. // Traditional target - use properly formatted socket address
  228. regularTargetKeys = append(regularTargetKeys, socketAddr)
  229. }
  230. }
  231. s.targetsMutex.RUnlock()
  232. // Initialize results map
  233. results := make(map[string]*Status)
  234. // Test traditional targets using the original AvailabilityTest
  235. if len(regularTargetKeys) > 0 {
  236. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  237. regularResults := AvailabilityTest(regularTargetKeys)
  238. maps.Copy(results, regularResults)
  239. }
  240. // Test consul targets using consul-specific logic
  241. if len(consulTargets) > 0 {
  242. // logger.Debug("Testing", len(consulTargets), "consul targets")
  243. consulResults := TestDynamicTargets(consulTargets)
  244. maps.Copy(results, consulResults)
  245. }
  246. // Update availability map
  247. s.targetsMutex.Lock()
  248. s.availabilityMap = results
  249. s.targetsMutex.Unlock()
  250. // logger.Debug("Availability test completed for", len(results), "targets")
  251. }
  252. // findUpstreamNameForTarget finds which upstream a target belongs to
  253. func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
  254. s.upstreamsMutex.RLock()
  255. defer s.upstreamsMutex.RUnlock()
  256. targetKey := formatSocketAddress(target.Host, target.Port)
  257. for name, upstream := range s.Upstreams {
  258. for _, server := range upstream.Servers {
  259. serverKey := formatSocketAddress(server.Host, server.Port)
  260. if serverKey == targetKey {
  261. return name
  262. }
  263. }
  264. }
  265. return ""
  266. }
// SetDisabledSocketsChecker sets a callback function to check disabled sockets.
// PerformAvailabilityTest calls checker at the start of a run and skips every
// target whose socket address maps to true in the returned map. Passing nil
// means no targets are skipped.
//
// NOTE(review): the field is assigned without holding any of the service's
// mutexes, and PerformAvailabilityTest reads it the same way. Confirm callers
// only set it before tests start running.
func (s *Service) SetDisabledSocketsChecker(checker func() map[string]bool) {
	s.disabledSocketsChecker = checker
}
  271. // ClearTargets clears all targets (useful for testing or reloading)
  272. func (s *Service) ClearTargets() {
  273. s.targetsMutex.Lock()
  274. s.upstreamsMutex.Lock()
  275. defer s.targetsMutex.Unlock()
  276. defer s.upstreamsMutex.Unlock()
  277. s.targets = make(map[string]*TargetInfo)
  278. s.availabilityMap = make(map[string]*Status)
  279. s.configTargets = make(map[string][]string)
  280. s.Upstreams = make(map[string]*Definition)
  281. s.lastUpdateTime = time.Now()
  282. // logger.Debug("Cleared all proxy targets and upstream definitions")
  283. }
  284. // GetLastUpdateTime returns the last time targets were updated
  285. func (s *Service) GetLastUpdateTime() time.Time {
  286. s.targetsMutex.RLock()
  287. defer s.targetsMutex.RUnlock()
  288. return s.lastUpdateTime
  289. }
  290. // GetTargetCount returns the number of unique targets
  291. func (s *Service) GetTargetCount() int {
  292. s.targetsMutex.RLock()
  293. defer s.targetsMutex.RUnlock()
  294. return len(s.targets)
  295. }
  296. // UpdateUpstreamDefinition updates or adds an upstream definition
  297. func (s *Service) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  298. s.upstreamsMutex.Lock()
  299. defer s.upstreamsMutex.Unlock()
  300. s.Upstreams[name] = &Definition{
  301. Name: name,
  302. Servers: servers,
  303. ConfigPath: configPath,
  304. LastSeen: time.Now(),
  305. }
  306. }
  307. // GetUpstreamDefinition returns an upstream definition by name
  308. func (s *Service) GetUpstreamDefinition(name string) (*Definition, bool) {
  309. s.upstreamsMutex.RLock()
  310. defer s.upstreamsMutex.RUnlock()
  311. upstream, exists := s.Upstreams[name]
  312. if !exists {
  313. return nil, false
  314. }
  315. // Return a copy to avoid race conditions
  316. return &Definition{
  317. Name: upstream.Name,
  318. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  319. ConfigPath: upstream.ConfigPath,
  320. LastSeen: upstream.LastSeen,
  321. }, true
  322. }
  323. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  324. func (s *Service) GetAllUpstreamDefinitions() map[string]*Definition {
  325. s.upstreamsMutex.RLock()
  326. defer s.upstreamsMutex.RUnlock()
  327. result := make(map[string]*Definition)
  328. for name, upstream := range s.Upstreams {
  329. result[name] = &Definition{
  330. Name: upstream.Name,
  331. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  332. ConfigPath: upstream.ConfigPath,
  333. LastSeen: upstream.LastSeen,
  334. }
  335. }
  336. return result
  337. }
  338. // IsUpstreamName checks if a given name is a known upstream
  339. func (s *Service) IsUpstreamName(name string) bool {
  340. s.upstreamsMutex.RLock()
  341. defer s.upstreamsMutex.RUnlock()
  342. _, exists := s.Upstreams[name]
  343. return exists
  344. }
  345. // RemoveConfigTargets removes all targets associated with a specific config file
  346. func (s *Service) RemoveConfigTargets(configPath string) {
  347. s.targetsMutex.Lock()
  348. defer s.targetsMutex.Unlock()
  349. if targetKeys, exists := s.configTargets[configPath]; exists {
  350. for _, key := range targetKeys {
  351. // Check if this target is used by other configs
  352. isUsedByOthers := false
  353. for otherConfig, otherKeys := range s.configTargets {
  354. if otherConfig != configPath {
  355. if slices.Contains(otherKeys, key) {
  356. isUsedByOthers = true
  357. }
  358. if isUsedByOthers {
  359. break
  360. }
  361. }
  362. }
  363. if !isUsedByOthers {
  364. delete(s.targets, key)
  365. delete(s.availabilityMap, key)
  366. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  367. }
  368. }
  369. delete(s.configTargets, configPath)
  370. s.lastUpdateTime = time.Now()
  371. // logger.Debug("Removed config targets for:", configPath)
  372. }
  373. }