|
|
@@ -33,16 +33,15 @@ type Service struct {
|
|
|
availabilityMap map[string]*Status // key: host:port
|
|
|
configTargets map[string][]string // configPath -> []targetKeys
|
|
|
// Public upstream definitions storage
|
|
|
- Upstreams map[string]*Definition // key: upstream name
|
|
|
- upstreamsMutex sync.RWMutex
|
|
|
- targetsMutex sync.RWMutex
|
|
|
- lastUpdateTime time.Time
|
|
|
- testInProgress bool
|
|
|
- testMutex sync.Mutex
|
|
|
- cachedDisabledSockets map[string]bool
|
|
|
- disabledSocketsCacheMutex sync.RWMutex
|
|
|
- disabledSocketsCacheExpiry time.Time
|
|
|
- disabledSocketsCacheDuration time.Duration
|
|
|
+ Upstreams map[string]*Definition // key: upstream name
|
|
|
+ upstreamsMutex sync.RWMutex
|
|
|
+ targetsMutex sync.RWMutex
|
|
|
+ lastUpdateTime time.Time
|
|
|
+ testInProgress bool
|
|
|
+ testMutex sync.Mutex
|
|
|
+ cachedDisabledSockets map[string]bool
|
|
|
+ disabledSocketsCacheMutex sync.RWMutex
|
|
|
+ disabledSocketsCacheValid bool // true if cache is valid, false if needs refresh
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
@@ -70,12 +69,12 @@ func formatSocketAddress(host, port string) string {
|
|
|
func GetUpstreamService() *Service {
|
|
|
serviceOnce.Do(func() {
|
|
|
upstreamService = &Service{
|
|
|
- targets: make(map[string]*TargetInfo),
|
|
|
- availabilityMap: make(map[string]*Status),
|
|
|
- configTargets: make(map[string][]string),
|
|
|
- Upstreams: make(map[string]*Definition),
|
|
|
- lastUpdateTime: time.Now(),
|
|
|
- disabledSocketsCacheDuration: 30 * time.Second,
|
|
|
+ targets: make(map[string]*TargetInfo),
|
|
|
+ availabilityMap: make(map[string]*Status),
|
|
|
+ configTargets: make(map[string][]string),
|
|
|
+ Upstreams: make(map[string]*Definition),
|
|
|
+ lastUpdateTime: time.Now(),
|
|
|
+ disabledSocketsCacheValid: false, // Initialize as invalid to force first load
|
|
|
}
|
|
|
})
|
|
|
return upstreamService
|
|
|
@@ -309,11 +308,11 @@ func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
|
|
|
return ""
|
|
|
}
|
|
|
|
|
|
-// getDisabledSockets queries the database for disabled sockets with caching
|
|
|
+// getDisabledSockets queries the database for disabled sockets with event-driven caching
|
|
|
func (s *Service) getDisabledSockets() map[string]bool {
|
|
|
- // Check if cache is still valid
|
|
|
+ // Check if cache is valid
|
|
|
s.disabledSocketsCacheMutex.RLock()
|
|
|
- if time.Now().Before(s.disabledSocketsCacheExpiry) && s.cachedDisabledSockets != nil {
|
|
|
+ if s.disabledSocketsCacheValid && s.cachedDisabledSockets != nil {
|
|
|
// Return a copy of the cached data
|
|
|
result := make(map[string]bool, len(s.cachedDisabledSockets))
|
|
|
for k, v := range s.cachedDisabledSockets {
|
|
|
@@ -324,7 +323,7 @@ func (s *Service) getDisabledSockets() map[string]bool {
|
|
|
}
|
|
|
s.disabledSocketsCacheMutex.RUnlock()
|
|
|
|
|
|
- // Cache expired or not initialized, refresh from database
|
|
|
+ // Cache invalid or not initialized, refresh from database
|
|
|
disabled := make(map[string]bool)
|
|
|
|
|
|
db := model.UseDB()
|
|
|
@@ -342,12 +341,14 @@ func (s *Service) getDisabledSockets() map[string]bool {
|
|
|
disabled[config.Socket] = true
|
|
|
}
|
|
|
|
|
|
- // Update cache
|
|
|
+ // Update cache and mark as valid
|
|
|
s.disabledSocketsCacheMutex.Lock()
|
|
|
s.cachedDisabledSockets = disabled
|
|
|
- s.disabledSocketsCacheExpiry = time.Now().Add(s.disabledSocketsCacheDuration)
|
|
|
+ s.disabledSocketsCacheValid = true
|
|
|
s.disabledSocketsCacheMutex.Unlock()
|
|
|
|
|
|
+ logger.Debug("Disabled sockets cache refreshed from database, found", len(disabled), "disabled sockets")
|
|
|
+
|
|
|
// Return a copy to prevent external modification
|
|
|
result := make(map[string]bool, len(disabled))
|
|
|
for k, v := range disabled {
|
|
|
@@ -360,7 +361,7 @@ func (s *Service) getDisabledSockets() map[string]bool {
|
|
|
func (s *Service) InvalidateDisabledSocketsCache() {
|
|
|
s.disabledSocketsCacheMutex.Lock()
|
|
|
defer s.disabledSocketsCacheMutex.Unlock()
|
|
|
- s.disabledSocketsCacheExpiry = time.Time{} // Set to zero time to force refresh
|
|
|
+ s.disabledSocketsCacheValid = false
|
|
|
logger.Debug("Disabled sockets cache invalidated")
|
|
|
}
|
|
|
|