123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package upstream
- import (
- "sync"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/cache"
- "github.com/uozi-tech/cosy/logger"
- )
- // TargetInfo contains proxy target information with source config
- type TargetInfo struct {
- ProxyTarget
- ConfigPath string `json:"config_path"`
- LastSeen time.Time `json:"last_seen"`
- }
- // UpstreamService manages upstream availability testing
- type UpstreamService struct {
- targets map[string]*TargetInfo // key: host:port
- availabilityMap map[string]*Status // key: host:port
- configTargets map[string][]string // configPath -> []targetKeys
- targetsMutex sync.RWMutex
- lastUpdateTime time.Time
- testInProgress bool
- testMutex sync.Mutex
- }
- var (
- upstreamService *UpstreamService
- serviceOnce sync.Once
- )
- // GetUpstreamService returns the singleton upstream service instance
- func GetUpstreamService() *UpstreamService {
- serviceOnce.Do(func() {
- upstreamService = &UpstreamService{
- targets: make(map[string]*TargetInfo),
- availabilityMap: make(map[string]*Status),
- configTargets: make(map[string][]string),
- lastUpdateTime: time.Now(),
- }
- })
- return upstreamService
- }
- // init registers the ParseProxyTargetsFromRawContent callback
- func init() {
- cache.RegisterCallback(scanForProxyTargets)
- }
- // scanForProxyTargets is the callback function for cache scanner
- func scanForProxyTargets(configPath string, content []byte) error {
- // Parse proxy targets from config content
- targets := ParseProxyTargetsFromRawContent(string(content))
- service := GetUpstreamService()
- service.updateTargetsFromConfig(configPath, targets)
- return nil
- }
- // updateTargetsFromConfig updates proxy targets from a specific config file
- func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
- s.targetsMutex.Lock()
- defer s.targetsMutex.Unlock()
- now := time.Now()
- // Remove old targets from this config path
- if oldTargetKeys, exists := s.configTargets[configPath]; exists {
- for _, key := range oldTargetKeys {
- if _, exists := s.targets[key]; exists {
- // Only remove if this is the only config using this target
- isOnlyConfig := true
- for otherConfig, otherKeys := range s.configTargets {
- if otherConfig != configPath {
- for _, otherKey := range otherKeys {
- if otherKey == key {
- isOnlyConfig = false
- break
- }
- }
- if !isOnlyConfig {
- break
- }
- }
- }
- if isOnlyConfig {
- delete(s.targets, key)
- delete(s.availabilityMap, key)
- logger.Debug("Removed proxy target:", key, "from config:", configPath)
- } else {
- logger.Debug("Keeping proxy target:", key, "still used by other configs")
- }
- }
- }
- }
- // Add/update new targets
- newTargetKeys := make([]string, 0, len(targets))
- for _, target := range targets {
- key := target.Host + ":" + target.Port
- newTargetKeys = append(newTargetKeys, key)
- if existingTarget, exists := s.targets[key]; exists {
- // Update existing target with latest info
- existingTarget.LastSeen = now
- existingTarget.ConfigPath = configPath // Update to latest config that referenced it
- // logger.Debug("Updated proxy target:", key, "from config:", configPath)
- } else {
- // Add new target
- s.targets[key] = &TargetInfo{
- ProxyTarget: target,
- ConfigPath: configPath,
- LastSeen: now,
- }
- // logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
- }
- }
- // Update config target mapping
- s.configTargets[configPath] = newTargetKeys
- s.lastUpdateTime = now
- // logger.Debug("Config", configPath, "updated with", len(targets), "targets")
- }
- // GetTargets returns a copy of current proxy targets
- func (s *UpstreamService) GetTargets() []ProxyTarget {
- s.targetsMutex.RLock()
- defer s.targetsMutex.RUnlock()
- targets := make([]ProxyTarget, 0, len(s.targets))
- for _, targetInfo := range s.targets {
- targets = append(targets, targetInfo.ProxyTarget)
- }
- return targets
- }
- // GetTargetInfos returns a copy of current target infos
- func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
- s.targetsMutex.RLock()
- defer s.targetsMutex.RUnlock()
- targetInfos := make([]*TargetInfo, 0, len(s.targets))
- for _, targetInfo := range s.targets {
- // Create a copy
- targetInfoCopy := &TargetInfo{
- ProxyTarget: targetInfo.ProxyTarget,
- ConfigPath: targetInfo.ConfigPath,
- LastSeen: targetInfo.LastSeen,
- }
- targetInfos = append(targetInfos, targetInfoCopy)
- }
- return targetInfos
- }
- // GetAvailabilityMap returns a copy of current availability results
- func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
- s.targetsMutex.RLock()
- defer s.targetsMutex.RUnlock()
- result := make(map[string]*Status)
- for k, v := range s.availabilityMap {
- // Create a copy of the status
- result[k] = &Status{
- Online: v.Online,
- Latency: v.Latency,
- }
- }
- return result
- }
- // PerformAvailabilityTest performs availability test for all targets
- func (s *UpstreamService) PerformAvailabilityTest() {
- // Prevent concurrent tests
- s.testMutex.Lock()
- if s.testInProgress {
- s.testMutex.Unlock()
- logger.Debug("Availability test already in progress, skipping")
- return
- }
- s.testInProgress = true
- s.testMutex.Unlock()
- // Ensure we reset the flag when done
- defer func() {
- s.testMutex.Lock()
- s.testInProgress = false
- s.testMutex.Unlock()
- }()
- s.targetsMutex.RLock()
- targetCount := len(s.targets)
- s.targetsMutex.RUnlock()
- if targetCount == 0 {
- logger.Debug("No targets to test")
- return
- }
- logger.Debug("Performing availability test for", targetCount, "unique targets")
- // Get target keys for testing
- s.targetsMutex.RLock()
- testTargets := make([]string, 0, len(s.targets))
- for key := range s.targets {
- testTargets = append(testTargets, key)
- }
- s.targetsMutex.RUnlock()
- // Perform the actual availability test
- results := AvailabilityTest(testTargets)
- // Update availability map
- s.targetsMutex.Lock()
- s.availabilityMap = results
- s.targetsMutex.Unlock()
- logger.Debug("Availability test completed for", len(results), "targets")
- }
- // ClearTargets clears all targets (useful for testing or reloading)
- func (s *UpstreamService) ClearTargets() {
- s.targetsMutex.Lock()
- defer s.targetsMutex.Unlock()
- s.targets = make(map[string]*TargetInfo)
- s.availabilityMap = make(map[string]*Status)
- s.configTargets = make(map[string][]string)
- s.lastUpdateTime = time.Now()
- logger.Debug("Cleared all proxy targets")
- }
- // GetLastUpdateTime returns the last time targets were updated
- func (s *UpstreamService) GetLastUpdateTime() time.Time {
- s.targetsMutex.RLock()
- defer s.targetsMutex.RUnlock()
- return s.lastUpdateTime
- }
- // GetTargetCount returns the number of unique targets
- func (s *UpstreamService) GetTargetCount() int {
- s.targetsMutex.RLock()
- defer s.targetsMutex.RUnlock()
- return len(s.targets)
- }
- // RemoveConfigTargets removes all targets associated with a specific config file
- func (s *UpstreamService) RemoveConfigTargets(configPath string) {
- s.targetsMutex.Lock()
- defer s.targetsMutex.Unlock()
- if targetKeys, exists := s.configTargets[configPath]; exists {
- for _, key := range targetKeys {
- // Check if this target is used by other configs
- isUsedByOthers := false
- for otherConfig, otherKeys := range s.configTargets {
- if otherConfig != configPath {
- for _, otherKey := range otherKeys {
- if otherKey == key {
- isUsedByOthers = true
- break
- }
- }
- if isUsedByOthers {
- break
- }
- }
- }
- if !isUsedByOthers {
- delete(s.targets, key)
- delete(s.availabilityMap, key)
- logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
- }
- }
- delete(s.configTargets, configPath)
- s.lastUpdateTime = time.Now()
- logger.Debug("Removed config targets for:", configPath)
- }
- }
|