service.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package upstream
  2. import (
  3. "maps"
  4. "slices"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/uozi-tech/cosy/logger"
  11. )
  12. // TargetInfo contains proxy target information with source config
  13. type TargetInfo struct {
  14. ProxyTarget
  15. ConfigPath string `json:"config_path"`
  16. LastSeen time.Time `json:"last_seen"`
  17. }
  18. // Definition contains upstream block information
  19. type Definition struct {
  20. Name string `json:"name"`
  21. Servers []ProxyTarget `json:"servers"`
  22. ConfigPath string `json:"config_path"`
  23. LastSeen time.Time `json:"last_seen"`
  24. }
  25. // Service manages upstream availability testing
  26. type Service struct {
  27. targets map[string]*TargetInfo // key: host:port
  28. availabilityMap map[string]*Status // key: host:port
  29. configTargets map[string][]string // configPath -> []targetKeys
  30. // Public upstream definitions storage
  31. Upstreams map[string]*Definition // key: upstream name
  32. upstreamsMutex sync.RWMutex
  33. targetsMutex sync.RWMutex
  34. lastUpdateTime time.Time
  35. testInProgress bool
  36. testMutex sync.Mutex
  37. cachedDisabledSockets map[string]bool
  38. disabledSocketsCacheMutex sync.RWMutex
  39. disabledSocketsCacheValid bool // true if cache is valid, false if needs refresh
  40. }
  41. var (
  42. upstreamService *Service
  43. serviceOnce sync.Once
  44. )
  45. // formatSocketAddress formats a host:port combination into a proper socket address
  46. // For IPv6 addresses, it adds brackets around the host if they're not already present
  47. func formatSocketAddress(host, port string) string {
  48. // Check if this is an IPv6 address by looking for colons
  49. if strings.Contains(host, ":") {
  50. // IPv6 address - check if it already has brackets
  51. if !strings.HasPrefix(host, "[") {
  52. return "[" + host + "]:" + port
  53. }
  54. // Already has brackets, just append port
  55. return host + ":" + port
  56. }
  57. // IPv4 address or hostname
  58. return host + ":" + port
  59. }
  60. // GetUpstreamService returns the singleton upstream service instance
  61. func GetUpstreamService() *Service {
  62. serviceOnce.Do(func() {
  63. upstreamService = &Service{
  64. targets: make(map[string]*TargetInfo),
  65. availabilityMap: make(map[string]*Status),
  66. configTargets: make(map[string][]string),
  67. Upstreams: make(map[string]*Definition),
  68. lastUpdateTime: time.Now(),
  69. disabledSocketsCacheValid: false, // Initialize as invalid to force first load
  70. }
  71. })
  72. return upstreamService
  73. }
  74. // init registers the ParseProxyTargetsFromRawContent callback
  75. func init() {
  76. cache.RegisterCallback("upstream.scanForProxyTargets", scanForProxyTargets)
  77. }
  78. // scanForProxyTargets is the callback function for cache scanner
  79. func scanForProxyTargets(configPath string, content []byte) error {
  80. // logger.Debug("scanForProxyTargets", configPath)
  81. // Parse proxy targets and upstream definitions from config content
  82. result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
  83. service := GetUpstreamService()
  84. service.updateTargetsFromConfig(configPath, result.ProxyTargets)
  85. // Update upstream definitions
  86. for upstreamName, servers := range result.Upstreams {
  87. service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
  88. }
  89. return nil
  90. }
  91. // updateTargetsFromConfig updates proxy targets from a specific config file
  92. func (s *Service) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  93. s.targetsMutex.Lock()
  94. defer s.targetsMutex.Unlock()
  95. now := time.Now()
  96. // Remove old targets from this config path
  97. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  98. for _, key := range oldTargetKeys {
  99. if _, exists := s.targets[key]; exists {
  100. // Only remove if this is the only config using this target
  101. isOnlyConfig := true
  102. for otherConfig, otherKeys := range s.configTargets {
  103. if otherConfig != configPath {
  104. if slices.Contains(otherKeys, key) {
  105. isOnlyConfig = false
  106. }
  107. if !isOnlyConfig {
  108. break
  109. }
  110. }
  111. }
  112. if isOnlyConfig {
  113. delete(s.targets, key)
  114. delete(s.availabilityMap, key)
  115. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  116. } else {
  117. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  118. }
  119. }
  120. }
  121. }
  122. // Add/update new targets
  123. newTargetKeys := make([]string, 0, len(targets))
  124. for _, target := range targets {
  125. key := formatSocketAddress(target.Host, target.Port)
  126. newTargetKeys = append(newTargetKeys, key)
  127. if existingTarget, exists := s.targets[key]; exists {
  128. // Update existing target with latest info
  129. existingTarget.LastSeen = now
  130. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  131. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  132. } else {
  133. // Add new target
  134. s.targets[key] = &TargetInfo{
  135. ProxyTarget: target,
  136. ConfigPath: configPath,
  137. LastSeen: now,
  138. }
  139. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  140. }
  141. }
  142. // Update config target mapping
  143. s.configTargets[configPath] = newTargetKeys
  144. s.lastUpdateTime = now
  145. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  146. }
  147. // GetTargets returns a copy of current proxy targets
  148. func (s *Service) GetTargets() []ProxyTarget {
  149. s.targetsMutex.RLock()
  150. defer s.targetsMutex.RUnlock()
  151. targets := make([]ProxyTarget, 0, len(s.targets))
  152. for _, targetInfo := range s.targets {
  153. targets = append(targets, targetInfo.ProxyTarget)
  154. }
  155. return targets
  156. }
  157. // GetTargetInfos returns a copy of current target infos
  158. func (s *Service) GetTargetInfos() []*TargetInfo {
  159. s.targetsMutex.RLock()
  160. defer s.targetsMutex.RUnlock()
  161. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  162. for _, targetInfo := range s.targets {
  163. // Create a copy
  164. targetInfoCopy := &TargetInfo{
  165. ProxyTarget: targetInfo.ProxyTarget,
  166. ConfigPath: targetInfo.ConfigPath,
  167. LastSeen: targetInfo.LastSeen,
  168. }
  169. targetInfos = append(targetInfos, targetInfoCopy)
  170. }
  171. return targetInfos
  172. }
  173. // GetAvailabilityMap returns a copy of current availability results
  174. func (s *Service) GetAvailabilityMap() map[string]*Status {
  175. s.targetsMutex.RLock()
  176. defer s.targetsMutex.RUnlock()
  177. result := make(map[string]*Status)
  178. for k, v := range s.availabilityMap {
  179. // Create a copy of the status
  180. result[k] = &Status{
  181. Online: v.Online,
  182. Latency: v.Latency,
  183. }
  184. }
  185. return result
  186. }
  187. // PerformAvailabilityTest performs availability test for all targets
  188. func (s *Service) PerformAvailabilityTest() {
  189. // Prevent concurrent tests
  190. s.testMutex.Lock()
  191. if s.testInProgress {
  192. s.testMutex.Unlock()
  193. // logger.Debug("Availability test already in progress, skipping")
  194. return
  195. }
  196. s.testInProgress = true
  197. s.testMutex.Unlock()
  198. // Ensure we reset the flag when done
  199. defer func() {
  200. s.testMutex.Lock()
  201. s.testInProgress = false
  202. s.testMutex.Unlock()
  203. }()
  204. s.targetsMutex.RLock()
  205. targetCount := len(s.targets)
  206. s.targetsMutex.RUnlock()
  207. if targetCount == 0 {
  208. logger.Debug("No targets to test")
  209. return
  210. }
  211. // logger.Debug("Performing availability test for", targetCount, "unique targets")
  212. // Get disabled sockets from cache or database
  213. disabledSockets := s.getDisabledSockets()
  214. // Separate targets into traditional and consul groups from the start
  215. s.targetsMutex.RLock()
  216. regularTargetKeys := make([]string, 0, len(s.targets))
  217. consulTargets := make([]ProxyTarget, 0, len(s.targets))
  218. for _, targetInfo := range s.targets {
  219. // Check if this socket is disabled
  220. socketAddr := formatSocketAddress(targetInfo.ProxyTarget.Host, targetInfo.ProxyTarget.Port)
  221. if disabledSockets[socketAddr] {
  222. // logger.Debug("Skipping disabled socket:", socketAddr)
  223. continue
  224. }
  225. if targetInfo.ProxyTarget.IsConsul {
  226. consulTargets = append(consulTargets, targetInfo.ProxyTarget)
  227. } else {
  228. // Traditional target - use properly formatted socket address
  229. regularTargetKeys = append(regularTargetKeys, socketAddr)
  230. }
  231. }
  232. s.targetsMutex.RUnlock()
  233. // Initialize results map
  234. results := make(map[string]*Status)
  235. // Test traditional targets using the original AvailabilityTest
  236. if len(regularTargetKeys) > 0 {
  237. // logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
  238. regularResults := AvailabilityTest(regularTargetKeys)
  239. maps.Copy(results, regularResults)
  240. }
  241. // Test consul targets using consul-specific logic
  242. if len(consulTargets) > 0 {
  243. // logger.Debug("Testing", len(consulTargets), "consul targets")
  244. consulResults := TestDynamicTargets(consulTargets)
  245. maps.Copy(results, consulResults)
  246. }
  247. // Update availability map
  248. s.targetsMutex.Lock()
  249. s.availabilityMap = results
  250. s.targetsMutex.Unlock()
  251. // logger.Debug("Availability test completed for", len(results), "targets")
  252. }
  253. // findUpstreamNameForTarget finds which upstream a target belongs to
  254. func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
  255. s.upstreamsMutex.RLock()
  256. defer s.upstreamsMutex.RUnlock()
  257. targetKey := formatSocketAddress(target.Host, target.Port)
  258. for name, upstream := range s.Upstreams {
  259. for _, server := range upstream.Servers {
  260. serverKey := formatSocketAddress(server.Host, server.Port)
  261. if serverKey == targetKey {
  262. return name
  263. }
  264. }
  265. }
  266. return ""
  267. }
  268. // getDisabledSockets queries the database for disabled sockets with event-driven caching
  269. func (s *Service) getDisabledSockets() map[string]bool {
  270. // Check if cache is valid
  271. s.disabledSocketsCacheMutex.RLock()
  272. if s.disabledSocketsCacheValid && s.cachedDisabledSockets != nil {
  273. // Return a copy of the cached data
  274. result := make(map[string]bool, len(s.cachedDisabledSockets))
  275. for k, v := range s.cachedDisabledSockets {
  276. result[k] = v
  277. }
  278. s.disabledSocketsCacheMutex.RUnlock()
  279. return result
  280. }
  281. s.disabledSocketsCacheMutex.RUnlock()
  282. // Cache invalid or not initialized, refresh from database
  283. disabled := make(map[string]bool)
  284. db := model.UseDB()
  285. if db == nil {
  286. return disabled
  287. }
  288. var configs []model.UpstreamConfig
  289. if err := db.Where("enabled = ?", false).Find(&configs).Error; err != nil {
  290. logger.Error("Failed to query disabled sockets:", err)
  291. return disabled
  292. }
  293. for _, config := range configs {
  294. disabled[config.Socket] = true
  295. }
  296. // Update cache and mark as valid
  297. s.disabledSocketsCacheMutex.Lock()
  298. s.cachedDisabledSockets = disabled
  299. s.disabledSocketsCacheValid = true
  300. s.disabledSocketsCacheMutex.Unlock()
  301. logger.Debug("Disabled sockets cache refreshed from database, found", len(disabled), "disabled sockets")
  302. // Return a copy to prevent external modification
  303. result := make(map[string]bool, len(disabled))
  304. for k, v := range disabled {
  305. result[k] = v
  306. }
  307. return result
  308. }
  309. // InvalidateDisabledSocketsCache invalidates the cache, forcing next read to refresh from database
  310. func (s *Service) InvalidateDisabledSocketsCache() {
  311. s.disabledSocketsCacheMutex.Lock()
  312. defer s.disabledSocketsCacheMutex.Unlock()
  313. s.disabledSocketsCacheValid = false
  314. logger.Debug("Disabled sockets cache invalidated")
  315. }
  316. // ClearTargets clears all targets (useful for testing or reloading)
  317. func (s *Service) ClearTargets() {
  318. s.targetsMutex.Lock()
  319. s.upstreamsMutex.Lock()
  320. defer s.targetsMutex.Unlock()
  321. defer s.upstreamsMutex.Unlock()
  322. s.targets = make(map[string]*TargetInfo)
  323. s.availabilityMap = make(map[string]*Status)
  324. s.configTargets = make(map[string][]string)
  325. s.Upstreams = make(map[string]*Definition)
  326. s.lastUpdateTime = time.Now()
  327. // logger.Debug("Cleared all proxy targets and upstream definitions")
  328. }
  329. // GetLastUpdateTime returns the last time targets were updated
  330. func (s *Service) GetLastUpdateTime() time.Time {
  331. s.targetsMutex.RLock()
  332. defer s.targetsMutex.RUnlock()
  333. return s.lastUpdateTime
  334. }
  335. // GetTargetCount returns the number of unique targets
  336. func (s *Service) GetTargetCount() int {
  337. s.targetsMutex.RLock()
  338. defer s.targetsMutex.RUnlock()
  339. return len(s.targets)
  340. }
  341. // UpdateUpstreamDefinition updates or adds an upstream definition
  342. func (s *Service) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
  343. s.upstreamsMutex.Lock()
  344. defer s.upstreamsMutex.Unlock()
  345. s.Upstreams[name] = &Definition{
  346. Name: name,
  347. Servers: servers,
  348. ConfigPath: configPath,
  349. LastSeen: time.Now(),
  350. }
  351. }
  352. // GetUpstreamDefinition returns an upstream definition by name
  353. func (s *Service) GetUpstreamDefinition(name string) (*Definition, bool) {
  354. s.upstreamsMutex.RLock()
  355. defer s.upstreamsMutex.RUnlock()
  356. upstream, exists := s.Upstreams[name]
  357. if !exists {
  358. return nil, false
  359. }
  360. // Return a copy to avoid race conditions
  361. return &Definition{
  362. Name: upstream.Name,
  363. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  364. ConfigPath: upstream.ConfigPath,
  365. LastSeen: upstream.LastSeen,
  366. }, true
  367. }
  368. // GetAllUpstreamDefinitions returns a copy of all upstream definitions
  369. func (s *Service) GetAllUpstreamDefinitions() map[string]*Definition {
  370. s.upstreamsMutex.RLock()
  371. defer s.upstreamsMutex.RUnlock()
  372. result := make(map[string]*Definition)
  373. for name, upstream := range s.Upstreams {
  374. result[name] = &Definition{
  375. Name: upstream.Name,
  376. Servers: append([]ProxyTarget(nil), upstream.Servers...),
  377. ConfigPath: upstream.ConfigPath,
  378. LastSeen: upstream.LastSeen,
  379. }
  380. }
  381. return result
  382. }
  383. // IsUpstreamName checks if a given name is a known upstream
  384. func (s *Service) IsUpstreamName(name string) bool {
  385. s.upstreamsMutex.RLock()
  386. defer s.upstreamsMutex.RUnlock()
  387. _, exists := s.Upstreams[name]
  388. return exists
  389. }
  390. // RemoveConfigTargets removes all targets associated with a specific config file
  391. func (s *Service) RemoveConfigTargets(configPath string) {
  392. s.targetsMutex.Lock()
  393. defer s.targetsMutex.Unlock()
  394. if targetKeys, exists := s.configTargets[configPath]; exists {
  395. for _, key := range targetKeys {
  396. // Check if this target is used by other configs
  397. isUsedByOthers := false
  398. for otherConfig, otherKeys := range s.configTargets {
  399. if otherConfig != configPath {
  400. if slices.Contains(otherKeys, key) {
  401. isUsedByOthers = true
  402. }
  403. if isUsedByOthers {
  404. break
  405. }
  406. }
  407. }
  408. if !isUsedByOthers {
  409. delete(s.targets, key)
  410. delete(s.availabilityMap, key)
  411. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  412. }
  413. }
  414. delete(s.configTargets, configPath)
  415. s.lastUpdateTime = time.Now()
  416. // logger.Debug("Removed config targets for:", configPath)
  417. }
  418. }