Bläddra i källkod

refactor(upstream): remove init function and implement caching for disabled sockets in service

0xJacky 4 dagar sedan
förälder
incheckning
6de168c945
4 ändrade filer med 80 tillägg och 55 borttagningar
  1. 0 35
      api/upstream/init.go
  2. 7 0
      api/upstream/socket.go
  3. 1 0
      app/components.d.ts
  4. 72 20
      internal/upstream/service.go

+ 0 - 35
api/upstream/init.go

@@ -1,35 +0,0 @@
-package upstream
-
-import (
-	"github.com/0xJacky/Nginx-UI/internal/upstream"
-	"github.com/0xJacky/Nginx-UI/model"
-	"github.com/uozi-tech/cosy/logger"
-)
-
-func init() {
-	// Register the disabled sockets checker callback
-	service := upstream.GetUpstreamService()
-	service.SetDisabledSocketsChecker(getDisabledSockets)
-}
-
-// getDisabledSockets queries the database for disabled sockets
-func getDisabledSockets() map[string]bool {
-	disabled := make(map[string]bool)
-
-	db := model.UseDB()
-	if db == nil {
-		return disabled
-	}
-
-	var configs []model.UpstreamConfig
-	if err := db.Where("enabled = ?", false).Find(&configs).Error; err != nil {
-		logger.Error("Failed to query disabled sockets:", err)
-		return disabled
-	}
-
-	for _, config := range configs {
-		disabled[config.Socket] = true
-	}
-
-	return disabled
-}

+ 7 - 0
api/upstream/socket.go

@@ -123,6 +123,9 @@ func UpdateSocketConfig(c *gin.Context) {
 			cosy.ErrHandler(c, err)
 			return
 		}
+		// Invalidate cache after creating new config
+		service := upstream.GetUpstreamService()
+		service.InvalidateDisabledSocketsCache()
 	} else {
 		// Update existing config
 		if _, err := u.Where(u.Socket.Eq(socket)).Update(u.Enabled, req.Enabled); err != nil {
@@ -132,6 +135,10 @@ func UpdateSocketConfig(c *gin.Context) {
 		}
 	}
 
+	// Invalidate the disabled sockets cache to ensure changes take effect immediately
+	service := upstream.GetUpstreamService()
+	service.InvalidateDisabledSocketsCache()
+
 	c.JSON(http.StatusOK, gin.H{
 		"message": "Socket config updated successfully",
 	})

+ 1 - 0
app/components.d.ts

@@ -11,6 +11,7 @@ declare module 'vue' {
     AAlert: typeof import('ant-design-vue/es')['Alert']
     AApp: typeof import('ant-design-vue/es')['App']
     AAutoComplete: typeof import('ant-design-vue/es')['AutoComplete']
+    AAvatar: typeof import('ant-design-vue/es')['Avatar']
     ABadge: typeof import('ant-design-vue/es')['Badge']
     ABreadcrumb: typeof import('ant-design-vue/es')['Breadcrumb']
     ABreadcrumbItem: typeof import('ant-design-vue/es')['BreadcrumbItem']

+ 72 - 20
internal/upstream/service.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/0xJacky/Nginx-UI/internal/cache"
+	"github.com/0xJacky/Nginx-UI/model"
 	"github.com/uozi-tech/cosy/logger"
 )
 
@@ -32,13 +33,16 @@ type Service struct {
 	availabilityMap map[string]*Status     // key: host:port
 	configTargets   map[string][]string    // configPath -> []targetKeys
 	// Public upstream definitions storage
-	Upstreams                map[string]*Definition // key: upstream name
-	upstreamsMutex           sync.RWMutex
-	targetsMutex             sync.RWMutex
-	lastUpdateTime         time.Time
-	testInProgress         bool
-	testMutex              sync.Mutex
-	disabledSocketsChecker func() map[string]bool
+	Upstreams                   map[string]*Definition // key: upstream name
+	upstreamsMutex              sync.RWMutex
+	targetsMutex                sync.RWMutex
+	lastUpdateTime              time.Time
+	testInProgress              bool
+	testMutex                   sync.Mutex
+	cachedDisabledSockets       map[string]bool
+	disabledSocketsCacheMutex   sync.RWMutex
+	disabledSocketsCacheExpiry  time.Time
+	disabledSocketsCacheDuration time.Duration
 }
 
 var (
@@ -66,11 +70,12 @@ func formatSocketAddress(host, port string) string {
 func GetUpstreamService() *Service {
 	serviceOnce.Do(func() {
 		upstreamService = &Service{
-			targets:         make(map[string]*TargetInfo),
-			availabilityMap: make(map[string]*Status),
-			configTargets:   make(map[string][]string),
-			Upstreams:       make(map[string]*Definition),
-			lastUpdateTime:  time.Now(),
+			targets:                      make(map[string]*TargetInfo),
+			availabilityMap:              make(map[string]*Status),
+			configTargets:                make(map[string][]string),
+			Upstreams:                    make(map[string]*Definition),
+			lastUpdateTime:               time.Now(),
+			disabledSocketsCacheDuration: 30 * time.Second,
 		}
 	})
 	return upstreamService
@@ -237,11 +242,8 @@ func (s *Service) PerformAvailabilityTest() {
 
 	// logger.Debug("Performing availability test for", targetCount, "unique targets")
 
-	// Get disabled sockets from database
-	disabledSockets := make(map[string]bool)
-	if s.disabledSocketsChecker != nil {
-		disabledSockets = s.disabledSocketsChecker()
-	}
+	// Get disabled sockets from cache or database
+	disabledSockets := s.getDisabledSockets()
 
 	// Separate targets into traditional and consul groups from the start
 	s.targetsMutex.RLock()
@@ -307,9 +309,59 @@ func (s *Service) findUpstreamNameForTarget(target ProxyTarget) string {
 	return ""
 }
 
-// SetDisabledSocketsChecker sets a callback function to check disabled sockets
-func (s *Service) SetDisabledSocketsChecker(checker func() map[string]bool) {
-	s.disabledSocketsChecker = checker
+// getDisabledSockets queries the database for disabled sockets with caching
+func (s *Service) getDisabledSockets() map[string]bool {
+	// Check if cache is still valid
+	s.disabledSocketsCacheMutex.RLock()
+	if time.Now().Before(s.disabledSocketsCacheExpiry) && s.cachedDisabledSockets != nil {
+		// Return a copy of the cached data
+		result := make(map[string]bool, len(s.cachedDisabledSockets))
+		for k, v := range s.cachedDisabledSockets {
+			result[k] = v
+		}
+		s.disabledSocketsCacheMutex.RUnlock()
+		return result
+	}
+	s.disabledSocketsCacheMutex.RUnlock()
+
+	// Cache expired or not initialized, refresh from database
+	disabled := make(map[string]bool)
+
+	db := model.UseDB()
+	if db == nil {
+		return disabled
+	}
+
+	var configs []model.UpstreamConfig
+	if err := db.Where("enabled = ?", false).Find(&configs).Error; err != nil {
+		logger.Error("Failed to query disabled sockets:", err)
+		return disabled
+	}
+
+	for _, config := range configs {
+		disabled[config.Socket] = true
+	}
+
+	// Update cache
+	s.disabledSocketsCacheMutex.Lock()
+	s.cachedDisabledSockets = disabled
+	s.disabledSocketsCacheExpiry = time.Now().Add(s.disabledSocketsCacheDuration)
+	s.disabledSocketsCacheMutex.Unlock()
+
+	// Return a copy to prevent external modification
+	result := make(map[string]bool, len(disabled))
+	for k, v := range disabled {
+		result[k] = v
+	}
+	return result
+}
+
+// InvalidateDisabledSocketsCache invalidates the cache, forcing next read to refresh from database
+func (s *Service) InvalidateDisabledSocketsCache() {
+	s.disabledSocketsCacheMutex.Lock()
+	defer s.disabledSocketsCacheMutex.Unlock()
+	s.disabledSocketsCacheExpiry = time.Time{} // Set to zero time to force refresh
+	logger.Debug("Disabled sockets cache invalidated")
 }
 
 // ClearTargets clears all targets (useful for testing or reloading)