service.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package upstream
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/0xJacky/Nginx-UI/internal/cache"
  6. )
  7. // TargetInfo contains proxy target information with source config
  8. type TargetInfo struct {
  9. ProxyTarget
  10. ConfigPath string `json:"config_path"`
  11. LastSeen time.Time `json:"last_seen"`
  12. }
  13. // UpstreamService manages upstream availability testing
  14. type UpstreamService struct {
  15. targets map[string]*TargetInfo // key: host:port
  16. availabilityMap map[string]*Status // key: host:port
  17. configTargets map[string][]string // configPath -> []targetKeys
  18. targetsMutex sync.RWMutex
  19. lastUpdateTime time.Time
  20. testInProgress bool
  21. testMutex sync.Mutex
  22. }
  23. var (
  24. upstreamService *UpstreamService
  25. serviceOnce sync.Once
  26. )
  27. // GetUpstreamService returns the singleton upstream service instance
  28. func GetUpstreamService() *UpstreamService {
  29. serviceOnce.Do(func() {
  30. upstreamService = &UpstreamService{
  31. targets: make(map[string]*TargetInfo),
  32. availabilityMap: make(map[string]*Status),
  33. configTargets: make(map[string][]string),
  34. lastUpdateTime: time.Now(),
  35. }
  36. })
  37. return upstreamService
  38. }
  39. // init registers the ParseProxyTargetsFromRawContent callback
  40. func init() {
  41. cache.RegisterCallback(scanForProxyTargets)
  42. }
  43. // scanForProxyTargets is the callback function for cache scanner
  44. func scanForProxyTargets(configPath string, content []byte) error {
  45. // Parse proxy targets from config content
  46. targets := ParseProxyTargetsFromRawContent(string(content))
  47. service := GetUpstreamService()
  48. service.updateTargetsFromConfig(configPath, targets)
  49. return nil
  50. }
  51. // updateTargetsFromConfig updates proxy targets from a specific config file
  52. func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
  53. s.targetsMutex.Lock()
  54. defer s.targetsMutex.Unlock()
  55. now := time.Now()
  56. // Remove old targets from this config path
  57. if oldTargetKeys, exists := s.configTargets[configPath]; exists {
  58. for _, key := range oldTargetKeys {
  59. if _, exists := s.targets[key]; exists {
  60. // Only remove if this is the only config using this target
  61. isOnlyConfig := true
  62. for otherConfig, otherKeys := range s.configTargets {
  63. if otherConfig != configPath {
  64. for _, otherKey := range otherKeys {
  65. if otherKey == key {
  66. isOnlyConfig = false
  67. break
  68. }
  69. }
  70. if !isOnlyConfig {
  71. break
  72. }
  73. }
  74. }
  75. if isOnlyConfig {
  76. delete(s.targets, key)
  77. delete(s.availabilityMap, key)
  78. // logger.Debug("Removed proxy target:", key, "from config:", configPath)
  79. } else {
  80. // logger.Debug("Keeping proxy target:", key, "still used by other configs")
  81. }
  82. }
  83. }
  84. }
  85. // Add/update new targets
  86. newTargetKeys := make([]string, 0, len(targets))
  87. for _, target := range targets {
  88. key := target.Host + ":" + target.Port
  89. newTargetKeys = append(newTargetKeys, key)
  90. if existingTarget, exists := s.targets[key]; exists {
  91. // Update existing target with latest info
  92. existingTarget.LastSeen = now
  93. existingTarget.ConfigPath = configPath // Update to latest config that referenced it
  94. // logger.Debug("Updated proxy target:", key, "from config:", configPath)
  95. } else {
  96. // Add new target
  97. s.targets[key] = &TargetInfo{
  98. ProxyTarget: target,
  99. ConfigPath: configPath,
  100. LastSeen: now,
  101. }
  102. // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
  103. }
  104. }
  105. // Update config target mapping
  106. s.configTargets[configPath] = newTargetKeys
  107. s.lastUpdateTime = now
  108. // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
  109. }
  110. // GetTargets returns a copy of current proxy targets
  111. func (s *UpstreamService) GetTargets() []ProxyTarget {
  112. s.targetsMutex.RLock()
  113. defer s.targetsMutex.RUnlock()
  114. targets := make([]ProxyTarget, 0, len(s.targets))
  115. for _, targetInfo := range s.targets {
  116. targets = append(targets, targetInfo.ProxyTarget)
  117. }
  118. return targets
  119. }
  120. // GetTargetInfos returns a copy of current target infos
  121. func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
  122. s.targetsMutex.RLock()
  123. defer s.targetsMutex.RUnlock()
  124. targetInfos := make([]*TargetInfo, 0, len(s.targets))
  125. for _, targetInfo := range s.targets {
  126. // Create a copy
  127. targetInfoCopy := &TargetInfo{
  128. ProxyTarget: targetInfo.ProxyTarget,
  129. ConfigPath: targetInfo.ConfigPath,
  130. LastSeen: targetInfo.LastSeen,
  131. }
  132. targetInfos = append(targetInfos, targetInfoCopy)
  133. }
  134. return targetInfos
  135. }
  136. // GetAvailabilityMap returns a copy of current availability results
  137. func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
  138. s.targetsMutex.RLock()
  139. defer s.targetsMutex.RUnlock()
  140. result := make(map[string]*Status)
  141. for k, v := range s.availabilityMap {
  142. // Create a copy of the status
  143. result[k] = &Status{
  144. Online: v.Online,
  145. Latency: v.Latency,
  146. }
  147. }
  148. return result
  149. }
  150. // PerformAvailabilityTest performs availability test for all targets
  151. func (s *UpstreamService) PerformAvailabilityTest() {
  152. // Prevent concurrent tests
  153. s.testMutex.Lock()
  154. if s.testInProgress {
  155. s.testMutex.Unlock()
  156. // logger.Debug("Availability test already in progress, skipping")
  157. return
  158. }
  159. s.testInProgress = true
  160. s.testMutex.Unlock()
  161. // Ensure we reset the flag when done
  162. defer func() {
  163. s.testMutex.Lock()
  164. s.testInProgress = false
  165. s.testMutex.Unlock()
  166. }()
  167. s.targetsMutex.RLock()
  168. targetCount := len(s.targets)
  169. s.targetsMutex.RUnlock()
  170. if targetCount == 0 {
  171. // logger.Debug("No targets to test")
  172. return
  173. }
  174. // logger.Debug("Performing availability test for", targetCount, "unique targets")
  175. // Get target keys for testing
  176. s.targetsMutex.RLock()
  177. testTargets := make([]string, 0, len(s.targets))
  178. for key := range s.targets {
  179. testTargets = append(testTargets, key)
  180. }
  181. s.targetsMutex.RUnlock()
  182. // Perform the actual availability test
  183. results := AvailabilityTest(testTargets)
  184. // Update availability map
  185. s.targetsMutex.Lock()
  186. s.availabilityMap = results
  187. s.targetsMutex.Unlock()
  188. // logger.Debug("Availability test completed for", len(results), "targets")
  189. }
  190. // ClearTargets clears all targets (useful for testing or reloading)
  191. func (s *UpstreamService) ClearTargets() {
  192. s.targetsMutex.Lock()
  193. defer s.targetsMutex.Unlock()
  194. s.targets = make(map[string]*TargetInfo)
  195. s.availabilityMap = make(map[string]*Status)
  196. s.configTargets = make(map[string][]string)
  197. s.lastUpdateTime = time.Now()
  198. // logger.Debug("Cleared all proxy targets")
  199. }
  200. // GetLastUpdateTime returns the last time targets were updated
  201. func (s *UpstreamService) GetLastUpdateTime() time.Time {
  202. s.targetsMutex.RLock()
  203. defer s.targetsMutex.RUnlock()
  204. return s.lastUpdateTime
  205. }
  206. // GetTargetCount returns the number of unique targets
  207. func (s *UpstreamService) GetTargetCount() int {
  208. s.targetsMutex.RLock()
  209. defer s.targetsMutex.RUnlock()
  210. return len(s.targets)
  211. }
  212. // RemoveConfigTargets removes all targets associated with a specific config file
  213. func (s *UpstreamService) RemoveConfigTargets(configPath string) {
  214. s.targetsMutex.Lock()
  215. defer s.targetsMutex.Unlock()
  216. if targetKeys, exists := s.configTargets[configPath]; exists {
  217. for _, key := range targetKeys {
  218. // Check if this target is used by other configs
  219. isUsedByOthers := false
  220. for otherConfig, otherKeys := range s.configTargets {
  221. if otherConfig != configPath {
  222. for _, otherKey := range otherKeys {
  223. if otherKey == key {
  224. isUsedByOthers = true
  225. break
  226. }
  227. }
  228. if isUsedByOthers {
  229. break
  230. }
  231. }
  232. }
  233. if !isUsedByOthers {
  234. delete(s.targets, key)
  235. delete(s.availabilityMap, key)
  236. // logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
  237. }
  238. }
  239. delete(s.configTargets, configPath)
  240. s.lastUpdateTime = time.Now()
  241. // logger.Debug("Removed config targets for:", configPath)
  242. }
  243. }