refactor: update upstream availability monitoring

Jacky 1 day ago
parent commit 9113e82162

+ 2 - 1
api/upstream/router.go

@@ -3,5 +3,6 @@ package upstream
 import "github.com/gin-gonic/gin"
 import "github.com/gin-gonic/gin"
 
 
 func InitRouter(r *gin.RouterGroup) {
 func InitRouter(r *gin.RouterGroup) {
-	r.GET("/availability_test", AvailabilityTest)
+	r.GET("/upstream/availability", GetAvailability)
+	r.GET("/upstream/availability_ws", AvailabilityWebSocket)
 }
 }

+ 81 - 124
api/upstream/upstream.go

@@ -13,171 +13,128 @@ import (
 	"github.com/uozi-tech/cosy/logger"
 	"github.com/uozi-tech/cosy/logger"
 )
 )
 
 
-type wsMessage struct {
-	data interface{}
-	done chan error
+// GetAvailability returns cached upstream availability results via HTTP GET
+func GetAvailability(c *gin.Context) {
+	service := upstream.GetUpstreamService()
+
+	result := gin.H{
+		"results":          service.GetAvailabilityMap(),
+		"targets":          service.GetTargetInfos(),
+		"last_update_time": service.GetLastUpdateTime(),
+		"target_count":     service.GetTargetCount(),
+	}
+
+	c.JSON(http.StatusOK, result)
 }
 }
 
 
-func AvailabilityTest(c *gin.Context) {
+// AvailabilityWebSocket handles WebSocket connections for real-time availability monitoring
+func AvailabilityWebSocket(c *gin.Context) {
 	var upGrader = websocket.Upgrader{
 	var upGrader = websocket.Upgrader{
 		CheckOrigin: func(r *http.Request) bool {
 		CheckOrigin: func(r *http.Request) bool {
 			return true
 			return true
 		},
 		},
 	}
 	}
-	// upgrade http to websocket
+
+	// Upgrade HTTP to WebSocket
 	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
 	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
 	if err != nil {
 	if err != nil {
 		logger.Error(err)
 		logger.Error(err)
 		return
 		return
 	}
 	}
-
 	defer ws.Close()
 	defer ws.Close()
 
 
-	var currentTargets []string
-	var targetsMutex sync.RWMutex
-
 	// Use context to manage goroutine lifecycle
 	// Use context to manage goroutine lifecycle
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	defer cancel()
 
 
-	// Use channel to serialize WebSocket write operations, avoiding concurrent conflicts
-	writeChan := make(chan wsMessage, 10)
-	testChan := make(chan bool, 1) // Immediate test signal
+	// Register this connection and increase check frequency
+	registerWebSocketConnection()
+	defer unregisterWebSocketConnection()
+
+	// Send initial results immediately
+	service := upstream.GetUpstreamService()
+	initialResults := service.GetAvailabilityMap()
+	if err := ws.WriteJSON(initialResults); err != nil {
+		logger.Error("Failed to send initial results:", err)
+		return
+	}
 
 
-	// Create debouncer for test execution
-	testDebouncer := helper.NewDebouncer(300 * time.Millisecond)
+	// Create ticker for periodic updates (every 5 seconds when WebSocket is connected)
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
 
 
-	// WebSocket writer goroutine - serialize all write operations
+	// Monitor for incoming messages (ping/pong or close)
 	go func() {
 	go func() {
-		defer logger.Debug("WebSocket writer goroutine stopped")
+		defer cancel()
 		for {
 		for {
-			select {
-			case <-ctx.Done():
-				return
-			case msg := <-writeChan:
-				err := ws.WriteJSON(msg.data)
-				if msg.done != nil {
-					msg.done <- err
-					close(msg.done)
-				}
-				if err != nil {
-					logger.Error("Failed to send WebSocket message:", err)
-					if helper.IsUnexpectedWebsocketError(err) {
-						cancel() // Cancel all goroutines
-					}
+			// Read message (we don't expect any specific data, just use it for connection health)
+			_, _, err := ws.ReadMessage()
+			if err != nil {
+				if helper.IsUnexpectedWebsocketError(err) {
+					logger.Error("WebSocket read error:", err)
 				}
 				}
+				return
 			}
 			}
 		}
 		}
 	}()
 	}()
 
 
-	// Safe WebSocket write function
-	writeJSON := func(data interface{}) error {
-		done := make(chan error, 1)
-		msg := wsMessage{data: data, done: done}
-
+	// Main loop to send periodic updates
+	for {
 		select {
 		select {
-		case writeChan <- msg:
-			return <-done
 		case <-ctx.Done():
 		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(5 * time.Second): // Prevent write blocking
-			return context.DeadlineExceeded
-		}
-	}
-
-	// Function to perform availability test
-	performTest := func() {
-		targetsMutex.RLock()
-		targets := make([]string, len(currentTargets))
-		copy(targets, currentTargets)
-		targetsMutex.RUnlock()
-
-		logger.Debug("Performing availability test for targets:", targets)
+			logger.Debug("WebSocket connection closed")
+			return
 
 
-		if len(targets) > 0 {
-			logger.Debug("Starting upstream.AvailabilityTest...")
-			result := upstream.AvailabilityTest(targets)
-			logger.Debug("Test completed, results:", result)
+		case <-ticker.C:
+			// Get latest results from service
+			results := service.GetAvailabilityMap()
 
 
-			logger.Debug("Sending results via WebSocket...")
-			if err := writeJSON(result); err != nil {
-				logger.Error("Failed to send WebSocket message:", err)
+			// Send results via WebSocket
+			if err := ws.WriteJSON(results); err != nil {
+				logger.Error("Failed to send WebSocket update:", err)
 				if helper.IsUnexpectedWebsocketError(err) {
 				if helper.IsUnexpectedWebsocketError(err) {
-					cancel() // Cancel all goroutines
+					return
 				}
 				}
-			} else {
-				logger.Debug("Results sent successfully")
-			}
-		} else {
-			logger.Debug("No targets to test")
-			// Send empty result even if no targets
-			emptyResult := make(map[string]interface{})
-			if err := writeJSON(emptyResult); err != nil {
-				logger.Error("Failed to send empty result:", err)
-			} else {
-				logger.Debug("Empty result sent successfully")
 			}
 			}
 		}
 		}
 	}
 	}
+}
 
 
-	// Goroutine to handle incoming messages (target updates)
-	go func() {
-		defer logger.Debug("WebSocket reader goroutine stopped")
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			default:
-			}
-
-			var newTargets []string
-			// Set read timeout to avoid blocking
-			ws.SetReadDeadline(time.Now().Add(30 * time.Second))
-			err := ws.ReadJSON(&newTargets)
-			ws.SetReadDeadline(time.Time{}) // Clear deadline
-
-			if err != nil {
-				if helper.IsUnexpectedWebsocketError(err) {
-					logger.Error(err)
-				}
-				cancel() // Cancel all goroutines
-				return
-			}
-
-			logger.Debug("Received targets from frontend:", newTargets)
+// WebSocket connection tracking for managing check frequency
+var (
+	wsConnections     int
+	wsConnectionMutex sync.Mutex
+)
 
 
-			targetsMutex.Lock()
-			currentTargets = newTargets
-			targetsMutex.Unlock()
+// registerWebSocketConnection increments the WebSocket connection counter
+func registerWebSocketConnection() {
+	wsConnectionMutex.Lock()
+	defer wsConnectionMutex.Unlock()
 
 
-			// Use debouncer to trigger test execution
-			testDebouncer.Trigger(func() {
-				select {
-				case testChan <- true:
-				default:
-				}
-			})
-		}
-	}()
+	wsConnections++
+	logger.Debug("WebSocket connection registered, total connections:", wsConnections)
 
 
-	// Main testing loop
-	ticker := time.NewTicker(10 * time.Second)
-	defer ticker.Stop()
+	// Trigger immediate check when first connection is established
+	if wsConnections == 1 {
+		service := upstream.GetUpstreamService()
+		go service.PerformAvailabilityTest()
+	}
+}
 
 
-	logger.Debug("WebSocket connection established, waiting for messages...")
+// unregisterWebSocketConnection decrements the WebSocket connection counter
+func unregisterWebSocketConnection() {
+	wsConnectionMutex.Lock()
+	defer wsConnectionMutex.Unlock()
 
 
-	for {
-		select {
-		case <-ctx.Done():
-			testDebouncer.Stop()
-			logger.Debug("WebSocket connection closed")
-			return
-		case <-testChan:
-			// Debounce triggered test or first test
-			go performTest() // Execute asynchronously to avoid blocking main loop
-		case <-ticker.C:
-			// Periodic test execution
-			go performTest() // Execute asynchronously to avoid blocking main loop
-		}
+	if wsConnections > 0 {
+		wsConnections--
 	}
 	}
+	logger.Debug("WebSocket connection unregistered, remaining connections:", wsConnections)
+}
+
+// HasActiveWebSocketConnections returns true if there are active WebSocket connections
+func HasActiveWebSocketConnections() bool {
+	wsConnectionMutex.Lock()
+	defer wsConnectionMutex.Unlock()
+	return wsConnections > 0
 }
 }

+ 22 - 2
app/src/api/upstream.ts

@@ -1,3 +1,4 @@
+import { http } from '@uozi-admin/request'
 import ws from '@/lib/websocket'
 
 export interface UpstreamStatus {
@@ -5,9 +6,28 @@ export interface UpstreamStatus {
   latency: number
 }
 
+export interface UpstreamAvailabilityResponse {
+  results: Record<string, UpstreamStatus>
+  targets: Array<{
+    host: string
+    port: string
+    type: string
+    config_path: string
+    last_seen: string
+  }>
+  last_update_time: string
+  target_count: number
+}
+
 const upstream = {
-  availability_test() {
-    return ws('/api/availability_test')
+  // HTTP GET interface to get all upstream availability results
+  getAvailability(): Promise<UpstreamAvailabilityResponse> {
+    return http.get('/upstream/availability')
+  },
+
+  // WebSocket interface for real-time availability updates
+  availabilityWebSocket() {
+    return ws('/api/upstream/availability_ws')
   },
 }
 

+ 1 - 15
app/src/components/ProxyTargets/ProxyTargets.vue

@@ -6,23 +6,9 @@ interface Props {
   targets: ProxyTarget[]
 }
 
-const props = defineProps<Props>()
+defineProps<Props>()
 
 const proxyStore = useProxyAvailabilityStore()
-let componentId = ''
-
-// Register component and watch for target changes
-watch(() => props.targets, newTargets => {
-  componentId = proxyStore.registerComponent(newTargets)
-}, {
-  immediate: true,
-  deep: true,
-})
-
-// Cleanup when component unmounts
-onBeforeUnmount(() => {
-  proxyStore.unregisterComponent(componentId)
-})
 
 function getTargetColor(target: ProxyTarget): string {
   const result = proxyStore.getAvailabilityResult(target)

+ 20 - 7
app/src/layouts/BaseLayout.vue

@@ -3,7 +3,7 @@ import { throttle } from 'lodash'
 import { storeToRefs } from 'pinia'
 import settings from '@/api/settings'
 import PageHeader from '@/components/PageHeader'
-import { useSettingsStore } from '@/pinia'
+import { useSettingsStore, useProxyAvailabilityStore } from '@/pinia'
 import FooterLayout from './FooterLayout.vue'
 import HeaderLayout from './HeaderLayout.vue'
 import SideBar from './SideBar.vue'
@@ -19,14 +19,8 @@ function _init() {
 
 const init = throttle(_init, 50)
 
-onMounted(init)
-
 addEventListener('resize', init)
 
-onUnmounted(() => {
-  removeEventListener('resize', init)
-})
-
 function getClientWidth() {
   return document.body.clientWidth
 }
@@ -41,6 +35,25 @@ settings.get_server_name().then(r => {
   server_name.value = r.name
 })
 
+// Initialize proxy availability monitoring after user is logged in and layout is mounted
+const proxyAvailabilityStore = useProxyAvailabilityStore()
+
+onMounted(() => {
+  // Initialize layout
+  init()
+
+  // Start monitoring for upstream availability
+  proxyAvailabilityStore.startMonitoring()
+})
+
+onUnmounted(() => {
+  // Remove resize listener
+  removeEventListener('resize', init)
+
+  // Stop monitoring when layout is unmounted
+  proxyAvailabilityStore.stopMonitoring()
+})
+
 const breadList = ref([])
 
 provide('breadList', breadList)

+ 94 - 94
app/src/pinia/moudule/proxyAvailability.ts

@@ -1,43 +1,46 @@
 import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { ProxyTarget } from '@/api/site'
-import { debounce } from 'lodash'
 import { defineStore } from 'pinia'
-import upstream from '@/api/upstream'
+import upstream, { type UpstreamStatus, type UpstreamAvailabilityResponse } from '@/api/upstream'
 
-export interface ProxyAvailabilityResult {
-  online: boolean
-  latency: number
-}
+// Alias for consistency with existing code
+export type ProxyAvailabilityResult = UpstreamStatus
 
 export const useProxyAvailabilityStore = defineStore('proxyAvailability', () => {
   const availabilityResults = ref<Record<string, ProxyAvailabilityResult>>({})
   const websocket = shallowRef<ReconnectingWebSocket | WebSocket>()
   const isConnected = ref(false)
-
-  // Map to store targets for each component instance
-  const componentTargets = ref<Map<string, string[]>>(new Map())
-
-  // Computed property to get unique targets from all components
-  const allTargets = computed(() => {
-    const allTargetsList: string[] = []
-    componentTargets.value.forEach(targets => {
-      allTargetsList.push(...targets)
-    })
-    return [...new Set(allTargetsList)]
-  })
+  const isInitialized = ref(false)
+  const lastUpdateTime = ref<string>('')
+  const targetCount = ref(0)
 
   function getTargetKey(target: ProxyTarget): string {
     return `${target.host}:${target.port}`
   }
 
-  // Debounced function to update targets on server
-  const debouncedUpdateTargets = debounce(() => {
-    if (websocket.value && isConnected.value) {
-      websocket.value.send(JSON.stringify(allTargets.value))
+  // Initialize availability data from HTTP API
+  async function initialize() {
+    if (isInitialized.value) {
+      return
     }
-  }, 300)
 
-  function ensureWebSocketConnection() {
+    try {
+      const response = await upstream.getAvailability()
+      const data = response as UpstreamAvailabilityResponse
+
+      availabilityResults.value = data.results || {}
+      lastUpdateTime.value = data.last_update_time || ''
+      targetCount.value = data.target_count || 0
+      isInitialized.value = true
+
+      console.log(`Initialized proxy availability with ${targetCount.value} targets`)
+    } catch (error) {
+      console.error('Failed to initialize proxy availability:', error)
+    }
+  }
+
+  // Connect to WebSocket for real-time updates
+  function connectWebSocket() {
     if (websocket.value && isConnected.value) {
       return
     }
@@ -47,96 +50,93 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
       websocket.value.close()
     }
 
-    // Create new WebSocket connection
-    websocket.value = upstream.availability_test()
-
-    websocket.value.onopen = () => {
-      isConnected.value = true
-      // Send current targets immediately after connection
-      debouncedUpdateTargets()
-    }
-
-    websocket.value.onmessage = (e: MessageEvent) => {
-      const results = JSON.parse(e.data) as Record<string, ProxyAvailabilityResult>
-      // Update availability results
-      Object.assign(availabilityResults.value, results)
-    }
-
-    websocket.value.onclose = () => {
-      isConnected.value = false
-    }
-
-    websocket.value.onerror = error => {
-      console.error('WebSocket error:', error)
-      isConnected.value = false
-    }
+    try {
+      // Create new WebSocket connection
+      const ws = upstream.availabilityWebSocket()
+      websocket.value = ws
+
+      ws.onopen = () => {
+        isConnected.value = true
+        console.log('Proxy availability WebSocket connected')
+      }
+
+      ws.onmessage = (e: MessageEvent) => {
+        try {
+          const results = JSON.parse(e.data) as Record<string, ProxyAvailabilityResult>
+          // Update availability results with latest data
+          availabilityResults.value = { ...results }
+          lastUpdateTime.value = new Date().toISOString()
+        } catch (error) {
+          console.error('Failed to parse WebSocket message:', error)
+        }
+      }
+
+      ws.onclose = () => {
+        isConnected.value = false
+        console.log('Proxy availability WebSocket disconnected')
+      }
+
+      ws.onerror = error => {
+        console.error('Proxy availability WebSocket error:', error)
+        isConnected.value = false
+      }
+    } catch (error) {
+      console.error('Failed to create WebSocket connection:', error)
+    }
   }
 
-  function registerComponent(targets: ProxyTarget[]): string {
-    const componentId = useId()
-    const targetKeys = targets.map(getTargetKey)
-
-    componentTargets.value.set(componentId, targetKeys)
-
-    // Ensure WebSocket connection exists
-    ensureWebSocketConnection()
-
-    // Update targets on server (debounced)
-    debouncedUpdateTargets()
-
-    return componentId
+  // Start monitoring (initialize + WebSocket)
+  async function startMonitoring() {
+    await initialize()
+    connectWebSocket()
   }
 
-  function updateComponentTargets(componentId: string, targets: ProxyTarget[]) {
-    const targetKeys = targets.map(getTargetKey)
-    componentTargets.value.set(componentId, targetKeys)
-
-    // Update targets on server (debounced)
-    debouncedUpdateTargets()
-  }
-
-  function unregisterComponent(componentId: string) {
-    componentTargets.value.delete(componentId)
-
-    // Update targets on server (debounced)
-    debouncedUpdateTargets()
-
-    // Close WebSocket if no components are registered
-    if (componentTargets.value.size === 0) {
-      // Cancel pending debounced calls
-      debouncedUpdateTargets.cancel()
-
-      if (websocket.value) {
-        websocket.value.close()
-        websocket.value = undefined
-        isConnected.value = false
-      }
+  // Stop monitoring and cleanup
+  function stopMonitoring() {
+    if (websocket.value) {
+      websocket.value.close()
+      websocket.value = undefined
+      isConnected.value = false
     }
   }
 
+  // Get availability result for a specific target
   function getAvailabilityResult(target: ProxyTarget): ProxyAvailabilityResult | undefined {
     const key = getTargetKey(target)
    return availabilityResults.value[key]
   }
 
-  function isTargetTesting(target: ProxyTarget): boolean {
+  // Check if target has availability data
+  function hasAvailabilityData(target: ProxyTarget): boolean {
     const key = getTargetKey(target)
-    return allTargets.value.includes(key)
+    return key in availabilityResults.value
   }
 
-  // Watch for changes in allTargets and update server (debounced)
-  watch(allTargets, () => {
-    debouncedUpdateTargets()
-  })
+  // Get all available targets
+  function getAllTargets(): string[] {
+    return Object.keys(availabilityResults.value)
+  }
+
+  // Auto-cleanup WebSocket on page unload
+  if (typeof window !== 'undefined') {
+    window.addEventListener('beforeunload', () => {
+      stopMonitoring()
+    })
+  }
 
   return {
     availabilityResults: readonly(availabilityResults),
     isConnected: readonly(isConnected),
-    registerComponent,
-    updateComponentTargets,
-    unregisterComponent,
+    isInitialized: readonly(isInitialized),
+    lastUpdateTime: readonly(lastUpdateTime),
+    targetCount: readonly(targetCount),
+    initialize,
+    startMonitoring,
+    stopMonitoring,
+    connectWebSocket,
     getAvailabilityResult,
-    isTargetTesting,
+    hasAvailabilityData,
+    getAllTargets,
     getTargetKey,
   }
 })

+ 77 - 0
docs/frontend-proxy-availability-changes.md

@@ -0,0 +1,77 @@
+# Frontend Proxy Availability Changes
+
+## Overview
+
+Following the backend API refactor, the frontend proxy availability monitoring mechanism has been simplified.
+
+## Key Changes
+
+### 1. API updates (`app/src/api/upstream.ts`)
+- Added a `getAvailability()` HTTP GET method that fetches all availability results
+- Added an `availabilityWebSocket()` WebSocket method for real-time status updates
+- Added the `UpstreamAvailabilityResponse` interface type
+
+### 2. Store refactor (`app/src/pinia/moudule/proxyAvailability.ts`)
+
+#### Removed:
+- The component register/unregister mechanism (`registerComponent`, `unregisterComponent`)
+- Per-component target management
+- Logic for sending target lists to the backend
+- Debounce handling
+
+#### Added:
+- `initialize()` - loads the initial state from the HTTP GET endpoint
+- `startMonitoring()` - starts full monitoring (HTTP initialization + WebSocket connection)
+- `stopMonitoring()` - stops monitoring and cleans up the connection
+- `hasAvailabilityData()` - checks whether a target has availability data
+- `getAllTargets()` - returns the list of all known targets
+
+#### Simplified:
+- `getAvailabilityResult()` - simply reads the result from the cache
+- Automatic WebSocket connection management
+- Automatic cleanup on page unload
+
+### 3. Component usage
+
+Components now only need:
+```typescript
+const proxyStore = useProxyAvailabilityStore()
+
+// Get the availability result (from cache)
+const result = proxyStore.getAvailabilityResult(target)
+```
+
+Registering/unregistering components and sending targets to the backend are no longer needed.
+
+### 4. Layout initialization (`app/src/layouts/BaseLayout.vue`)
+
+Monitoring is initialized in the main layout after login, which guarantees the user is already authenticated:
+```typescript
+const proxyAvailabilityStore = useProxyAvailabilityStore()
+
+onMounted(() => {
+  // Start monitoring for upstream availability
+  proxyAvailabilityStore.startMonitoring()
+})
+
+onUnmounted(() => {
+  // Stop monitoring when layout is unmounted
+  proxyAvailabilityStore.stopMonitoring()
+})
+```
+
+## Workflow
+
+1. **Login**: the user logs in and enters the main layout
+2. **Layout mount**: `BaseLayout.vue` automatically calls `startMonitoring()` on mount
+3. **Initialization**: the current availability results are fetched via HTTP GET
+4. **Real-time updates**: a WebSocket connection is established to receive live status updates
+5. **Component usage**: components read status directly from the cache, with no extra requests
+6. **Automatic cleanup**: the WebSocket connection is closed when the layout unmounts
+
+## Benefits
+
+- **Simpler architecture**: the complex component-management logic is gone
+- **Better performance**: fewer unnecessary network requests
+- **Single source of truth**: all components share the same cached data
+- **Automatic management**: no manual connection lifecycle handling

+ 80 - 0
docs/upstream-availability-refactor.md

@@ -0,0 +1,80 @@
+# Upstream Availability Monitoring Refactor
+
+## Overview
+
+The upstream availability check has been refactored from the previous WebSocket-driven real-time testing to a scheduled background task, providing more efficient and stable monitoring.
+
+## Key Changes
+
+### 1. Background task mechanism
+- Registers `ParseProxyTargetsFromRawContent` via `cache.RegisterCallback` to automatically collect proxy targets from all configuration files (registration snippet below)
+- Adds a singleton `UpstreamService` that manages all proxy targets and test results
+- Deduplicates targets so the same address is never tested twice
+- Tracks the source configuration file of every proxy target
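+
+For reference, the registration is a small callback wired in `internal/upstream/service.go` (added in this commit); its shape is:
+
+```go
+// init wires the config scanner callback (from internal/upstream/service.go in this commit)
+func init() {
+	cache.RegisterCallback(scanForProxyTargets)
+}
+
+// scanForProxyTargets parses proxy targets from each scanned config file
+// and feeds them into the singleton service.
+func scanForProxyTargets(configPath string, content []byte) error {
+	targets := ParseProxyTargetsFromRawContent(string(content))
+	GetUpstreamService().updateTargetsFromConfig(configPath, targets)
+	return nil
+}
+```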
+
+### 2. Scheduled checks
+- A cron job runs the availability test every 30 seconds by default
+- While WebSocket connections are active, the check frequency automatically rises to every 5 seconds
+- Concurrency control prevents multiple test runs from overlapping
+
+### 3. API endpoints
+Two concise endpoints are exposed:
+
+#### HTTP GET endpoint
+- Path: `/api/upstream/availability`
+- Purpose: returns the cached monitoring results for all upstreams
+- Response body:
+  ```json
+  {
+    "results": {
+      "127.0.0.1:8080": {
+        "online": true,
+        "latency": 1.23
+      }
+    },
+    "targets": [...],
+    "last_update_time": "2024-01-01T00:00:00Z",
+    "target_count": 10
+  }
+  ```
+
+#### WebSocket endpoint
+- Path: `/api/upstream/availability_ws`
+- Purpose: pushes monitoring results in real time
+- Behavior:
+  - Sends the current results immediately on connect
+  - Pushes the latest results every 5 seconds
+  - Tracks the connection count and adjusts the check frequency accordingly
+
+## Technical Details
+
+### Deduplication
+- Uses `host:port` as the unique key
+- Multiple configuration files can reference the same proxy target
+- When a configuration file is deleted, only the proxy targets used exclusively by that file are removed (see the sketch below)
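+
+A minimal, self-contained sketch of the reference-counting removal described above. The names mirror the service's fields (`configTargets` maps config path to target keys), but this is illustrative, not the committed implementation:
+
+```go
+package main
+
+import "fmt"
+
+// registry tracks which "host:port" keys each config file references.
+type registry struct {
+	configTargets map[string][]string
+}
+
+// usedElsewhere reports whether key is referenced by any config other than path.
+func (r *registry) usedElsewhere(path, key string) bool {
+	for otherPath, keys := range r.configTargets {
+		if otherPath == path {
+			continue
+		}
+		for _, k := range keys {
+			if k == key {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// removeConfig drops a config and returns the targets only it referenced.
+func (r *registry) removeConfig(path string) (removed []string) {
+	for _, key := range r.configTargets[path] {
+		if !r.usedElsewhere(path, key) {
+			removed = append(removed, key)
+		}
+	}
+	delete(r.configTargets, path)
+	return removed
+}
+
+func main() {
+	r := &registry{configTargets: map[string][]string{
+		"a.conf": {"127.0.0.1:8080", "10.0.0.2:80"},
+		"b.conf": {"127.0.0.1:8080"},
+	}}
+	// 10.0.0.2:80 is exclusive to a.conf, so only it is removed.
+	fmt.Println(r.removeConfig("a.conf")) // [10.0.0.2:80]
+}
+```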
+
+### Concurrency control
+- A mutex prevents multiple test runs from executing concurrently
+- A WebSocket connection counter governs the check frequency
+- The background job checks for active WebSocket connections to avoid redundant testing (a wiring sketch follows)
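+
+Note that the cron-side check is currently a placeholder: `hasActiveWebSocketConnections` in `internal/cron/upstream_availability.go` always returns `false`, with a TODO about avoiding an import cycle with the api package. One illustrative way to wire it without a circular dependency is to let the api layer inject the checker at startup; this is a sketch under that assumption, not the committed code:
+
+```go
+// internal/cron side: depend on a function value instead of the api package.
+
+// HasActiveWSChecker is set by the api layer during startup; nil means unknown.
+var HasActiveWSChecker func() bool
+
+func hasActiveWebSocketConnections() bool {
+	return HasActiveWSChecker != nil && HasActiveWSChecker()
+}
+
+// api side (e.g. during router init), assuming this hypothetical hook:
+//   cron.HasActiveWSChecker = HasActiveWebSocketConnections
+```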
+
+### Performance
+- Test results are cached, reducing the cost of on-demand checks
+- All targets are tested in a single batch for efficiency
+- A goroutine pool caps concurrent connections (MaxConcurrentWorker = 10); a sketch of the pattern follows
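+
+A minimal sketch of the bounded-concurrency pattern described above, using a buffered channel as a semaphore. `MaxConcurrentWorker` matches the documented limit; `checkTarget` is a hypothetical stand-in for the real probe:
+
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+const MaxConcurrentWorker = 10 // documented concurrency cap
+
+// checkTarget is a stand-in for the real TCP/HTTP availability probe.
+func checkTarget(addr string) bool {
+	time.Sleep(10 * time.Millisecond)
+	return true
+}
+
+func availabilityTest(targets []string) map[string]bool {
+	results := make(map[string]bool, len(targets))
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, MaxConcurrentWorker) // bounds in-flight probes
+
+	for _, addr := range targets {
+		wg.Add(1)
+		sem <- struct{}{} // acquire a worker slot
+		go func(addr string) {
+			defer wg.Done()
+			defer func() { <-sem }() // release the slot
+			online := checkTarget(addr)
+			mu.Lock()
+			results[addr] = online
+			mu.Unlock()
+		}(addr)
+	}
+	wg.Wait()
+	return results
+}
+
+func main() {
+	fmt.Println(availabilityTest([]string{"127.0.0.1:8080", "10.0.0.2:80"}))
+}
+```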
+
+## Usage Examples
+
+### Fetch monitoring results
+```bash
+curl http://localhost:9000/api/upstream/availability
+```
+
+### WebSocket connection
+```javascript
+const ws = new WebSocket('ws://localhost:9000/api/upstream/availability_ws');
+ws.onmessage = (event) => {
+  const results = JSON.parse(event.data);
+  console.log('Upstream status:', results);
+};
+```

+ 2 - 2
internal/cache/search.go

@@ -222,8 +222,8 @@ func (si *SearchIndexer) IndexDocument(doc SearchDocument) error {
 		return fmt.Errorf("search index not initialized")
 	}
 
-	logger.Debugf("Indexing document: ID=%s, Type=%s, Name=%s, Path=%s",
-		doc.ID, doc.Type, doc.Name, doc.Path)
+	// logger.Debugf("Indexing document: ID=%s, Type=%s, Name=%s, Path=%s",
+	// 	doc.ID, doc.Type, doc.Name, doc.Path)
 
 	return si.index.Index(doc.ID, doc)
 }

+ 6 - 0
internal/cron/cron.go

@@ -47,6 +47,12 @@ func InitCronJobs(ctx context.Context) {
 		logger.Fatalf("AutoBackup Err: %v\n", err)
 	}
 
+	// Initialize upstream availability testing job
+	_, err = setupUpstreamAvailabilityJob(s)
+	if err != nil {
+		logger.Fatalf("UpstreamAvailability Err: %v\n", err)
+	}
+
 	// Start the scheduler
 	s.Start()
 

+ 89 - 0
internal/cron/upstream_availability.go

@@ -0,0 +1,89 @@
+package cron
+
+import (
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/internal/upstream"
+	"github.com/go-co-op/gocron/v2"
+	"github.com/uozi-tech/cosy/logger"
+)
+
+// upstreamAvailabilityJob holds the job instance
+var upstreamAvailabilityJob gocron.Job
+
+// setupUpstreamAvailabilityJob initializes the upstream availability testing job
+func setupUpstreamAvailabilityJob(scheduler gocron.Scheduler) (gocron.Job, error) {
+	job, err := scheduler.NewJob(
+		gocron.DurationJob(30*time.Second),
+		gocron.NewTask(executeUpstreamAvailabilityTest),
+		gocron.WithSingletonMode(gocron.LimitModeWait),
+		gocron.WithName("upstream_availability_test"),
+		gocron.JobOption(gocron.WithStartImmediately()),
+	)
+	if err != nil {
+		logger.Errorf("UpstreamAvailability Job: Err: %v\n", err)
+		return nil, err
+	}
+
+	upstreamAvailabilityJob = job
+	logger.Info("Upstream availability testing job started with 30s interval")
+	return job, nil
+}
+
+// executeUpstreamAvailabilityTest performs the upstream availability test
+func executeUpstreamAvailabilityTest() {
+	service := upstream.GetUpstreamService()
+
+	targetCount := service.GetTargetCount()
+	if targetCount == 0 {
+		logger.Debug("No upstream targets to test")
+		return
+	}
+
+	// Check if we should skip this test due to active WebSocket connections
+	// (WebSocket connections trigger more frequent checks)
+	if hasActiveWebSocketConnections() {
+		logger.Debug("Skipping scheduled test due to active WebSocket connections")
+		return
+	}
+
+	start := time.Now()
+	logger.Debug("Starting scheduled upstream availability test for", targetCount, "targets")
+
+	service.PerformAvailabilityTest()
+
+	duration := time.Since(start)
+	logger.Debug("Upstream availability test completed in", duration)
+}
+
+// hasActiveWebSocketConnections checks if there are active WebSocket connections
+// This is a placeholder - the actual implementation should check the API package
+func hasActiveWebSocketConnections() bool {
+	// TODO: This should check api/upstream.HasActiveWebSocketConnections()
+	// but we need to avoid circular dependencies
+	return false
+}
+
+// RestartUpstreamAvailabilityJob restarts the upstream availability job
+func RestartUpstreamAvailabilityJob() error {
+	logger.Info("Restarting upstream availability job...")
+
+	// Remove existing job if it exists
+	if upstreamAvailabilityJob != nil {
+		err := s.RemoveJob(upstreamAvailabilityJob.ID())
+		if err != nil {
+			logger.Error("Failed to remove existing upstream availability job:", err)
+		}
+		upstreamAvailabilityJob = nil
+	}
+
+	// Create new job
+	job, err := setupUpstreamAvailabilityJob(s)
+	if err != nil {
+		return err
+	}
+
+	upstreamAvailabilityJob = job
+	logger.Info("Upstream availability job restarted successfully")
+	return nil
+}

+ 0 - 46
internal/upstream/proxy_parser.go

@@ -16,29 +16,6 @@ type ProxyTarget struct {
 	Type string `json:"type"` // "proxy_pass" or "upstream"
 }
 
-// ParseProxyTargets extracts proxy targets from nginx configuration
-func ParseProxyTargets(config *nginx.NgxConfig) []ProxyTarget {
-	var targets []ProxyTarget
-
-	if config == nil {
-		return targets
-	}
-
-	// Parse upstream servers
-	for _, upstream := range config.Upstreams {
-		upstreamTargets := parseUpstreamServers(upstream)
-		targets = append(targets, upstreamTargets...)
-	}
-
-	// Parse proxy_pass directives in servers
-	for _, server := range config.Servers {
-		proxyTargets := parseServerProxyPass(server)
-		targets = append(targets, proxyTargets...)
-	}
-
-	return deduplicateTargets(targets)
-}
-
 // ParseProxyTargetsFromRawContent parses proxy targets from raw nginx configuration content
 func ParseProxyTargetsFromRawContent(content string) []ProxyTarget {
 	var targets []ProxyTarget
@@ -105,29 +82,6 @@ func parseUpstreamServers(upstream *nginx.NgxUpstream) []ProxyTarget {
 	return targets
 }
 
-// parseServerProxyPass extracts proxy_pass targets from server blocks
-func parseServerProxyPass(server *nginx.NgxServer) []ProxyTarget {
-	var targets []ProxyTarget
-
-	// Check directives in server block
-	for _, directive := range server.Directives {
-		if directive.Directive == "proxy_pass" {
-			target := parseProxyPassURL(directive.Params)
-			if target.Host != "" {
-				targets = append(targets, target)
-			}
-		}
-	}
-
-	// Check directives in location blocks
-	for _, location := range server.Locations {
-		locationTargets := parseLocationProxyPass(location.Content)
-		targets = append(targets, locationTargets...)
-	}
-
-	return targets
-}
-
 // parseLocationProxyPass extracts proxy_pass from location content
 func parseLocationProxyPass(content string) []ProxyTarget {
 	var targets []ProxyTarget

+ 284 - 0
internal/upstream/service.go

@@ -0,0 +1,284 @@
+package upstream
+
+import (
+	"sync"
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/internal/cache"
+	"github.com/uozi-tech/cosy/logger"
+)
+
+// TargetInfo contains proxy target information with source config
+type TargetInfo struct {
+	ProxyTarget
+	ConfigPath string    `json:"config_path"`
+	LastSeen   time.Time `json:"last_seen"`
+}
+
+// UpstreamService manages upstream availability testing
+type UpstreamService struct {
+	targets         map[string]*TargetInfo // key: host:port
+	availabilityMap map[string]*Status     // key: host:port
+	configTargets   map[string][]string    // configPath -> []targetKeys
+	targetsMutex    sync.RWMutex
+	lastUpdateTime  time.Time
+	testInProgress  bool
+	testMutex       sync.Mutex
+}
+
+var (
+	upstreamService *UpstreamService
+	serviceOnce     sync.Once
+)
+
+// GetUpstreamService returns the singleton upstream service instance
+func GetUpstreamService() *UpstreamService {
+	serviceOnce.Do(func() {
+		upstreamService = &UpstreamService{
+			targets:         make(map[string]*TargetInfo),
+			availabilityMap: make(map[string]*Status),
+			configTargets:   make(map[string][]string),
+			lastUpdateTime:  time.Now(),
+		}
+	})
+	return upstreamService
+}
+
+// init registers the ParseProxyTargetsFromRawContent callback
+func init() {
+	cache.RegisterCallback(scanForProxyTargets)
+}
+
+// scanForProxyTargets is the callback function for cache scanner
+func scanForProxyTargets(configPath string, content []byte) error {
+	// Parse proxy targets from config content
+	targets := ParseProxyTargetsFromRawContent(string(content))
+
+	service := GetUpstreamService()
+	service.updateTargetsFromConfig(configPath, targets)
+
+	return nil
+}
+
+// updateTargetsFromConfig updates proxy targets from a specific config file
+func (s *UpstreamService) updateTargetsFromConfig(configPath string, targets []ProxyTarget) {
+	s.targetsMutex.Lock()
+	defer s.targetsMutex.Unlock()
+
+	now := time.Now()
+
+	// Remove old targets from this config path
+	if oldTargetKeys, exists := s.configTargets[configPath]; exists {
+		for _, key := range oldTargetKeys {
+			if _, exists := s.targets[key]; exists {
+				// Only remove if this is the only config using this target
+				isOnlyConfig := true
+				for otherConfig, otherKeys := range s.configTargets {
+					if otherConfig != configPath {
+						for _, otherKey := range otherKeys {
+							if otherKey == key {
+								isOnlyConfig = false
+								break
+							}
+						}
+						if !isOnlyConfig {
+							break
+						}
+					}
+				}
+				if isOnlyConfig {
+					delete(s.targets, key)
+					delete(s.availabilityMap, key)
+					logger.Debug("Removed proxy target:", key, "from config:", configPath)
+				} else {
+					logger.Debug("Keeping proxy target:", key, "still used by other configs")
+				}
+			}
+		}
+	}
+
+	// Add/update new targets
+	newTargetKeys := make([]string, 0, len(targets))
+	for _, target := range targets {
+		key := target.Host + ":" + target.Port
+		newTargetKeys = append(newTargetKeys, key)
+
+		if existingTarget, exists := s.targets[key]; exists {
+			// Update existing target with latest info
+			existingTarget.LastSeen = now
+			existingTarget.ConfigPath = configPath // Update to latest config that referenced it
+			// logger.Debug("Updated proxy target:", key, "from config:", configPath)
+		} else {
+			// Add new target
+			s.targets[key] = &TargetInfo{
+				ProxyTarget: target,
+				ConfigPath:  configPath,
+				LastSeen:    now,
+			}
+			// logger.Debug("Added proxy target:", key, "type:", target.Type, "from config:", configPath)
+		}
+	}
+
+	// Update config target mapping
+	s.configTargets[configPath] = newTargetKeys
+	s.lastUpdateTime = now
+
+	// logger.Debug("Config", configPath, "updated with", len(targets), "targets")
+}
+
+// GetTargets returns a copy of current proxy targets
+func (s *UpstreamService) GetTargets() []ProxyTarget {
+	s.targetsMutex.RLock()
+	defer s.targetsMutex.RUnlock()
+
+	targets := make([]ProxyTarget, 0, len(s.targets))
+	for _, targetInfo := range s.targets {
+		targets = append(targets, targetInfo.ProxyTarget)
+	}
+	return targets
+}
+
+// GetTargetInfos returns a copy of current target infos
+func (s *UpstreamService) GetTargetInfos() []*TargetInfo {
+	s.targetsMutex.RLock()
+	defer s.targetsMutex.RUnlock()
+
+	targetInfos := make([]*TargetInfo, 0, len(s.targets))
+	for _, targetInfo := range s.targets {
+		// Create a copy
+		targetInfoCopy := &TargetInfo{
+			ProxyTarget: targetInfo.ProxyTarget,
+			ConfigPath:  targetInfo.ConfigPath,
+			LastSeen:    targetInfo.LastSeen,
+		}
+		targetInfos = append(targetInfos, targetInfoCopy)
+	}
+	return targetInfos
+}
+
+// GetAvailabilityMap returns a copy of current availability results
+func (s *UpstreamService) GetAvailabilityMap() map[string]*Status {
+	s.targetsMutex.RLock()
+	defer s.targetsMutex.RUnlock()
+
+	result := make(map[string]*Status)
+	for k, v := range s.availabilityMap {
+		// Create a copy of the status
+		result[k] = &Status{
+			Online:  v.Online,
+			Latency: v.Latency,
+		}
+	}
+	return result
+}
+
+// PerformAvailabilityTest performs availability test for all targets
+func (s *UpstreamService) PerformAvailabilityTest() {
+	// Prevent concurrent tests
+	s.testMutex.Lock()
+	if s.testInProgress {
+		s.testMutex.Unlock()
+		logger.Debug("Availability test already in progress, skipping")
+		return
+	}
+	s.testInProgress = true
+	s.testMutex.Unlock()
+
+	// Ensure we reset the flag when done
+	defer func() {
+		s.testMutex.Lock()
+		s.testInProgress = false
+		s.testMutex.Unlock()
+	}()
+
+	s.targetsMutex.RLock()
+	targetCount := len(s.targets)
+	s.targetsMutex.RUnlock()
+
+	if targetCount == 0 {
+		logger.Debug("No targets to test")
+		return
+	}
+
+	logger.Debug("Performing availability test for", targetCount, "unique targets")
+
+	// Get target keys for testing
+	s.targetsMutex.RLock()
+	testTargets := make([]string, 0, len(s.targets))
+	for key := range s.targets {
+		testTargets = append(testTargets, key)
+	}
+	s.targetsMutex.RUnlock()
+
+	// Perform the actual availability test
+	results := AvailabilityTest(testTargets)
+
+	// Update availability map
+	s.targetsMutex.Lock()
+	s.availabilityMap = results
+	s.targetsMutex.Unlock()
+
+	logger.Debug("Availability test completed for", len(results), "targets")
+}
+
+// ClearTargets clears all targets (useful for testing or reloading)
+func (s *UpstreamService) ClearTargets() {
+	s.targetsMutex.Lock()
+	defer s.targetsMutex.Unlock()
+
+	s.targets = make(map[string]*TargetInfo)
+	s.availabilityMap = make(map[string]*Status)
+	s.configTargets = make(map[string][]string)
+	s.lastUpdateTime = time.Now()
+
+	logger.Debug("Cleared all proxy targets")
+}
+
+// GetLastUpdateTime returns the last time targets were updated
+func (s *UpstreamService) GetLastUpdateTime() time.Time {
+	s.targetsMutex.RLock()
+	defer s.targetsMutex.RUnlock()
+	return s.lastUpdateTime
+}
+
+// GetTargetCount returns the number of unique targets
+func (s *UpstreamService) GetTargetCount() int {
+	s.targetsMutex.RLock()
+	defer s.targetsMutex.RUnlock()
+	return len(s.targets)
+}
+
+// RemoveConfigTargets removes all targets associated with a specific config file
+func (s *UpstreamService) RemoveConfigTargets(configPath string) {
+	s.targetsMutex.Lock()
+	defer s.targetsMutex.Unlock()
+
+	if targetKeys, exists := s.configTargets[configPath]; exists {
+		for _, key := range targetKeys {
+			// Check if this target is used by other configs
+			isUsedByOthers := false
+			for otherConfig, otherKeys := range s.configTargets {
+				if otherConfig != configPath {
+					for _, otherKey := range otherKeys {
+						if otherKey == key {
+							isUsedByOthers = true
+							break
+						}
+					}
+					if isUsedByOthers {
+						break
+					}
+				}
+			}
+
+			if !isUsedByOthers {
+				delete(s.targets, key)
+				delete(s.availabilityMap, key)
+				logger.Debug("Removed proxy target:", key, "after config removal:", configPath)
+			}
+		}
+		delete(s.configTargets, configPath)
+		s.lastUpdateTime = time.Now()
+		logger.Debug("Removed config targets for:", configPath)
+	}
+}