Browse Source

refactor: simplify event bus

Jacky 1 day ago
parent
commit
ca47b58905

+ 3 - 3
app/src/layouts/BaseLayout.vue

@@ -3,7 +3,7 @@ import { throttle } from 'lodash'
 import { storeToRefs } from 'pinia'
 import { storeToRefs } from 'pinia'
 import settings from '@/api/settings'
 import settings from '@/api/settings'
 import PageHeader from '@/components/PageHeader'
 import PageHeader from '@/components/PageHeader'
-import { useSettingsStore, useProxyAvailabilityStore } from '@/pinia'
+import { useProxyAvailabilityStore, useSettingsStore } from '@/pinia'
 import FooterLayout from './FooterLayout.vue'
 import FooterLayout from './FooterLayout.vue'
 import HeaderLayout from './HeaderLayout.vue'
 import HeaderLayout from './HeaderLayout.vue'
 import SideBar from './SideBar.vue'
 import SideBar from './SideBar.vue'
@@ -41,7 +41,7 @@ const proxyAvailabilityStore = useProxyAvailabilityStore()
 onMounted(() => {
 onMounted(() => {
   // Initialize layout
   // Initialize layout
   init()
   init()
-  
+
   // Start monitoring for upstream availability
   // Start monitoring for upstream availability
   proxyAvailabilityStore.startMonitoring()
   proxyAvailabilityStore.startMonitoring()
 })
 })
@@ -49,7 +49,7 @@ onMounted(() => {
 onUnmounted(() => {
 onUnmounted(() => {
   // Remove resize listener
   // Remove resize listener
   removeEventListener('resize', init)
   removeEventListener('resize', init)
-  
+
   // Stop monitoring when layout is unmounted
   // Stop monitoring when layout is unmounted
   proxyAvailabilityStore.stopMonitoring()
   proxyAvailabilityStore.stopMonitoring()
 })
 })

+ 38 - 38
app/src/pinia/moudule/proxyAvailability.ts

@@ -1,7 +1,8 @@
 import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { ProxyTarget } from '@/api/site'
 import type { ProxyTarget } from '@/api/site'
+import type { UpstreamAvailabilityResponse, UpstreamStatus } from '@/api/upstream'
 import { defineStore } from 'pinia'
 import { defineStore } from 'pinia'
-import upstream, { type UpstreamStatus, type UpstreamAvailabilityResponse } from '@/api/upstream'
+import upstream from '@/api/upstream'
 
 
 // Alias for consistency with existing code
 // Alias for consistency with existing code
 export type ProxyAvailabilityResult = UpstreamStatus
 export type ProxyAvailabilityResult = UpstreamStatus
@@ -27,14 +28,13 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
     try {
     try {
       const response = await upstream.getAvailability()
       const response = await upstream.getAvailability()
       const data = response as UpstreamAvailabilityResponse
       const data = response as UpstreamAvailabilityResponse
-      
+
       availabilityResults.value = data.results || {}
       availabilityResults.value = data.results || {}
       lastUpdateTime.value = data.last_update_time || ''
       lastUpdateTime.value = data.last_update_time || ''
       targetCount.value = data.target_count || 0
       targetCount.value = data.target_count || 0
       isInitialized.value = true
       isInitialized.value = true
-      
-      console.log(`Initialized proxy availability with ${targetCount.value} targets`)
-    } catch (error) {
+    }
+    catch (error) {
       console.error('Failed to initialize proxy availability:', error)
       console.error('Failed to initialize proxy availability:', error)
     }
     }
   }
   }
@@ -50,39 +50,39 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
       websocket.value.close()
       websocket.value.close()
     }
     }
 
 
-         try {
-       // Create new WebSocket connection
-       const ws = upstream.availabilityWebSocket()
-       websocket.value = ws
-
-       ws.onopen = () => {
-         isConnected.value = true
-         console.log('Proxy availability WebSocket connected')
-       }
-
-       ws.onmessage = (e: MessageEvent) => {
-         try {
-           const results = JSON.parse(e.data) as Record<string, ProxyAvailabilityResult>
-           // Update availability results with latest data
-           availabilityResults.value = { ...results }
-           lastUpdateTime.value = new Date().toISOString()
-         } catch (error) {
-           console.error('Failed to parse WebSocket message:', error)
-         }
-       }
-
-       ws.onclose = () => {
-         isConnected.value = false
-         console.log('Proxy availability WebSocket disconnected')
-       }
-
-       ws.onerror = error => {
-         console.error('Proxy availability WebSocket error:', error)
-         isConnected.value = false
-       }
-     } catch (error) {
-       console.error('Failed to create WebSocket connection:', error)
-     }
+    try {
+      // Create new WebSocket connection
+      const ws = upstream.availabilityWebSocket()
+      websocket.value = ws
+
+      ws.onopen = () => {
+        isConnected.value = true
+      }
+
+      ws.onmessage = (e: MessageEvent) => {
+        try {
+          const results = JSON.parse(e.data) as Record<string, ProxyAvailabilityResult>
+          // Update availability results with latest data
+          availabilityResults.value = { ...results }
+          lastUpdateTime.value = new Date().toISOString()
+        }
+        catch (error) {
+          console.error('Failed to parse WebSocket message:', error)
+        }
+      }
+
+      ws.onclose = () => {
+        isConnected.value = false
+      }
+
+      ws.onerror = error => {
+        console.error('Proxy availability WebSocket error:', error)
+        isConnected.value = false
+      }
+    }
+    catch (error) {
+      console.error('Failed to create WebSocket connection:', error)
+    }
   }
   }
 
 
   // Start monitoring (initialize + WebSocket)
   // Start monitoring (initialize + WebSocket)

+ 0 - 77
docs/frontend-proxy-availability-changes.md

@@ -1,77 +0,0 @@
-# Frontend Proxy Availability Changes
-
-## 概述
-
-根据后端API重构,简化了前端的代理可用性监控机制。
-
-## 主要变更
-
-### 1. API文件更新 (`app/src/api/upstream.ts`)
-- 新增 `getAvailability()` HTTP GET方法获取所有可用性状态
-- 新增 `availabilityWebSocket()` WebSocket方法实时更新状态
-- 更新了接口类型定义 `UpstreamAvailabilityResponse`
-
-### 2. Store重构 (`app/src/pinia/moudule/proxyAvailability.ts`)
-
-#### 移除的功能:
-- 复杂的组件注册/注销机制 (`registerComponent`, `unregisterComponent`)
-- 基于组件的目标管理
-- 向后端发送目标列表的逻辑
-- 防抖处理
-
-#### 新增的功能:
-- `initialize()` - 从HTTP GET接口初始化状态
-- `startMonitoring()` - 启动完整监控(HTTP初始化 + WebSocket连接)
-- `stopMonitoring()` - 停止监控并清理连接
-- `hasAvailabilityData()` - 检查目标是否有可用性数据
-- `getAllTargets()` - 获取所有可用目标列表
-
-#### 简化的功能:
-- `getAvailabilityResult()` - 简单地从缓存获取结果
-- 自动WebSocket连接管理
-- 页面卸载时自动清理
-
-### 3. 组件使用方式
-
-组件现在只需要:
-```typescript
-const proxyStore = useProxyAvailabilityStore()
-
-// 获取可用性结果(从缓存)
-const result = proxyStore.getAvailabilityResult(target)
-```
-
-不再需要注册/注销组件或发送目标到后端。
-
-### 4. 布局初始化 (`app/src/layouts/BaseLayout.vue`)
-
-在用户登录后的主布局中初始化监控,确保用户已认证:
-```typescript
-const proxyAvailabilityStore = useProxyAvailabilityStore()
-
-onMounted(() => {
-  // Start monitoring for upstream availability
-  proxyAvailabilityStore.startMonitoring()
-})
-
-onUnmounted(() => {
-  // Stop monitoring when layout is unmounted
-  proxyAvailabilityStore.stopMonitoring()
-})
-```
-
-## 工作流程
-
-1. **用户登录**:用户成功登录并进入主布局
-2. **布局挂载**:`BaseLayout.vue` 挂载时自动调用 `startMonitoring()`
-3. **初始化**:通过HTTP GET获取当前所有可用性状态
-4. **实时更新**:建立WebSocket连接,接收实时状态更新
-5. **组件使用**:组件直接从缓存读取状态,无需额外请求
-6. **自动清理**:布局卸载时自动断开WebSocket连接
-
-## 优势
-
-- **简化架构**:移除复杂的组件管理逻辑
-- **更好性能**:减少不必要的网络请求
-- **统一数据源**:所有组件共享同一份缓存数据
-- **自动管理**:无需手动处理连接生命周期 

+ 0 - 80
docs/upstream-availability-refactor.md

@@ -1,80 +0,0 @@
-# Upstream Availability Monitoring Refactor
-
-## 概述
-
-重构了upstream可用性检测功能,从原来的WebSocket实时检测改为后台定时任务方式,提供更高效和稳定的监控。
-
-## 主要改动
-
-### 1. 后台任务机制
-- 通过 `cache.RegisterCallback` 注册 `ParseProxyTargetsFromRawContent` 自动收集所有配置文件中的代理目标
-- 创建了 `UpstreamService` 单例服务管理所有代理目标和检测结果
-- 实现了去重机制,避免重复检测相同的地址
-- 支持跟踪每个代理目标的来源配置文件
-
-### 2. 定时检测
-- 添加了cron job,默认每30秒执行一次可用性检测
-- 当有WebSocket连接时,检测频率自动提高到每5秒一次
-- 实现了并发控制,防止多个检测任务同时运行
-
-### 3. API接口
-保留了两个简洁的接口:
-
-#### HTTP GET接口
-- 路径:`/api/upstream/availability`
-- 功能:获取所有upstream的缓存监控结果
-- 返回数据:
-  ```json
-  {
-    "results": {
-      "127.0.0.1:8080": {
-        "online": true,
-        "latency": 1.23
-      }
-    },
-    "targets": [...],
-    "last_update_time": "2024-01-01T00:00:00Z",
-    "target_count": 10
-  }
-  ```
-
-#### WebSocket接口
-- 路径:`/api/upstream/availability/ws`
-- 功能:实时推送监控结果
-- 特点:
-  - 连接时立即发送当前结果
-  - 每5秒推送一次最新结果
-  - 自动管理连接数,优化检测频率
-
-## 技术细节
-
-### 去重机制
-- 基于 `host:port` 作为唯一标识
-- 支持多个配置文件引用同一个代理目标
-- 当某个配置文件被删除时,只有独占的代理目标会被移除
-
-### 并发控制
-- 使用互斥锁防止多个检测任务同时运行
-- WebSocket连接计数器管理检测频率
-- 后台任务会检查是否有活跃的WebSocket连接,避免重复检测
-
-### 性能优化
-- 缓存检测结果,减少实时检测的开销
-- 批量检测所有目标,提高效率
-- 使用goroutine池限制并发连接数(MaxConcurrentWorker = 10)
-
-## 使用示例
-
-### 获取监控结果
-```bash
-curl http://localhost:9000/api/upstream/availability
-```
-
-### WebSocket连接
-```javascript
-const ws = new WebSocket('ws://localhost:9000/api/upstream/availability/ws');
-ws.onmessage = (event) => {
-  const results = JSON.parse(event.data);
-  console.log('Upstream status:', results);
-};
-``` 

+ 9 - 110
internal/event/bus.go

@@ -12,20 +12,12 @@ type WebSocketHub interface {
 	BroadcastMessage(event string, data interface{})
 	BroadcastMessage(event string, data interface{})
 }
 }
 
 
-// WebSocketEventConfig holds configuration for WebSocket event forwarding
-type WebSocketEventConfig struct {
-	EventType     EventType
-	WSEventName   string
-	DataTransform func(data interface{}) interface{}
-}
-
 // EventBus manages event publishing and WebSocket forwarding
 // EventBus manages event publishing and WebSocket forwarding
 type EventBus struct {
 type EventBus struct {
-	wsHub     WebSocketHub
-	wsConfigs map[EventType]*WebSocketEventConfig
-	wsMutex   sync.RWMutex
-	ctx       context.Context
-	cancel    context.CancelFunc
+	wsHub   WebSocketHub
+	wsMutex sync.RWMutex
+	ctx     context.Context
+	cancel  context.CancelFunc
 }
 }
 
 
 var (
 var (
@@ -38,9 +30,8 @@ func GetEventBus() *EventBus {
 	busOnce.Do(func() {
 	busOnce.Do(func() {
 		ctx, cancel := context.WithCancel(context.Background())
 		ctx, cancel := context.WithCancel(context.Background())
 		globalBus = &EventBus{
 		globalBus = &EventBus{
-			wsConfigs: make(map[EventType]*WebSocketEventConfig),
-			ctx:       ctx,
-			cancel:    cancel,
+			ctx:    ctx,
+			cancel: cancel,
 		}
 		}
 	})
 	})
 	return globalBus
 	return globalBus
@@ -54,49 +45,18 @@ func (eb *EventBus) SetWebSocketHub(hub WebSocketHub) {
 	logger.Info("WebSocket hub registered with event bus")
 	logger.Info("WebSocket hub registered with event bus")
 }
 }
 
 
-// RegisterWebSocketEventForwarding registers an event type to be forwarded to WebSocket clients
-func (eb *EventBus) RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
-	eb.RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, func(data interface{}) interface{} {
-		return data // Default: no transformation
-	})
-}
-
-// RegisterWebSocketEventForwardingWithTransform registers an event type with custom data transformation
-func (eb *EventBus) RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
-	eb.wsMutex.Lock()
-	defer eb.wsMutex.Unlock()
-
-	// Only register if not already registered
-	if _, exists := eb.wsConfigs[eventType]; !exists {
-		config := &WebSocketEventConfig{
-			EventType:     eventType,
-			WSEventName:   wsEventName,
-			DataTransform: transform,
-		}
-		eb.wsConfigs[eventType] = config
-		logger.Debugf("Registered WebSocket event forwarding: %s -> %s", eventType, wsEventName)
-	}
-}
-
 // Publish forwards an event directly to WebSocket clients
 // Publish forwards an event directly to WebSocket clients
 func (eb *EventBus) Publish(event Event) {
 func (eb *EventBus) Publish(event Event) {
-	eb.forwardToWebSocket(event)
-}
-
-// forwardToWebSocket forwards an event to WebSocket clients if configured
-func (eb *EventBus) forwardToWebSocket(event Event) {
 	eb.wsMutex.RLock()
 	eb.wsMutex.RLock()
-	config, exists := eb.wsConfigs[event.Type]
 	hub := eb.wsHub
 	hub := eb.wsHub
 	eb.wsMutex.RUnlock()
 	eb.wsMutex.RUnlock()
 
 
-	if !exists || hub == nil {
+	if hub == nil {
 		return
 		return
 	}
 	}
 
 
-	// Apply data transformation
-	wsData := config.DataTransform(event.Data)
-	hub.BroadcastMessage(config.WSEventName, wsData)
+	// Directly broadcast the event using its type as the event name
+	hub.BroadcastMessage(string(event.Type), event.Data)
 }
 }
 
 
 // Shutdown gracefully shuts down the event bus
 // Shutdown gracefully shuts down the event bus
@@ -105,8 +65,6 @@ func (eb *EventBus) Shutdown() {
 	eb.wsMutex.Lock()
 	eb.wsMutex.Lock()
 	defer eb.wsMutex.Unlock()
 	defer eb.wsMutex.Unlock()
 
 
-	// Clear all configurations
-	eb.wsConfigs = make(map[EventType]*WebSocketEventConfig)
 	eb.wsHub = nil
 	eb.wsHub = nil
 	logger.Info("Event bus shutdown completed")
 	logger.Info("Event bus shutdown completed")
 }
 }
@@ -127,62 +85,3 @@ func Publish(event Event) {
 func SetWebSocketHub(hub WebSocketHub) {
 func SetWebSocketHub(hub WebSocketHub) {
 	GetEventBus().SetWebSocketHub(hub)
 	GetEventBus().SetWebSocketHub(hub)
 }
 }
-
-// RegisterWebSocketEventForwarding registers WebSocket event forwarding on the global bus
-func RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
-	GetEventBus().RegisterWebSocketEventForwarding(eventType, wsEventName)
-}
-
-// RegisterWebSocketEventForwardingWithTransform registers WebSocket event forwarding with transform on the global bus
-func RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
-	GetEventBus().RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, transform)
-}
-
-// RegisterWebSocketEventConfigs registers multiple WebSocket event configurations
-func RegisterWebSocketEventConfigs(configs []WebSocketEventConfig) {
-	bus := GetEventBus()
-	for _, config := range configs {
-		bus.RegisterWebSocketEventForwardingWithTransform(config.EventType, config.WSEventName, config.DataTransform)
-	}
-}
-
-// GetDefaultWebSocketEventConfigs returns the default WebSocket event configurations
-func GetDefaultWebSocketEventConfigs() []WebSocketEventConfig {
-	return []WebSocketEventConfig{
-		{
-			EventType:   EventTypeIndexScanning,
-			WSEventName: "index_scanning",
-			DataTransform: func(data interface{}) interface{} {
-				return data
-			},
-		},
-		{
-			EventType:   EventTypeAutoCertProcessing,
-			WSEventName: "auto_cert_processing",
-			DataTransform: func(data interface{}) interface{} {
-				return data
-			},
-		},
-		{
-			EventType:   EventTypeProcessingStatus,
-			WSEventName: "processing_status",
-			DataTransform: func(data interface{}) interface{} {
-				return data
-			},
-		},
-		{
-			EventType:   EventTypeNginxLogStatus,
-			WSEventName: "nginx_log_status",
-			DataTransform: func(data interface{}) interface{} {
-				return data
-			},
-		},
-		{
-			EventType:   EventTypeNotification,
-			WSEventName: "notification",
-			DataTransform: func(data interface{}) interface{} {
-				return data
-			},
-		},
-	}
-}

+ 1 - 11
internal/event/init.go

@@ -6,29 +6,19 @@ import (
 	"github.com/uozi-tech/cosy/logger"
 	"github.com/uozi-tech/cosy/logger"
 )
 )
 
 
-// InitEventSystem initializes the event system and sets up WebSocket forwarding
+// InitEventSystem initializes the event system
 func InitEventSystem(ctx context.Context) {
 func InitEventSystem(ctx context.Context) {
 	logger.Info("Initializing event system...")
 	logger.Info("Initializing event system...")
 
 
 	// Initialize the event bus by getting the singleton instance
 	// Initialize the event bus by getting the singleton instance
 	GetEventBus()
 	GetEventBus()
 
 
-	// Initialize WebSocket event forwarding configurations
-	initWebSocketEventForwarding()
-
 	logger.Info("Event system initialized successfully")
 	logger.Info("Event system initialized successfully")
 	defer ShutdownEventSystem()
 	defer ShutdownEventSystem()
 
 
 	<-ctx.Done()
 	<-ctx.Done()
 }
 }
 
 
-// initWebSocketEventForwarding initializes WebSocket event forwarding configurations
-func initWebSocketEventForwarding() {
-	// Register default event forwarding configurations
-	RegisterWebSocketEventConfigs(GetDefaultWebSocketEventConfigs())
-	logger.Info("WebSocket event forwarding initialized")
-}
-
 // ShutdownEventSystem gracefully shuts down the event system
 // ShutdownEventSystem gracefully shuts down the event system
 func ShutdownEventSystem() {
 func ShutdownEventSystem() {
 	logger.Info("Shutting down event system...")
 	logger.Info("Shutting down event system...")