|
@@ -1,10 +1,13 @@
|
|
|
package upstream
|
|
|
|
|
|
import (
|
|
|
+ "maps"
|
|
|
+ "slices"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/0xJacky/Nginx-UI/internal/cache"
|
|
|
+ "github.com/uozi-tech/cosy/logger"
|
|
|
)
|
|
|
|
|
|
// TargetInfo contains proxy target information with source config
|
|
@@ -14,15 +17,26 @@ type TargetInfo struct {
|
|
|
LastSeen time.Time `json:"last_seen"`
|
|
|
}
|
|
|
|
|
|
+// UpstreamDefinition contains upstream block information
|
|
|
+type UpstreamDefinition struct {
|
|
|
+ Name string `json:"name"`
|
|
|
+ Servers []ProxyTarget `json:"servers"`
|
|
|
+ ConfigPath string `json:"config_path"`
|
|
|
+ LastSeen time.Time `json:"last_seen"`
|
|
|
+}
|
|
|
+
|
|
|
// UpstreamService manages upstream availability testing
|
|
|
type UpstreamService struct {
|
|
|
targets map[string]*TargetInfo // key: host:port
|
|
|
availabilityMap map[string]*Status // key: host:port
|
|
|
configTargets map[string][]string // configPath -> []targetKeys
|
|
|
- targetsMutex sync.RWMutex
|
|
|
- lastUpdateTime time.Time
|
|
|
- testInProgress bool
|
|
|
- testMutex sync.Mutex
|
|
|
+ // Public upstream definitions storage
|
|
|
+ Upstreams map[string]*UpstreamDefinition // key: upstream name
|
|
|
+ upstreamsMutex sync.RWMutex
|
|
|
+ targetsMutex sync.RWMutex
|
|
|
+ lastUpdateTime time.Time
|
|
|
+ testInProgress bool
|
|
|
+ testMutex sync.Mutex
|
|
|
}
|
|
|
|
|
|
var (
|
|
@@ -37,6 +51,7 @@ func GetUpstreamService() *UpstreamService {
|
|
|
targets: make(map[string]*TargetInfo),
|
|
|
availabilityMap: make(map[string]*Status),
|
|
|
configTargets: make(map[string][]string),
|
|
|
+ Upstreams: make(map[string]*UpstreamDefinition),
|
|
|
lastUpdateTime: time.Now(),
|
|
|
}
|
|
|
})
|
|
@@ -50,11 +65,17 @@ func init() {
|
|
|
|
|
|
// scanForProxyTargets is the callback function for cache scanner
|
|
|
func scanForProxyTargets(configPath string, content []byte) error {
|
|
|
- // Parse proxy targets from config content
|
|
|
- targets := ParseProxyTargetsFromRawContent(string(content))
|
|
|
+ logger.Debug("scanForProxyTargets", configPath)
|
|
|
+ // Parse proxy targets and upstream definitions from config content
|
|
|
+ result := ParseProxyTargetsAndUpstreamsFromRawContent(string(content))
|
|
|
|
|
|
service := GetUpstreamService()
|
|
|
- service.updateTargetsFromConfig(configPath, targets)
|
|
|
+ service.updateTargetsFromConfig(configPath, result.ProxyTargets)
|
|
|
+
|
|
|
+ // Update upstream definitions
|
|
|
+ for upstreamName, servers := range result.Upstreams {
|
|
|
+ service.UpdateUpstreamDefinition(upstreamName, servers, configPath)
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|
|
@@ -74,11 +95,8 @@ func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []P
|
|
|
isOnlyConfig := true
|
|
|
for otherConfig, otherKeys := range s.configTargets {
|
|
|
if otherConfig != configPath {
|
|
|
- for _, otherKey := range otherKeys {
|
|
|
- if otherKey == key {
|
|
|
- isOnlyConfig = false
|
|
|
- break
|
|
|
- }
|
|
|
+ if slices.Contains(otherKeys, key) {
|
|
|
+ isOnlyConfig = false
|
|
|
}
|
|
|
if !isOnlyConfig {
|
|
|
break
|
|
@@ -195,11 +213,11 @@ func (s *UpstreamService) PerformAvailabilityTest() {
|
|
|
s.targetsMutex.RUnlock()
|
|
|
|
|
|
if targetCount == 0 {
|
|
|
- // logger.Debug("No targets to test")
|
|
|
+ logger.Debug("No targets to test")
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- // logger.Debug("Performing availability test for", targetCount, "unique targets")
|
|
|
+ logger.Debug("Performing availability test for", targetCount, "unique targets")
|
|
|
|
|
|
// Separate targets into traditional and consul groups from the start
|
|
|
s.targetsMutex.RLock()
|
|
@@ -224,18 +242,14 @@ func (s *UpstreamService) PerformAvailabilityTest() {
|
|
|
if len(regularTargetKeys) > 0 {
|
|
|
// logger.Debug("Testing", len(regularTargetKeys), "traditional targets")
|
|
|
regularResults := AvailabilityTest(regularTargetKeys)
|
|
|
- for k, v := range regularResults {
|
|
|
- results[k] = v
|
|
|
- }
|
|
|
+ maps.Copy(results, regularResults)
|
|
|
}
|
|
|
|
|
|
// Test consul targets using consul-specific logic
|
|
|
if len(consulTargets) > 0 {
|
|
|
// logger.Debug("Testing", len(consulTargets), "consul targets")
|
|
|
consulResults := TestDynamicTargets(consulTargets)
|
|
|
- for k, v := range consulResults {
|
|
|
- results[k] = v
|
|
|
- }
|
|
|
+ maps.Copy(results, consulResults)
|
|
|
}
|
|
|
|
|
|
// Update availability map
|
|
@@ -249,14 +263,17 @@ func (s *UpstreamService) PerformAvailabilityTest() {
|
|
|
// ClearTargets clears all targets (useful for testing or reloading)
|
|
|
func (s *UpstreamService) ClearTargets() {
|
|
|
s.targetsMutex.Lock()
|
|
|
+ s.upstreamsMutex.Lock()
|
|
|
defer s.targetsMutex.Unlock()
|
|
|
+ defer s.upstreamsMutex.Unlock()
|
|
|
|
|
|
s.targets = make(map[string]*TargetInfo)
|
|
|
s.availabilityMap = make(map[string]*Status)
|
|
|
s.configTargets = make(map[string][]string)
|
|
|
+ s.Upstreams = make(map[string]*UpstreamDefinition)
|
|
|
s.lastUpdateTime = time.Now()
|
|
|
|
|
|
- // logger.Debug("Cleared all proxy targets")
|
|
|
+ // logger.Debug("Cleared all proxy targets and upstream definitions")
|
|
|
}
|
|
|
|
|
|
// GetLastUpdateTime returns the last time targets were updated
|
|
@@ -273,6 +290,63 @@ func (s *UpstreamService) GetTargetCount() int {
|
|
|
return len(s.targets)
|
|
|
}
|
|
|
|
|
|
+// UpdateUpstreamDefinition updates or adds an upstream definition
|
|
|
+func (s *UpstreamService) UpdateUpstreamDefinition(name string, servers []ProxyTarget, configPath string) {
|
|
|
+ s.upstreamsMutex.Lock()
|
|
|
+ defer s.upstreamsMutex.Unlock()
|
|
|
+
|
|
|
+ s.Upstreams[name] = &UpstreamDefinition{
|
|
|
+ Name: name,
|
|
|
+ Servers: servers,
|
|
|
+ ConfigPath: configPath,
|
|
|
+ LastSeen: time.Now(),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// GetUpstreamDefinition returns an upstream definition by name
|
|
|
+func (s *UpstreamService) GetUpstreamDefinition(name string) (*UpstreamDefinition, bool) {
|
|
|
+ s.upstreamsMutex.RLock()
|
|
|
+ defer s.upstreamsMutex.RUnlock()
|
|
|
+
|
|
|
+ upstream, exists := s.Upstreams[name]
|
|
|
+ if !exists {
|
|
|
+ return nil, false
|
|
|
+ }
|
|
|
+
|
|
|
+ // Return a copy to avoid race conditions
|
|
|
+ return &UpstreamDefinition{
|
|
|
+ Name: upstream.Name,
|
|
|
+ Servers: append([]ProxyTarget(nil), upstream.Servers...),
|
|
|
+ ConfigPath: upstream.ConfigPath,
|
|
|
+ LastSeen: upstream.LastSeen,
|
|
|
+ }, true
|
|
|
+}
|
|
|
+
|
|
|
+// GetAllUpstreamDefinitions returns a copy of all upstream definitions
|
|
|
+func (s *UpstreamService) GetAllUpstreamDefinitions() map[string]*UpstreamDefinition {
|
|
|
+ s.upstreamsMutex.RLock()
|
|
|
+ defer s.upstreamsMutex.RUnlock()
|
|
|
+
|
|
|
+ result := make(map[string]*UpstreamDefinition)
|
|
|
+ for name, upstream := range s.Upstreams {
|
|
|
+ result[name] = &UpstreamDefinition{
|
|
|
+ Name: upstream.Name,
|
|
|
+ Servers: append([]ProxyTarget(nil), upstream.Servers...),
|
|
|
+ ConfigPath: upstream.ConfigPath,
|
|
|
+ LastSeen: upstream.LastSeen,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+// IsUpstreamName checks if a given name is a known upstream
|
|
|
+func (s *UpstreamService) IsUpstreamName(name string) bool {
|
|
|
+ s.upstreamsMutex.RLock()
|
|
|
+ defer s.upstreamsMutex.RUnlock()
|
|
|
+ _, exists := s.Upstreams[name]
|
|
|
+ return exists
|
|
|
+}
|
|
|
+
|
|
|
// RemoveConfigTargets removes all targets associated with a specific config file
|
|
|
func (s *UpstreamService) RemoveConfigTargets(configPath string) {
|
|
|
s.targetsMutex.Lock()
|
|
@@ -284,11 +358,8 @@ func (s *UpstreamService) RemoveConfigTargets(configPath string) {
|
|
|
isUsedByOthers := false
|
|
|
for otherConfig, otherKeys := range s.configTargets {
|
|
|
if otherConfig != configPath {
|
|
|
- for _, otherKey := range otherKeys {
|
|
|
- if otherKey == key {
|
|
|
- isUsedByOthers = true
|
|
|
- break
|
|
|
- }
|
|
|
+ if slices.Contains(otherKeys, key) {
|
|
|
+ isUsedByOthers = true
|
|
|
}
|
|
|
if isUsedByOthers {
|
|
|
break
|