service.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package upstream
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/0xJacky/Nginx-UI/internal/cache"
  6. "github.com/uozi-tech/cosy/logger"
  7. )
  8. // TargetInfo contains proxy target information with source config
  9. type TargetInfo struct {
  10. ProxyTarget
  11. ConfigPath string `json:"config_path"`
  12. LastSeen time.Time `json:"last_seen"`
  13. }
  14. // UpstreamService manages upstream availability testing
  15. type UpstreamService struct {
  16. targets map[string]*TargetInfo // key: host:port
  17. availabilityMap map[string]*Status // key: host:port
  18. configTargets map[string][]string // configPath -> []targetKeys
  19. targetsMutex sync.RWMutex
  20. lastUpdateTime time.Time
  21. testInProgress bool
  22. testMutex sync.Mutex
  23. }
  24. var (
  25. upstreamService *UpstreamService
  26. serviceOnce sync.Once
  27. )
  28. // GetUpstreamService returns the singleton upstream service instance
  29. func GetUpstreamService() *UpstreamService {
  30. serviceOnce.Do(func() {
  31. upstreamService = &UpstreamService{
  32. targets: make(map[string]*TargetInfo),
  33. availabilityMap: make(map[string]*Status),
  34. configTargets: make(map[string][]string),
  35. lastUpdateTime: time.Now(),
  36. }
  37. })
  38. return upstreamService
  39. }
  40. // init registers the ParseProxyTargetsFromRawContent callback
  41. func init() {
  42. cache.RegisterCallback(scanForProxyTargets)
  43. }
  44. // scanForProxyTargets is the callback function for cache scanner
  45. func scanForProxyTargets(configPath string, content []byte) error {
  46. // Parse proxy targets from config content
  47. targets := ParseProxyTargetsFromRawContent(string(content))
  48. service := GetUpstreamService()
  49. service.updateTargetsFromConfig(configPath, targets)
  50. return nil
  51. }
  52. // updateTargetsFromConfig updates proxy targets from a specific config file
  53. func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  54. s.targetsMutex.Lock()
  55. defer s.targetsMutex.Unlock()
  56. now := time.Now()
  57. // Remove old targets from this config path
  58. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  59. for _, key := range oldTargetKeys {
  60. if _, exists := s.targets[key]; exists {
  61. // Only remove if this is the only config using this target
  62. isOnlyConfig := true
  63. for otherConfig, otherKeys := range s.configTargets {
  64. if otherConfig != configPath {
  65. for _, otherKey := range otherKeys {
  66. if otherKey == key {
  67. isOnlyConfig = false
  68. break
  69. }
  70. }
  71. if !isOnlyConfig {
  72. break
  73. }
  74. }
  75. }
  76. if isOnlyConfig {
  77. delete(s.targets, key)
  78. delete(s.availabilityMap, key)
  79. logger.Debug("Removed proxy target:", key, "from config:", configPath)
  80. } else {
  81. logger.Debug("Keeping proxy target:", key, "still used by other configs")
  82. }
  83. }
  84. }
  85. }
  86. // Add/update new targets
  87. newTargetKeys := make([]string, 0, len(targets))
  88. for _, target := range targets {
  89. key := target.Host + ":" + target.Port
  90. newTargetKeys = append(newTargetKeys, key)
  91. if existingTarget, exists := s.targets[key]; exists {
  92. // Update existing target with latest info
  93. existingTarget.LastSeen = now
  94. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  95. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  96. } else {
  97. // Add new target
  98. s.targets[key] = &TargetInfo{
  99. ProxyTarget: target,
  100. ConfigPath: configPath,
  101. LastSeen: now,
  102. }
  103. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  104. }
  105. }
  106. // Update config target mapping
  107. s.configTargets[configPath] = newTargetKeys
  108. s.lastUpdateTime = now
  109. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  110. }
  111. // GetTargets returns a copy of current proxy targets
  112. func (s *UpstreamService) GetTargets() []ProxyTarget {
  113. s.targetsMutex.RLock()
  114. defer s.targetsMutex.RUnlock()
  115. targets := make([]ProxyTarget, 0, len(s.targets))
  116. for _, targetInfo := range s.targets {
  117. targets = append(targets, targetInfo.ProxyTarget)
  118. }
  119. return targets
  120. }
  121. // GetTargetInfos returns a copy of current target infos
  122. func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
  123. s.targetsMutex.RLock()
  124. defer s.targetsMutex.RUnlock()
  125. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  126. for _, targetInfo := range s.targets {
  127. // Create a copy
  128. targetInfoCopy := &TargetInfo{
  129. ProxyTarget: targetInfo.ProxyTarget,
  130. ConfigPath: targetInfo.ConfigPath,
  131. LastSeen: targetInfo.LastSeen,
  132. }
  133. targetInfos = append(targetInfos, targetInfoCopy)
  134. }
  135. return targetInfos
  136. }
  137. // GetAvailabilityMap returns a copy of current availability results
  138. func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
  139. s.targetsMutex.RLock()
  140. defer s.targetsMutex.RUnlock()
  141. result := make(map[string]*Status)
  142. for k, v := range s.availabilityMap {
  143. // Create a copy of the status
  144. result[k] = &Status{
  145. Online: v.Online,
  146. Latency: v.Latency,
  147. }
  148. }
  149. return result
  150. }
  151. // PerformAvailabilityTest performs availability test for all targets
  152. func (s *UpstreamService) PerformAvailabilityTest() {
  153. // Prevent concurrent tests
  154. s.testMutex.Lock()
  155. if s.testInProgress {
  156. s.testMutex.Unlock()
  157. logger.Debug("Availability test already in progress, skipping")
  158. return
  159. }
  160. s.testInProgress = true
  161. s.testMutex.Unlock()
  162. // Ensure we reset the flag when done
  163. defer func() {
  164. s.testMutex.Lock()
  165. s.testInProgress = false
  166. s.testMutex.Unlock()
  167. }()
  168. s.targetsMutex.RLock()
  169. targetCount := len(s.targets)
  170. s.targetsMutex.RUnlock()
  171. if targetCount == 0 {
  172. logger.Debug("No targets to test")
  173. return
  174. }
  175. logger.Debug("Performing availability test for", targetCount, "unique targets")
  176. // Get target keys for testing
  177. s.targetsMutex.RLock()
  178. testTargets := make([]string, 0, len(s.targets))
  179. for key := range s.targets {
  180. testTargets = append(testTargets, key)
  181. }
  182. s.targetsMutex.RUnlock()
  183. // Perform the actual availability test
  184. results := AvailabilityTest(testTargets)
  185. // Update availability map
  186. s.targetsMutex.Lock()
  187. s.availabilityMap = results
  188. s.targetsMutex.Unlock()
  189. logger.Debug("Availability test completed for", len(results), "targets")
  190. }
  191. // ClearTargets clears all targets (useful for testing or reloading)
  192. func (s *UpstreamService) ClearTargets() {
  193. s.targetsMutex.Lock()
  194. defer s.targetsMutex.Unlock()
  195. s.targets = make(map[string]*TargetInfo)
  196. s.availabilityMap = make(map[string]*Status)
  197. s.configTargets = make(map[string][]string)
  198. s.lastUpdateTime = time.Now()
  199. logger.Debug("Cleared all proxy targets")
  200. }
  201. // GetLastUpdateTime returns the last time targets were updated
  202. func (s *UpstreamService) GetLastUpdateTime() time.Time {
  203. s.targetsMutex.RLock()
  204. defer s.targetsMutex.RUnlock()
  205. return s.lastUpdateTime
  206. }
  207. // GetTargetCount returns the number of unique targets
  208. func (s *UpstreamService) GetTargetCount() int {
  209. s.targetsMutex.RLock()
  210. defer s.targetsMutex.RUnlock()
  211. return len(s.targets)
  212. }
  213. // RemoveConfigTargets removes all targets associated with a specific config file
  214. func (s *UpstreamService) RemoveConfigTargets(configPath string) {
  215. s.targetsMutex.Lock()
  216. defer s.targetsMutex.Unlock()
  217. if targetKeys, exists := s.configTargets[configPath]; exists {
  218. for _, key := range targetKeys {
  219. // Check if this target is used by other configs
  220. isUsedByOthers := false
  221. for otherConfig, otherKeys := range s.configTargets {
  222. if otherConfig != configPath {
  223. for _, otherKey := range otherKeys {
  224. if otherKey == key {
  225. isUsedByOthers = true
  226. break
  227. }
  228. }
  229. if isUsedByOthers {
  230. break
  231. }
  232. }
  233. }
  234. if !isUsedByOthers {
  235. delete(s.targets, key)
  236. delete(s.availabilityMap, key)
  237. logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  238. }
  239. }
  240. delete(s.configTargets, configPath)
  241. s.lastUpdateTime = time.Now()
  242. logger.Debug("Removed config targets for:", configPath)
  243. }
  244. }