فهرست منبع

refactor: improved WebSocket handling

0xJacky 2 ماه پیش
والد
کامیت
d24d845816
40فایلهای تغییر یافته به همراه472 افزوده شده و 544 حذف شده
  1. 0 1
      app/package.json
  2. 2 7
      app/src/api/analytic.ts
  3. 1 5
      app/src/api/llm.ts
  4. 49 6
      app/src/api/node.ts
  5. 1 4
      app/src/api/self_check.ts
  6. 2 5
      app/src/api/site_navigation.ts
  7. 2 5
      app/src/api/upstream.ts
  8. 4 3
      app/src/components/CodeEditor/CodeCompletion.ts
  9. 8 7
      app/src/components/GeoLiteDownload/GeoLiteDownload.vue
  10. 3 3
      app/src/components/NamespaceTabs/NamespaceTabs.vue
  11. 4 3
      app/src/components/NodeSelector/NodeSelector.vue
  12. 10 4
      app/src/components/Notification/Notification.vue
  13. 10 4
      app/src/components/ProcessingStatus/ProcessingStatus.vue
  14. 6 3
      app/src/components/SelfCheck/tasks/frontend/websocket.ts
  15. 4 4
      app/src/composables/useTerminalSession.ts
  16. 0 181
      app/src/composables/useWebSocketEventBus.ts
  17. 31 11
      app/src/lib/websocket/index.ts
  18. 5 0
      app/src/pinia/index.ts
  19. 69 79
      app/src/pinia/moudule/nodeAvailability.ts
  20. 10 67
      app/src/pinia/moudule/proxyAvailability.ts
  21. 148 0
      app/src/pinia/moudule/websocketEventBus.ts
  22. 1 1
      app/src/version.json
  23. 5 4
      app/src/views/certificate/components/RemoveCert.vue
  24. 4 4
      app/src/views/dashboard/NginxDashBoard.vue
  25. 5 47
      app/src/views/dashboard/Nodes.vue
  26. 4 3
      app/src/views/dashboard/ServerAnalytic.vue
  27. 5 4
      app/src/views/dashboard/SiteNavigation.vue
  28. 3 22
      app/src/views/dashboard/components/NodeAnalyticItem.vue
  29. 15 9
      app/src/views/nginx_log/NginxLogList.vue
  30. 19 8
      app/src/views/nginx_log/composables/useIndexProgress.ts
  31. 4 4
      app/src/views/nginx_log/raw/RawLogViewer.vue
  32. 10 3
      app/src/views/nginx_log/structured/StructuredLogViewer.vue
  33. 8 7
      app/src/views/node/BatchUpgrader.vue
  34. 6 5
      app/src/views/site/site_edit/components/Cert/ObtainCertLive.vue
  35. 8 7
      app/src/views/system/Upgrade.vue
  36. 3 2
      app/src/views/terminal/components/TerminalStatusBar.vue
  37. 1 1
      go.mod
  38. 2 2
      go.sum
  39. 0 1
      internal/analytic/node.go
  40. 0 8
      pnpm-lock.yaml

+ 0 - 1
app/package.json

@@ -43,7 +43,6 @@
     "nprogress": "^0.2.0",
     "pinia": "^3.0.4",
     "pinia-plugin-persistedstate": "^4.7.1",
-    "reconnecting-websocket": "^4.4.0",
     "sortablejs": "^1.15.6",
     "splitpanes": "^4.0.4",
     "sse.js": "^2.7.2",

+ 2 - 7
app/src/api/analytic.ts

@@ -1,5 +1,4 @@
 import { http } from '@uozi-admin/request'
-import ws from '@/lib/websocket'
 
 export interface CPUInfoStat {
   cpu: number
@@ -120,12 +119,8 @@ const analytic = {
   init(): Promise<AnalyticInit> {
     return http.get('/analytic/init')
   },
-  server() {
-    return ws('/api/analytic')
-  },
-  nodes() {
-    return ws('/api/analytic/nodes')
-  },
+  serverWebSocketUrl: '/api/analytic',
+  nodesWebSocketUrl: '/api/analytic/nodes',
 }
 
 export default analytic

+ 1 - 5
app/src/api/llm.ts

@@ -1,6 +1,4 @@
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import { http } from '@uozi-admin/request'
-import ws from '@/lib/websocket'
 
 export interface ChatComplicationMessage {
   role: string
@@ -41,9 +39,7 @@ const llm = {
   store_messages(data: { file_name?: string, messages?: ChatComplicationMessage[] }) {
     return http.post('/llm_messages', data)
   },
-  code_completion() {
-    return ws('/api/code_completion') as ReconnectingWebSocket
-  },
+  codeCompletionWebSocketUrl: '/api/code_completion',
   get_code_completion_enabled_status() {
     return http.get<{ enabled: boolean }>('/code_completion/enabled')
   },

+ 49 - 6
app/src/api/node.ts

@@ -5,15 +5,58 @@ export interface Node extends ModelBase {
   name: string
   url: string
   token: string
-  status?: boolean
-  response_at?: Date
+  status: boolean
+  enabled: boolean
+  response_at?: string
+}
+
+export interface NodeStatus {
+  avg_load: {
+    load1: number
+    load5: number
+    load15: number
+  }
+  cpu_percent: number
+  memory_percent: number
+  disk_percent: number
+  network: {
+    name: string
+    bytesSent: number
+    bytesRecv: number
+    packetsSent: number
+    packetsRecv: number
+    errin: number
+    errout: number
+    dropin: number
+    dropout: number
+    fifoin: number
+    fifoout: number
+  }
+  status: boolean
+  response_at?: string
+  upstream_status_map: {
+    [key: string]: {
+      online: boolean
+      latency: number
+    }
+  }
 }
 
 export interface NodeInfo {
-  id: number
-  name: string
-  token: string
-  response_at?: Date
+  node_runtime_info: {
+    os: string
+    arch: string
+    ex_path: string
+    cur_version: string
+    in_docker: boolean
+  }
+  version: string
+  cpu_num: number
+  memory_total: string
+  disk_total: string
+}
+
+export interface AnalyticNode extends Node, NodeInfo, NodeStatus {
 }
 
 const baseUrl = '/nodes'

+ 1 - 4
app/src/api/self_check.ts

@@ -1,7 +1,6 @@
 import type { Container } from '@/language'
 import type { CosyError } from '@/lib/http'
 import { http } from '@uozi-admin/request'
-import ws from '@/lib/websocket'
 
 export const ReportStatus = {
   Success: 'success',
@@ -27,9 +26,7 @@ const selfCheck = {
   fix(taskName: string) {
     return http.post(`/self_check/${taskName}/fix`)
   },
-  websocket() {
-    return ws('/api/self_check/websocket', false)
-  },
+  websocketUrl: '/api/self_check/websocket',
   timeoutCheck() {
     return http.get('/self_check/timeout')
   },

+ 2 - 5
app/src/api/site_navigation.ts

@@ -1,6 +1,5 @@
 import type { SiteStatusType } from '@/constants/site-status'
 import { http } from '@uozi-admin/request'
-import ws from '@/lib/websocket'
 
 export interface SiteInfo {
   id: number // primary identifier for API operations
@@ -144,8 +143,6 @@ export const siteNavigationApi = {
     return http.post(`/site_navigation/test_health_check/${id}`, { config })
   },
 
-  // WebSocket connection using lib/websocket
-  createWebSocket() {
-    return ws('/api/site_navigation_ws', true)
-  },
+  // WebSocket URL for real-time updates
+  websocketUrl: '/api/site_navigation_ws',
 }

+ 2 - 5
app/src/api/upstream.ts

@@ -1,5 +1,4 @@
 import { http } from '@uozi-admin/request'
-import ws from '@/lib/websocket'
 
 export interface UpstreamStatus {
   online: boolean
@@ -45,10 +44,8 @@ const upstream = {
     return http.get('/upstream/availability')
   },
 
-  // WebSocket interface for real-time availability updates
-  availabilityWebSocket() {
-    return ws('/api/upstream/availability_ws')
-  },
+  // WebSocket URL for real-time availability updates
+  availabilityWebSocketUrl: '/api/upstream/availability_ws',
 
   // Get all sockets with their configuration and health status
   getSocketList(): Promise<SocketListResponse> {

+ 4 - 3
app/src/components/CodeEditor/CodeCompletion.ts

@@ -1,10 +1,10 @@
 import type { Editor } from 'ace-builds'
 import type { Point } from 'ace-builds-internal/document'
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import ace from 'ace-builds'
 import { debounce } from 'lodash'
 import { v4 as uuidv4 } from 'uuid'
 import llm from '@/api/llm'
+import { useWebSocket } from '@/lib/websocket'
 
 function debug(...args: unknown[]) {
   if (import.meta.env.DEV) {
@@ -43,7 +43,7 @@ function useCodeCompletion() {
   const lastTriggerTime = ref<number>(0)
   const lastTriggerPosition = ref<{ row: number, column: number } | null>(null)
 
-  const ws = shallowRef<ReconnectingWebSocket>()
+  const ws = shallowRef<WebSocket>()
 
   // Check if the current file is a configuration file
   function checkIfConfigFile(filename: string, content: string): boolean {
@@ -496,7 +496,8 @@ function useCodeCompletion() {
       return
     }
 
-    ws.value = llm.code_completion()
+    const { ws: wsRef } = useWebSocket(llm.codeCompletionWebSocketUrl, false)
+    ws.value = wsRef.value!
 
     editorRef.value = editor
 

+ 8 - 7
app/src/components/GeoLiteDownload/GeoLiteDownload.vue

@@ -3,7 +3,7 @@ import type { Ref } from 'vue'
 import { CheckCircleOutlined, DownloadOutlined, InfoCircleOutlined } from '@ant-design/icons-vue'
 import geolite from '@/api/geolite'
 import { formatDateTime } from '@/lib/helper'
-import websocket from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 
 interface Emits {
   (e: 'downloadComplete'): void
@@ -55,16 +55,17 @@ function downloadGeoLiteDB() {
   downloadProgress.value = 0
   downloadMessage.value = $gettext('Starting download...')
 
-  const ws = websocket('/api/geolite/download', false)
+  const { ws } = useWebSocket('/api/geolite/download', false)
+  const socket = ws.value!
 
   let isFailed = false
   let currentPhase = 'download' // 'download' or 'decompress'
 
-  ws.onopen = () => {
-    // WebSocket connected, server will start download
+  socket.onopen = () => {
+    socket.send('start')
   }
 
-  ws.onmessage = async m => {
+  socket.onmessage = async m => {
     const r = JSON.parse(m.data)
 
     // Update message and detect phase changes
@@ -99,13 +100,13 @@ function downloadGeoLiteDB() {
     }
   }
 
-  ws.onerror = () => {
+  socket.onerror = () => {
     isFailed = true
     downloadStatus.value = 'exception'
     downloadMessage.value = $gettext('Download failed')
   }
 
-  ws.onclose = async () => {
+  socket.onclose = async () => {
     if (isFailed) {
       downloading.value = false
       return

+ 3 - 3
app/src/components/NamespaceTabs/NamespaceTabs.vue

@@ -53,7 +53,7 @@ const loading = ref({
 const currentNamespace = computed(() => {
   if (!modelValue.value || modelValue.value === 0)
     return null
-  return namespaces.value.find(g => g.id === Number(modelValue.value))
+  return namespaces.value.find(g => g.id === modelValue.value)
 })
 
 // Get the list of nodes in the current group
@@ -74,7 +74,7 @@ async function handleReloadNginx() {
   if (!currentNamespace.value || !syncNodes.value.length)
     return
 
-  const nodeIds = syncNodes.value.map(node => node.id)
+  const nodeIds = syncNodes.value.map(node => node.id).filter(id => id !== undefined)
 
   loading.value.reload = true
   try {
@@ -94,7 +94,7 @@ async function handleRestartNginx() {
   if (!currentNamespace.value || !syncNodes.value.length)
     return
 
-  const nodeIds = syncNodes.value.map(node => node.id)
+  const nodeIds = syncNodes.value.map(node => node.id).filter(id => id !== undefined)
 
   loading.value.restart = true
   try {

+ 4 - 3
app/src/components/NodeSelector/NodeSelector.vue

@@ -1,5 +1,5 @@
 <script setup lang="ts">
-import type { NodeStatus } from '@/pinia/moudule/nodeAvailability'
+import type { AnalyticNode } from '@/api/node'
 import { useNodeAvailabilityStore } from '@/pinia/moudule/nodeAvailability'
 
 const props = defineProps<{
@@ -16,9 +16,10 @@ const data = computed(() => nodeStore.getAllNodes())
 const data_map = computed(() => {
   const nodes = nodeStore.getAllNodes()
   return nodes.reduce((acc, node) => {
-    acc[node.id] = node
+    if (node.id)
+      acc[node.id] = node
     return acc
-  }, {} as Record<number, NodeStatus>)
+  }, {} as Record<number, Partial<AnalyticNode>>)
 })
 
 const value = computed({

+ 10 - 4
app/src/components/Notification/Notification.vue

@@ -6,9 +6,8 @@ import dayjs from 'dayjs'
 import relativeTime from 'dayjs/plugin/relativeTime'
 import notificationApi from '@/api/notification'
 import { detailRender } from '@/components/Notification/detailRender'
-import { useWebSocketEventBus } from '@/composables/useWebSocketEventBus'
 import { NotificationTypeT } from '@/constants'
-import { useUserStore } from '@/pinia'
+import { useUserStore, useWebSocketEventBusStore } from '@/pinia'
 import notifications from './notifications'
 
 defineProps<{
@@ -25,10 +24,11 @@ const { unreadCount } = storeToRefs(useUserStore())
 
 const data = ref([]) as Ref<Notification[]>
 
-const { subscribe } = useWebSocketEventBus()
+const websocketEventBus = useWebSocketEventBusStore()
+let notificationSubscriptionId: string | null = null
 
 onMounted(() => {
-  subscribe('notification', (data: Notification) => {
+  notificationSubscriptionId = websocketEventBus.subscribe('notification', (data: Notification) => {
     const typeTrans = {
       0: 'error',
       1: 'warning',
@@ -43,6 +43,12 @@ onMounted(() => {
   })
 })
 
+onUnmounted(() => {
+  if (notificationSubscriptionId) {
+    websocketEventBus.unsubscribe(notificationSubscriptionId)
+  }
+})
+
 function init() {
   loading.value = true
   notificationApi.getList({ sort: 'desc', order_by: 'created_at' }).then(r => {

+ 10 - 4
app/src/components/ProcessingStatus/ProcessingStatus.vue

@@ -1,19 +1,25 @@
 <script setup lang="tsx">
 import { SyncOutlined } from '@ant-design/icons-vue'
-import { useWebSocketEventBus } from '@/composables/useWebSocketEventBus'
-import { useGlobalStore } from '@/pinia'
+import { useGlobalStore, useWebSocketEventBusStore } from '@/pinia'
 
-const { subscribe } = useWebSocketEventBus()
+const websocketEventBus = useWebSocketEventBusStore()
+let processingStatusSubscriptionId: string | null = null
 
 const globalStore = useGlobalStore()
 const { processingStatus } = storeToRefs(globalStore)
 
 onMounted(() => {
-  subscribe('processing_status', data => {
+  processingStatusSubscriptionId = websocketEventBus.subscribe('processing_status', data => {
     processingStatus.value = data
   })
 })
 
+onUnmounted(() => {
+  if (processingStatusSubscriptionId) {
+    websocketEventBus.unsubscribe(processingStatusSubscriptionId)
+  }
+})
+
 const isProcessing = computed(() => {
   return Object.values(processingStatus.value).some(v => v)
 })

+ 6 - 3
app/src/components/SelfCheck/tasks/frontend/websocket.ts

@@ -1,6 +1,7 @@
 import type { FrontendTask } from '../types'
 import type { ReportStatusType } from '@/api/self_check'
 import selfCheck, { ReportStatus } from '@/api/self_check'
+import { useWebSocket } from '@/lib/websocket'
 
 /**
  * WebSocket Task
@@ -17,11 +18,13 @@ const WebsocketTask: FrontendTask = {
   check: async (): Promise<ReportStatusType> => {
     try {
       const connected = await new Promise<boolean>(resolve => {
-        const ws = selfCheck.websocket()
-        ws.onopen = () => {
+        const { ws } = useWebSocket(selfCheck.websocketUrl, false)
+        const socket = ws.value!
+        socket.onopen = () => {
+          socket.close()
           resolve(true)
         }
-        ws.onerror = () => {
+        socket.onerror = () => {
           resolve(false)
         }
         // Set a timeout for the connection attempt

+ 4 - 4
app/src/composables/useTerminalSession.ts

@@ -1,14 +1,13 @@
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { TerminalTab } from '@/pinia/moudule/terminal'
 import { FitAddon } from '@xterm/addon-fit'
 import { Terminal } from '@xterm/xterm'
 import { throttle } from 'lodash'
-import ws from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 
 export interface TerminalSession {
   tab: TerminalTab
   terminal: Terminal
-  websocket: ReconnectingWebSocket | WebSocket
+  websocket: WebSocket
   fitAddon: FitAddon
   ping?: ReturnType<typeof setTimeout>
   isWebSocketReady: boolean
@@ -48,7 +47,8 @@ export function useTerminalSession() {
     const fitAddon = new FitAddon()
     terminal.loadAddon(fitAddon)
 
-    const websocket = ws(`/api/pty?X-Secure-Session-ID=${secureSessionId}`, false)
+    const { ws } = useWebSocket(`/api/pty?X-Secure-Session-ID=${secureSessionId}`, false)
+    const websocket = ws.value!
 
     const session: TerminalSession = {
       tab,

+ 0 - 181
app/src/composables/useWebSocketEventBus.ts

@@ -1,181 +0,0 @@
-import { v4 as uuidv4 } from 'uuid'
-import ws from '@/lib/websocket'
-
-export interface WebSocketMessage {
-  event: string
-  // eslint-disable-next-line ts/no-explicit-any
-  data: any
-}
-
-// eslint-disable-next-line ts/no-explicit-any
-export type EventHandler<T = any> = (data: T) => void
-
-export interface EventSubscription {
-  id: string
-  event: string
-  handler: EventHandler
-}
-
-class WebSocketEventBus {
-  private static instance: WebSocketEventBus
-  private ws: WebSocket | null = null
-  private subscriptions: Map<string, EventSubscription> = new Map()
-  private isConnected = false
-
-  private constructor() {}
-
-  static getInstance(): WebSocketEventBus {
-    if (!WebSocketEventBus.instance) {
-      WebSocketEventBus.instance = new WebSocketEventBus()
-    }
-    return WebSocketEventBus.instance
-  }
-
-  // Connect to WebSocket
-  connect(): void {
-    if (this.ws && this.isConnected) {
-      return
-    }
-
-    // Close existing connection
-    if (this.ws) {
-      this.ws.close()
-    }
-
-    // Use the lib/websocket to create connection with auto-reconnect
-    this.ws = ws('/api/events', true) as WebSocket
-
-    this.ws.onopen = () => {
-      this.isConnected = true
-    }
-
-    this.ws.onmessage = (event: MessageEvent) => {
-      try {
-        const message: WebSocketMessage = JSON.parse(event.data)
-        this.handleMessage(message)
-      }
-      catch (error) {
-        console.error('Error parsing WebSocket message:', error)
-      }
-    }
-
-    this.ws.onclose = () => {
-      this.isConnected = false
-    }
-
-    this.ws.onerror = error => {
-      console.error('WebSocket error:', error)
-      this.isConnected = false
-    }
-  }
-
-  // Handle incoming WebSocket message
-  private handleMessage(message: WebSocketMessage): void {
-    // Find all subscriptions for this event
-    this.subscriptions.forEach(subscription => {
-      if (subscription.event === message.event) {
-        try {
-          subscription.handler(message.data)
-        }
-        catch (error) {
-          console.error(`Error handling event ${message.event}:`, error)
-        }
-      }
-    })
-  }
-
-  // Subscribe to an event
-  // eslint-disable-next-line ts/no-explicit-any
-  subscribe<T = any>(event: string, handler: EventHandler<T>): string {
-    const id = uuidv4()
-
-    this.subscriptions.set(id, {
-      id,
-      event,
-      handler,
-    })
-
-    // Ensure WebSocket is connected
-    if (!this.isConnected) {
-      this.connect()
-    }
-
-    return id
-  }
-
-  // Unsubscribe from an event
-  unsubscribe(subscriptionId: string): void {
-    this.subscriptions.delete(subscriptionId)
-  }
-
-  // Disconnect WebSocket
-  disconnect(): void {
-    this.isConnected = false
-    this.subscriptions.clear()
-
-    if (this.ws) {
-      this.ws.close()
-      this.ws = null
-    }
-  }
-
-  // Get connection status
-  getConnectionStatus(): boolean {
-    return this.isConnected
-  }
-}
-
-// WebSocket Event Bus Composable
-export function useWebSocketEventBus() {
-  const eventBus = WebSocketEventBus.getInstance()
-  const subscriptionIds = ref<string[]>([])
-
-  // Subscribe to an event
-  // eslint-disable-next-line ts/no-explicit-any
-  function subscribe<T = any>(event: string, handler: EventHandler<T>): string {
-    const id = eventBus.subscribe(event, handler)
-    subscriptionIds.value.push(id)
-    return id
-  }
-
-  // Unsubscribe from an event
-  function unsubscribe(subscriptionId: string): void {
-    eventBus.unsubscribe(subscriptionId)
-    const index = subscriptionIds.value.indexOf(subscriptionId)
-    if (index > -1) {
-      subscriptionIds.value.splice(index, 1)
-    }
-  }
-
-  // Connect to WebSocket
-  function connect(): void {
-    eventBus.connect()
-  }
-
-  // Disconnect from WebSocket
-  function disconnect(): void {
-    eventBus.disconnect()
-  }
-
-  // Get connection status
-  const isConnected = computed(() => eventBus.getConnectionStatus())
-
-  // Auto cleanup on unmount
-  if (getCurrentInstance()) {
-    onUnmounted(() => {
-      // Unsubscribe all subscriptions for this component
-      subscriptionIds.value.forEach(id => {
-        eventBus.unsubscribe(id)
-      })
-      subscriptionIds.value = []
-    })
-  }
-
-  return {
-    subscribe,
-    unsubscribe,
-    connect,
-    disconnect,
-    isConnected: readonly(isConnected),
-  }
-}

+ 31 - 11
app/src/lib/websocket/index.ts

@@ -1,12 +1,13 @@
+import type { UseWebSocketOptions, UseWebSocketReturn } from '@vueuse/core'
+import { useWebSocket as vueUseWebSocket } from '@vueuse/core'
 import { storeToRefs } from 'pinia'
-import ReconnectingWebSocket from 'reconnecting-websocket'
 import { urlJoin } from '@/lib/helper'
 import { useSettingsStore, useUserStore } from '@/pinia'
 
 /**
  * Build WebSocket URL based on environment
  */
-function buildWebSocketUrl(url: string, token: string, shortToken: string, nodeId?: number): string {
+export function buildWebSocketUrl(url: string, token: string, shortToken: string, nodeId?: number): string {
   const node_id = nodeId && nodeId > 0 ? `&x_node_id=${nodeId}` : ''
 
   // Use shortToken if available (without base64 encoding), otherwise use regular token (with base64 encoding)
@@ -24,17 +25,36 @@ function buildWebSocketUrl(url: string, token: string, shortToken: string, nodeI
   return urlJoin(protocol + window.location.host, window.location.pathname, url, `?${authParam}`, node_id)
 }
 
-function ws(url: string, reconnect: boolean = true): ReconnectingWebSocket | WebSocket {
+/**
+ * Create a WebSocket connection using VueUse
+ * @param url - The WebSocket endpoint URL
+ * @param reconnect - Whether to enable auto-reconnect (default: true)
+ * @param options - Additional VueUse WebSocket options
+ */
+// eslint-disable-next-line ts/no-explicit-any
+export function useWebSocket<T = any>(
+  url: string,
+  reconnect: boolean = true,
+  options?: Omit<UseWebSocketOptions, 'autoReconnect'>,
+): UseWebSocketReturn<T> {
   const user = useUserStore()
   const settings = useSettingsStore()
   const { token, shortToken } = storeToRefs(user)
 
-  const _url = buildWebSocketUrl(url, token.value, shortToken.value, settings.node.id)
-
-  if (reconnect)
-    return new ReconnectingWebSocket(_url, undefined, { maxRetries: 10 })
-
-  return new WebSocket(_url)
+  const wsUrl = buildWebSocketUrl(url, token.value, shortToken.value, settings.node.id)
+
+  return vueUseWebSocket<T>(wsUrl, {
+    autoReconnect: reconnect
+      ? {
+          retries: 10,
+          delay: 1000,
+          onFailed: () => {
+            console.warn(`Failed to reconnect to WebSocket after 10 retries: ${url}`)
+          },
+        }
+      : false,
+    immediate: true,
+    autoClose: true,
+    ...options,
+  })
 }
-
-export default ws

+ 5 - 0
app/src/pinia/index.ts

@@ -4,6 +4,10 @@ import { useProxyAvailabilityStore } from './moudule/proxyAvailability'
 import { useSettingsStore } from './moudule/settings'
 import { useTerminalStore } from './moudule/terminal'
 import { useUserStore } from './moudule/user'
+import { useWebSocketEventBusStore } from './moudule/websocketEventBus'
+
+// Re-export types
+export type { EventHandler, EventSubscription, WebSocketMessage } from './moudule/websocketEventBus'
 
 export {
   useAppStore,
@@ -12,4 +16,5 @@ export {
   useSettingsStore,
   useTerminalStore,
   useUserStore,
+  useWebSocketEventBusStore,
 }

+ 69 - 79
app/src/pinia/moudule/nodeAvailability.ts

@@ -1,23 +1,53 @@
-import type ReconnectingWebSocket from 'reconnecting-websocket'
-import type { Node } from '@/api/node'
+import type { AnalyticNode, Node } from '@/api/node'
+import analytic from '@/api/analytic'
 import nodeApi from '@/api/node'
-import ws from '@/lib/websocket'
-
-export interface NodeStatus {
-  id: number
-  name: string
-  status: boolean
-  url?: string
-  token?: string
-  enabled?: boolean
-}
+import { useWebSocket } from '@/lib/websocket'
 
 export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
-  const nodes = ref<Record<number, NodeStatus>>({})
-  const websocket = shallowRef<ReconnectingWebSocket | WebSocket>()
+  const nodes = ref<Record<string, Partial<AnalyticNode>>>({})
+  const websocket = shallowRef<WebSocket | null>(null)
   const isConnected = ref(false)
   const isInitialized = ref(false)
   const lastUpdateTime = ref<string>('')
+  const isConnecting = ref(false)
+  const nodeList = computed<Partial<AnalyticNode>[]>(() => Object.values(nodes.value))
+
+  const socket = useWebSocket<Record<string, Partial<AnalyticNode>>>(analytic.nodesWebSocketUrl, true, {
+    immediate: false,
+    autoClose: false,
+    onConnected(webSocket) {
+      websocket.value = webSocket
+      isConnected.value = true
+      isConnecting.value = false
+    },
+    onDisconnected() {
+      isConnected.value = false
+      isConnecting.value = false
+      websocket.value = null
+    },
+    onError(event) {
+      console.warn('Failed to connect to nodes WebSocket endpoint', event)
+      isConnected.value = false
+      isConnecting.value = false
+    },
+    onMessage(_, event) {
+      try {
+        const nodesData = JSON.parse(event.data) as Record<string, Partial<AnalyticNode>>
+
+        Object.keys(nodesData).forEach((nodeIdStr: string) => {
+          const nodeId = Number.parseInt(nodeIdStr)
+          const nodeData = nodesData[nodeIdStr]
+
+          nodes.value[nodeId] = nodeData
+        })
+
+        lastUpdateTime.value = new Date().toISOString()
+      }
+      catch (error) {
+        console.error('Error parsing WebSocket message:', error)
+      }
+    },
+  })
 
   // Initialize node data from API and WebSocket
   async function initialize() {
@@ -28,13 +58,13 @@ export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
     try {
       // First, load the initial node data from API
       const response = await nodeApi.getList({ enabled: true })
-      const nodeMap: Record<number, NodeStatus> = {}
+      const nodeMap: Record<string, Partial<AnalyticNode>> = {}
 
       response.data.forEach((node: Node) => {
         nodeMap[node.id] = {
           id: node.id,
           name: node.name,
-          status: node.status ?? false,
+          status: node.status,
           url: node.url,
           token: node.token,
           enabled: true,
@@ -57,71 +87,27 @@ export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
 
   // Connect to WebSocket for real-time updates
   function connectWebSocket() {
-    if (websocket.value && isConnected.value) {
+    const readyState = socket.ws.value?.readyState
+
+    if (readyState === WebSocket.OPEN) {
+      isConnected.value = true
+      isConnecting.value = false
       return
     }
 
-    // Close existing connection if any
-    if (websocket.value) {
-      websocket.value.close()
+    if (readyState === WebSocket.CONNECTING || isConnecting.value) {
+      isConnecting.value = true
+      return
     }
 
-    try {
-      // Create new WebSocket connection
-      const socket = ws('/api/analytic/nodes', true)
-      websocket.value = socket
+    isConnecting.value = true
 
-      socket.onopen = () => {
-        isConnected.value = true
-      }
-
-      socket.onmessage = event => {
-        try {
-          const nodesData = JSON.parse(event.data)
-
-          // The /api/analytic/nodes endpoint returns an object with node IDs as keys
-          // Update existing nodes' status or create new ones if not exist
-          Object.keys(nodesData).forEach((nodeIdStr: string) => {
-            const nodeId = Number.parseInt(nodeIdStr)
-            const nodeData = nodesData[nodeIdStr]
-
-            // Update existing node or create new one
-            const existingNode = nodes.value[nodeId]
-            if (existingNode) {
-              // Update status for existing node
-              existingNode.status = nodeData.status ?? false
-            }
-            else {
-              // Create new node entry (this should be initialized from API call)
-              nodes.value[nodeId] = {
-                id: nodeId,
-                name: nodeData.name || `Node ${nodeId}`,
-                status: nodeData.status ?? false,
-                url: nodeData.url,
-                token: nodeData.token,
-                enabled: true,
-              }
-            }
-          })
-
-          lastUpdateTime.value = new Date().toISOString()
-        }
-        catch (error) {
-          console.error('Error parsing WebSocket message:', error)
-        }
-      }
-
-      socket.onclose = () => {
-        isConnected.value = false
-      }
-
-      socket.onerror = error => {
-        console.warn('Failed to connect to nodes WebSocket endpoint', error)
-        isConnected.value = false
-      }
+    try {
+      socket.open()
     }
     catch (error) {
       console.error('Failed to create WebSocket connection:', error)
+      isConnecting.value = false
     }
   }
 
@@ -132,25 +118,27 @@ export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
 
   // Stop monitoring and cleanup
   function stopMonitoring() {
-    if (websocket.value) {
-      websocket.value.close()
-      websocket.value = undefined
-      isConnected.value = false
+    if (socket.ws.value && socket.ws.value.readyState !== WebSocket.CLOSED) {
+      socket.close()
     }
+
+    websocket.value = null
+    isConnected.value = false
+    isConnecting.value = false
   }
 
   // Get node status by ID
-  function getNodeStatus(nodeId: number): NodeStatus | undefined {
+  function getNodeStatus(nodeId: number): Partial<AnalyticNode> | undefined {
     return nodes.value[nodeId]
   }
 
   // Get all nodes as array
-  function getAllNodes(): NodeStatus[] {
+  function getAllNodes(): Partial<AnalyticNode>[] {
     return Object.values(nodes.value)
   }
 
   // Get enabled nodes only
-  function getEnabledNodes(): NodeStatus[] {
+  function getEnabledNodes(): Partial<AnalyticNode>[] {
     return Object.values(nodes.value).filter(node => node.enabled)
   }
 
@@ -175,9 +163,11 @@ export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
 
   return {
     nodes: readonly(nodes),
+    nodeList,
     isConnected: readonly(isConnected),
     isInitialized: readonly(isInitialized),
     lastUpdateTime: readonly(lastUpdateTime),
+    isConnecting: readonly(isConnecting),
     initialize,
     startMonitoring,
     stopMonitoring,

+ 10 - 67
app/src/pinia/moudule/proxyAvailability.ts

@@ -1,7 +1,5 @@
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { ProxyTarget } from '@/api/site'
 import type { UpstreamAvailabilityResponse, UpstreamStatus } from '@/api/upstream'
-import analytic from '@/api/analytic'
 import upstream from '@/api/upstream'
 import { useNodeAvailabilityStore } from './nodeAvailability'
 
@@ -25,8 +23,8 @@ export type ProxyAvailabilityResult = UpstreamStatus
 export const useProxyAvailabilityStore = defineStore('proxyAvailability', () => {
   const availabilityResults = ref<Record<string, ProxyAvailabilityResult>>({})
   const upstreamStatusMap = ref<UpstreamStatusMap>({})
-  const websocket = shallowRef<ReconnectingWebSocket | WebSocket>()
-  const nodeAnalyticsWebsocket = shallowRef<ReconnectingWebSocket | WebSocket>()
+  const websocket = shallowRef<WebSocket>()
+  const nodeAnalyticsWebsocket = shallowRef<WebSocket>()
   const isConnected = ref(false)
   const isNodeAnalyticsConnected = ref(false)
   const isInitialized = ref(false)
@@ -76,7 +74,7 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
   }
 
   // Connect to WebSocket for real-time updates
-  function connectWebSocket() {
+  async function connectWebSocket() {
     if (websocket.value && isConnected.value) {
       return
     }
@@ -88,14 +86,15 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
 
     try {
       // Create new WebSocket connection
-      const ws = upstream.availabilityWebSocket()
-      websocket.value = ws
+      const { useWebSocket } = await import('@/lib/websocket')
+      const { ws } = useWebSocket(upstream.availabilityWebSocketUrl)
+      websocket.value = ws.value!
 
-      ws.onopen = () => {
+      ws.value!.onopen = () => {
         isConnected.value = true
       }
 
-      ws.onmessage = (e: MessageEvent) => {
+      ws.value!.onmessage = (e: MessageEvent) => {
         try {
           const results = JSON.parse(e.data) as Record<string, ProxyAvailabilityResult>
           // Update availability results with latest data
@@ -107,11 +106,11 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
         }
       }
 
-      ws.onclose = () => {
+      ws.value!.onclose = () => {
         isConnected.value = false
       }
 
-      ws.onerror = error => {
+      ws.value!.onerror = error => {
         console.error('Proxy availability WebSocket error:', error)
         isConnected.value = false
       }
@@ -121,60 +120,6 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
     }
   }
 
-  // Connect to node analytics WebSocket for multi-node upstream data
-  function connectNodeAnalyticsWebSocket() {
-    if (nodeAnalyticsWebsocket.value && isNodeAnalyticsConnected.value) {
-      return
-    }
-
-    // Close existing connection if any
-    if (nodeAnalyticsWebsocket.value) {
-      nodeAnalyticsWebsocket.value.close()
-    }
-
-    try {
-      // Create new WebSocket connection to node analytics
-      const ws = analytic.nodes()
-      nodeAnalyticsWebsocket.value = ws
-
-      ws.onopen = () => {
-        isNodeAnalyticsConnected.value = true
-      }
-
-      ws.onmessage = (e: MessageEvent) => {
-        try {
-          const nodeData = JSON.parse(e.data)
-
-          // Process each node's data
-          for (const [nodeIdStr, nodeInfo] of Object.entries(nodeData)) {
-            const nodeId = Number.parseInt(nodeIdStr)
-            if (nodeInfo && typeof nodeInfo === 'object' && 'upstream_status_map' in nodeInfo) {
-              const upstreamData = nodeInfo.upstream_status_map as Record<string, NodeUpstreamStatus>
-              updateUpstreamStatusMapFromNode(nodeId, upstreamData)
-            }
-          }
-
-          lastUpdateTime.value = new Date().toISOString()
-        }
-        catch (error) {
-          console.error('Failed to parse node analytics WebSocket message:', error)
-        }
-      }
-
-      ws.onclose = () => {
-        isNodeAnalyticsConnected.value = false
-      }
-
-      ws.onerror = error => {
-        console.error('Node analytics WebSocket error:', error)
-        isNodeAnalyticsConnected.value = false
-      }
-    }
-    catch (error) {
-      console.error('Failed to create node analytics WebSocket connection:', error)
-    }
-  }
-
   // Start monitoring (initialize + WebSocket)
   async function startMonitoring() {
     // Initialize node store first
@@ -184,7 +129,6 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
 
     await initialize()
     connectWebSocket()
-    connectNodeAnalyticsWebSocket()
   }
 
   // Stop monitoring and cleanup
@@ -287,7 +231,6 @@ export const useProxyAvailabilityStore = defineStore('proxyAvailability', () =>
     startMonitoring,
     stopMonitoring,
     connectWebSocket,
-    connectNodeAnalyticsWebSocket,
     getAvailabilityResult,
     hasAvailabilityData,
     getAllTargets,

+ 148 - 0
app/src/pinia/moudule/websocketEventBus.ts

@@ -0,0 +1,148 @@
+import { v4 as uuidv4 } from 'uuid'
+import { useWebSocket } from '@/lib/websocket'
+
+export interface WebSocketMessage {
+  event: string
+  // eslint-disable-next-line ts/no-explicit-any
+  data: any
+}
+
+// eslint-disable-next-line ts/no-explicit-any
+export type EventHandler<T = any> = (data: T) => void
+
+export interface EventSubscription {
+  id: string
+  event: string
+  handler: EventHandler
+}
+
+export const useWebSocketEventBusStore = defineStore('websocketEventBus', () => {
+  // State
+  const ws = ref<WebSocket | null>(null)
+  const subscriptions = ref<Map<string, EventSubscription>>(new Map())
+  const isConnected = ref(false)
+  const isConnecting = ref(false)
+
+  // Handle incoming WebSocket message
+  function handleMessage(message: WebSocketMessage): void {
+    // Find all subscriptions for this event
+    subscriptions.value.forEach(subscription => {
+      if (subscription.event === message.event) {
+        try {
+          subscription.handler(message.data)
+        }
+        catch (error) {
+          console.error(`Error handling event ${message.event}:`, error)
+        }
+      }
+    })
+  }
+
+  // Connect to WebSocket
+  const socket = useWebSocket<WebSocketMessage>('/api/events', true, {
+    immediate: false,
+    autoClose: false,
+    onConnected(webSocket) {
+      ws.value = webSocket
+      isConnected.value = true
+      isConnecting.value = false
+    },
+    onDisconnected() {
+      isConnected.value = false
+      isConnecting.value = false
+      ws.value = null
+    },
+    onError(event) {
+      console.error('WebSocket error:', event)
+      isConnected.value = false
+      isConnecting.value = false
+    },
+    onMessage(_, event) {
+      try {
+        const message: WebSocketMessage = JSON.parse(event.data)
+        handleMessage(message)
+      }
+      catch (error) {
+        console.error('Error parsing WebSocket message:', error)
+      }
+    },
+  })
+
+  function connect(): void {
+    const readyState = socket.ws.value?.readyState
+
+    if (readyState === WebSocket.OPEN) {
+      isConnected.value = true
+      isConnecting.value = false
+      return
+    }
+
+    if (readyState === WebSocket.CONNECTING || isConnecting.value) {
+      isConnecting.value = true
+      return
+    }
+
+    isConnecting.value = true
+
+    try {
+      socket.open()
+    }
+    catch (error) {
+      console.error('Failed to initiate WebSocket connection:', error)
+      isConnecting.value = false
+    }
+  }
+
+  // Subscribe to an event
+  // eslint-disable-next-line ts/no-explicit-any
+  function subscribe<T = any>(event: string, handler: EventHandler<T>): string {
+    const id = uuidv4()
+
+    subscriptions.value.set(id, {
+      id,
+      event,
+      handler,
+    })
+
+    // Ensure WebSocket is connected
+    if (!isConnected.value) {
+      connect()
+    }
+
+    return id
+  }
+
+  // Unsubscribe from an event
+  function unsubscribe(subscriptionId: string): void {
+    subscriptions.value.delete(subscriptionId)
+  }
+
+  // Disconnect WebSocket
+  function disconnect(): void {
+    isConnected.value = false
+    isConnecting.value = false
+    subscriptions.value.clear()
+
+    if (socket.ws.value && socket.ws.value.readyState !== WebSocket.CLOSED) {
+      socket.close()
+    }
+
+    ws.value = null
+  }
+
+  // Get all subscriptions for debugging
+  const allSubscriptions = computed(() => Array.from(subscriptions.value.values()))
+
+  return {
+    // State (readonly)
+    isConnected: readonly(isConnected),
+    allSubscriptions,
+    isConnecting: readonly(isConnecting),
+
+    // Actions
+    connect,
+    disconnect,
+    subscribe,
+    unsubscribe,
+  }
+})

+ 1 - 1
app/src/version.json

@@ -1 +1 @@
-{"version":"2.2.0","build_id":3,"total_build":502}
+{"version":"2.2.0","build_id":6,"total_build":505}

+ 5 - 4
app/src/views/certificate/components/RemoveCert.vue

@@ -2,7 +2,7 @@
 import type { Cert } from '@/api/cert'
 import cert from '@/api/cert'
 import { AutoCertState } from '@/constants'
-import websocket from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 
 const props = defineProps<{
   id: number
@@ -42,9 +42,10 @@ function handleConfirm() {
 
   if (shouldRevoke.value) {
     // Revoke certificate using WebSocket
-    const ws = websocket(`/api/certs/${props.id}/revoke`, false)
+    const { ws } = useWebSocket(`/api/certs/${props.id}/revoke`, false)
+    const socket = ws.value!
 
-    ws.onmessage = m => {
+    socket.onmessage = m => {
       const response = JSON.parse(m.data)
 
       if (response.status === 'success') {
@@ -60,7 +61,7 @@ function handleConfirm() {
       }
     }
 
-    ws.onerror = () => {
+    socket.onerror = () => {
       message.error($gettext('WebSocket connection error'))
       confirmLoading.value = false
     }

+ 4 - 4
app/src/views/dashboard/NginxDashBoard.vue

@@ -1,11 +1,10 @@
 <script setup lang="ts">
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import { ClockCircleOutlined, ReloadOutlined } from '@ant-design/icons-vue'
 import { storeToRefs } from 'pinia'
 import ngx from '@/api/ngx'
 import { useNginxPerformance } from '@/composables/useNginxPerformance'
 import { NginxStatus } from '@/constants'
-import ws from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 import { useGlobalStore } from '@/pinia'
 import ConnectionMetricsCard from './components/ConnectionMetricsCard.vue'
 import ParamsOptimization from './components/ParamsOptimization.vue'
@@ -32,7 +31,7 @@ const {
 } = useNginxPerformance()
 
 // WebSocket connection
-const wsInstance = shallowRef<WebSocket | ReconnectingWebSocket | null>(null)
+const wsInstance = shallowRef<WebSocket | null>(null)
 
 // Toggle stub_status module status
 async function toggleStubStatus() {
@@ -67,7 +66,8 @@ function connectWebSocket() {
   loading.value = true
 
   try {
-    const wsConnection = ws('api/nginx/detail_status/ws')
+    const { ws } = useWebSocket('/api/nginx/detail_status/ws', false)
+    const wsConnection = ws.value!
     wsInstance.value = wsConnection
 
     wsConnection.onmessage = event => {

+ 5 - 47
app/src/views/dashboard/Nodes.vue

@@ -3,9 +3,7 @@ import type { Ref } from 'vue'
 import type { Namespace } from '@/api/namespace'
 import type { Node } from '@/api/node'
 import Icon, { LinkOutlined, ThunderboltOutlined } from '@ant-design/icons-vue'
-import analytic from '@/api/analytic'
 import namespaceApi from '@/api/namespace'
-import nodeApi from '@/api/node'
 import logo from '@/assets/img/logo.png'
 import pulse from '@/assets/svg/pulse.svg?component'
 import NamespaceTabs from '@/components/NamespaceTabs'
@@ -16,26 +14,16 @@ import { version } from '@/version.json'
 import NodeAnalyticItem from './components/NodeAnalyticItem.vue'
 
 const nodeStore = useNodeAvailabilityStore()
-const data = ref([]) as Ref<Node[]>
+const { nodeList } = storeToRefs(nodeStore)
 const activeNamespaceKey = ref<string | number>(0)
 
-const nodeMap = computed(() => {
-  const o = {} as Record<number, Node>
-
-  data.value.forEach(v => {
-    o[v.id] = v
-  })
-
-  return o
-})
-
 // Get namespaces to filter nodes
 const namespaces = ref([]) as Ref<Namespace[]>
 
 // Filtered nodes based on active namespace
 const filteredNodes = computed(() => {
   if (activeNamespaceKey.value === 0) {
-    return data.value
+    return nodeList.value
   }
 
   const currentNamespace = namespaces.value.find(ns => ns.id === Number(activeNamespaceKey.value))
@@ -43,7 +31,8 @@ const filteredNodes = computed(() => {
     return []
   }
 
-  return data.value.filter(node => currentNamespace.sync_node_ids.includes(node.id))
+  return nodeList.value
+    .filter(node => currentNamespace.sync_node_ids.includes(node.id ?? 0))
 })
 
 // Load all namespaces (handle pagination)
@@ -74,40 +63,9 @@ async function loadAllNamespaces() {
 }
 
 onMounted(() => {
-  nodeApi.getList({ enabled: true }).then(r => {
-    data.value.push(...r.data)
-  })
-
   loadAllNamespaces()
 })
 
-onMounted(() => {
-  const websocket = analytic.nodes()
-  websocket.onmessage = async m => {
-    const nodes = JSON.parse(m.data)
-
-    Object.keys(nodes).forEach((v: string) => {
-      const key = Number.parseInt(v)
-
-      // update node online status
-      if (nodeMap.value[key]) {
-        Object.assign(nodeMap.value[key], nodes[key])
-        nodeMap.value[key].response_at = new Date()
-
-        // Also update global store
-        const nodeStatus = nodeStore.getNodeStatus(key)
-        if (nodeStatus) {
-          nodeStatus.status = nodes[key].status ?? false
-        }
-      }
-    })
-  }
-
-  onUnmounted(() => {
-    websocket.close()
-  })
-})
-
 const settingsStore = useSettingsStore()
 const { node } = storeToRefs(settingsStore)
 
@@ -120,7 +78,7 @@ const visible = computed(() => {
   if (node.value.id > 0)
     return false
   else
-    return data.value?.length
+    return nodeList.value.length
 })
 </script>
 

+ 4 - 3
app/src/views/dashboard/ServerAnalytic.vue

@@ -1,14 +1,14 @@
 <script setup lang="ts">
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { CPUInfoStat, DiskStat, HostInfoStat, LoadStat, MemStat } from '@/api/analytic'
 import type { Series } from '@/components/Chart/types'
 import analytic from '@/api/analytic'
 import AreaChart from '@/components/Chart/AreaChart.vue'
 import RadialBarChart from '@/components/Chart/RadialBarChart.vue'
 import { bytesToSize } from '@/lib/helper'
+import { useWebSocket } from '@/lib/websocket'
 import { useSettingsStore } from '@/pinia'
 
-let websocket: ReconnectingWebSocket | WebSocket
+let websocket: WebSocket
 
 const settings = useSettingsStore()
 
@@ -85,7 +85,8 @@ onMounted(() => {
     disk_io_analytic[0].data = disk_io_analytic[0].data.concat(r.disk_io.writes)
     disk_io_analytic[1].data = disk_io_analytic[1].data.concat(r.disk_io.reads)
 
-    websocket = analytic.server()
+    const { ws } = useWebSocket(analytic.serverWebSocketUrl)
+    websocket = ws.value!
     websocket.onmessage = wsOnMessage
   })
 })

+ 5 - 4
app/src/views/dashboard/SiteNavigation.vue

@@ -1,5 +1,4 @@
 <script setup lang="ts">
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { SiteInfo } from '@/api/site_navigation'
 import { GlobalOutlined } from '@ant-design/icons-vue'
 import Sortable from 'sortablejs'
@@ -19,7 +18,7 @@ const configModalVisible = ref(false)
 const configTarget = ref<SiteInfo>()
 
 let sortableInstance: Sortable | null = null
-let websocket: ReconnectingWebSocket | WebSocket | null = null
+let websocket: WebSocket | null = null
 
 // Display sites - use draggable sites in settings mode, backend sorted sites otherwise
 const displaySites = computed(() => {
@@ -27,9 +26,11 @@ const displaySites = computed(() => {
 })
 
 // WebSocket connection
-function connectWebSocket() {
+async function connectWebSocket() {
   try {
-    websocket = siteNavigationApi.createWebSocket()
+    const { useWebSocket } = await import('@/lib/websocket')
+    const { ws } = useWebSocket(siteNavigationApi.websocketUrl)
+    websocket = ws.value!
 
     if (!websocket) {
       isConnected.value = false

+ 3 - 22
app/src/views/dashboard/components/NodeAnalyticItem.vue

@@ -1,35 +1,16 @@
 <script setup lang="ts">
-import type { Node } from '@/api/node'
+import type { AnalyticNode } from '@/api/node'
 import Icon, { ArrowDownOutlined, ArrowUpOutlined, DatabaseOutlined, LineChartOutlined, SendOutlined } from '@ant-design/icons-vue'
 import cpu from '@/assets/svg/cpu.svg?component'
 import memory from '@/assets/svg/memory.svg?component'
 import UsageProgressLine from '@/components/Chart/UsageProgressLine.vue'
 import { bytesToSize } from '@/lib/helper'
 
-interface ExtendedNode extends Node {
-  version?: string
-  avg_load?: {
-    load1: number
-    load5: number
-    load15: number
-  }
-  network?: {
-    bytesSent: number
-    bytesRecv: number
-  }
-  cpu_percent?: number
-  cpu_num?: number
-  memory_percent?: number
-  memory_total?: string
-  disk_percent?: number
-  disk_total?: string
-}
-
 defineProps<{
-  item: ExtendedNode
+  item: AnalyticNode
   currentNodeId?: number
   localVersion?: string
-  onLinkStart?: (item: ExtendedNode) => void
+  onLinkStart?: (item: AnalyticNode) => void
 }>()
 </script>
 

+ 15 - 9
app/src/views/nginx_log/NginxLogList.vue

@@ -10,8 +10,7 @@ import dayjs from 'dayjs'
 import nginxLog from '@/api/nginx_log'
 import { DevDebugPanel } from '@/components/DevDebugPanel'
 import { TabFilter } from '@/components/TabFilter'
-import { useWebSocketEventBus } from '@/composables/useWebSocketEventBus'
-import { useGlobalStore } from '@/pinia'
+import { useGlobalStore, useWebSocketEventBusStore } from '@/pinia'
 import IndexingSettingsModal from './components/IndexingSettingsModal.vue'
 import { useIndexProgress } from './composables/useIndexProgress'
 import IndexProgressBar from './indexing/components/IndexProgressBar.vue'
@@ -27,7 +26,8 @@ const advancedIndexingEnabled = ref(false)
 const enableIndexingLoading = ref(false)
 
 // WebSocket event bus and global store
-const { subscribe } = useWebSocketEventBus()
+const websocketEventBus = useWebSocketEventBusStore()
+const subscriptionIds = ref<string[]>([])
 const globalStore = useGlobalStore()
 const { nginxLogStatus, processingStatus } = storeToRefs(globalStore)
 
@@ -92,7 +92,7 @@ onMounted(async () => {
   await checkAdvancedIndexingStatus()
 
   // Subscribe to processing status events
-  subscribe('processing_status', data => {
+  subscriptionIds.value.push(websocketEventBus.subscribe('processing_status', data => {
     const wasIndexing = processingStatus.value?.nginx_log_indexing
     processingStatus.value = data
 
@@ -111,27 +111,33 @@ onMounted(async () => {
         }
       }, 500)
     }
-  })
+  }))
 
   // Subscribe to nginx log status events (backward compatibility)
-  subscribe('nginx_log_status', data => {
+  subscriptionIds.value.push(websocketEventBus.subscribe('nginx_log_status', data => {
     nginxLogStatus.value = data
-  })
+  }))
 
   // Subscribe to index ready events to refresh the list
-  subscribe('nginx_log_index_ready', () => {
+  subscriptionIds.value.push(websocketEventBus.subscribe('nginx_log_index_ready', () => {
     // Refresh the table data
     if (stdCurdRef.value) {
       setTimeout(() => {
         stdCurdRef.value.refresh()
       }, 1000)
     }
-  })
+  }))
 })
 
 onUnmounted(() => {
   // Clean up auto refresh timer
   stopAutoRefresh()
+
+  // Clean up WebSocket subscriptions
+  subscriptionIds.value.forEach(id => {
+    websocketEventBus.unsubscribe(id)
+  })
+  subscriptionIds.value = []
 })
 
 // Base columns that are always visible

+ 19 - 8
app/src/views/nginx_log/composables/useIndexProgress.ts

@@ -1,5 +1,5 @@
 import type { IndexProgress } from '../indexing/components/IndexProgressBar.vue'
-import { useWebSocketEventBus } from '@/composables/useWebSocketEventBus'
+import { useWebSocketEventBusStore } from '@/pinia'
 
 export interface IndexProgressData {
   log_path: string
@@ -20,7 +20,8 @@ export interface IndexCompleteData {
 }
 
 export function useIndexProgress() {
-  const { subscribe } = useWebSocketEventBus()
+  const websocketEventBus = useWebSocketEventBusStore()
+  const subscriptionIds = ref<string[]>([])
 
   // Store progress for each log file
   const progressMap = ref<Map<string, IndexProgress>>(new Map())
@@ -39,7 +40,7 @@ export function useIndexProgress() {
   })
 
   // Subscribe to progress events
-  subscribe<IndexProgressData>('nginx_log_index_progress', data => {
+  subscriptionIds.value.push(websocketEventBus.subscribe<IndexProgressData>('nginx_log_index_progress', data => {
     // Ignore status_update events - these are just status changes, not actual progress
     if (data.stage === 'status_update') {
       // If there's no actual indexing progress, remove any existing progress bar
@@ -63,10 +64,10 @@ export function useIndexProgress() {
 
     // Update global progress
     updateGlobalProgress()
-  })
+  }))
 
   // Subscribe to completion events
-  subscribe<IndexCompleteData>('nginx_log_index_complete', data => {
+  subscriptionIds.value.push(websocketEventBus.subscribe<IndexCompleteData>('nginx_log_index_complete', data => {
     if (data.success) {
       // Keep progress for a short time to show completion, then remove
       setTimeout(() => {
@@ -92,17 +93,27 @@ export function useIndexProgress() {
         updateGlobalProgress()
       }, 5000)
     }
-  })
+  }))
 
   // Subscribe to processing status events for global state
-  subscribe<{ nginx_log_indexing: boolean }>('processing_status', data => {
+  subscriptionIds.value.push(websocketEventBus.subscribe<{ nginx_log_indexing: boolean }>('processing_status', data => {
     isGlobalIndexing.value = data.nginx_log_indexing
     if (!data.nginx_log_indexing) {
       // Clear all progress when indexing stops
       progressMap.value.clear()
       updateGlobalProgress()
     }
-  })
+  }))
+
+  // Auto cleanup on unmount
+  if (getCurrentInstance()) {
+    onUnmounted(() => {
+      subscriptionIds.value.forEach(id => {
+        websocketEventBus.unsubscribe(id)
+      })
+      subscriptionIds.value = []
+    })
+  }
 
   function updateGlobalProgress() {
     const activeFiles = Array.from(progressMap.value.values())

+ 4 - 4
app/src/views/nginx_log/raw/RawLogViewer.vue

@@ -1,10 +1,9 @@
 <script setup lang="ts">
-import type ReconnectingWebSocket from 'reconnecting-websocket'
 import type { NginxLogData } from '@/api/nginx_log'
 import { useElementSize } from '@vueuse/core'
 import { debounce } from 'lodash'
 import nginx_log from '@/api/nginx_log'
-import ws from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 
 interface Props {
   logPath: string
@@ -18,7 +17,7 @@ const props = defineProps<Props>()
 const logContainer = useTemplateRef('logContainer')
 
 // Reactive data
-let websocket: ReconnectingWebSocket | WebSocket
+let websocket: WebSocket
 // Line-based storage for virtualization
 const lines = ref<string[]>([])
 const tailFragment = ref('') // carry over partial line when appending
@@ -113,7 +112,8 @@ const bottomPaddingStyle = computed(() => ({ height: `${bottomPaddingPx.value}px
 
 // WebSocket functions
 function openWs() {
-  websocket = ws('/api/nginx_log')
+  const { ws } = useWebSocket('/api/nginx_log', false)
+  websocket = ws.value!
 
   websocket.onopen = () => {
     websocket.send(JSON.stringify(control.value))

+ 10 - 3
app/src/views/nginx_log/structured/StructuredLogViewer.vue

@@ -5,8 +5,8 @@ import { DownOutlined, ExclamationCircleOutlined, ReloadOutlined } from '@ant-de
 import { Tag } from 'ant-design-vue'
 import dayjs from 'dayjs'
 import nginx_log from '@/api/nginx_log'
-import { useWebSocketEventBus } from '@/composables/useWebSocketEventBus'
 import { bytesToSize } from '@/lib/helper'
+import { useWebSocketEventBusStore } from '@/pinia'
 import LoadingState from '../components/LoadingState.vue'
 import { useIndexProgress } from '../composables/useIndexProgress'
 import SearchFilters from './components/SearchFilters.vue'
@@ -31,7 +31,8 @@ const { message } = App.useApp()
 const route = useRoute()
 
 // WebSocket event bus for index ready notifications
-const { subscribe: subscribeToEvent } = useWebSocketEventBus()
+const websocketEventBus = useWebSocketEventBusStore()
+let indexReadySubscriptionId: string | null = null
 
 // Index progress tracking for this specific file
 const { isFileIndexing } = useIndexProgress()
@@ -609,7 +610,7 @@ onMounted(async () => {
   }
 
   // Subscribe to index ready notifications
-  subscribeToEvent('nginx_log_index_ready', data => {
+  indexReadySubscriptionId = websocketEventBus.subscribe('nginx_log_index_ready', data => {
     setTimeout(() => handleIndexReadyNotification(data), 1000)
   })
 
@@ -633,6 +634,12 @@ onMounted(async () => {
   }
 })
 
+onUnmounted(() => {
+  if (indexReadySubscriptionId) {
+    websocketEventBus.unsubscribe(indexReadySubscriptionId)
+  }
+})
+
 // Watch for log path changes to clear cache and reload
 watch(logPath, (newPath, oldPath) => {
   // Clear cache when path changes

+ 8 - 7
app/src/views/node/BatchUpgrader.vue

@@ -4,7 +4,7 @@ import type { ReleaseInfo } from '@/api/upgrade'
 import { cloneDeep } from 'lodash'
 import { marked } from 'marked'
 import upgrade from '@/api/upgrade'
-import websocket from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 
 const emit = defineEmits(['success'])
 
@@ -91,19 +91,20 @@ async function performUpgrade() {
 
   for (let i = 0; i < nodesNum; i++) {
     await new Promise(resolve => {
-      const ws = websocket(`/api/upgrade/perform?x_node_id=${nodeIds.value[i]}`, false)
+      const { ws } = useWebSocket(`/api/upgrade/perform?x_node_id=${nodeIds.value[i]}`, false)
+      const socket = ws.value!
 
       let last = 0
 
-      ws.onopen = () => {
-        ws.send(JSON.stringify({
+      socket.onopen = () => {
+        socket.send(JSON.stringify({
           dry_run: dryRun.value,
           channel: channel.value,
         }))
       }
       let isFailed = false
 
-      ws.onmessage = async m => {
+      socket.onmessage = async m => {
         const r = JSON.parse(m.data)
         if (r.message)
           log(r.message)
@@ -125,11 +126,11 @@ async function performUpgrade() {
         }
       }
 
-      ws.onerror = () => {
+      socket.onerror = () => {
         resolve({})
       }
 
-      ws.onclose = async () => {
+      socket.onclose = async () => {
         resolve({})
 
         progressPercent.value = 100 * ((i + 1) / nodesNum)

+ 6 - 5
app/src/views/site/site_edit/components/Cert/ObtainCertLive.vue

@@ -2,7 +2,7 @@
 import type { Ref } from 'vue'
 import type { AutoCertOptions } from '@/api/auto_cert'
 import type { CertificateResult } from '@/api/cert'
-import websocket from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 import { useSiteEditorStore } from '../SiteEditor/store'
 
 const props = defineProps<{
@@ -45,17 +45,18 @@ async function issue_cert(config_name: string, server_name: string[], key_type:
 
     log($gettext('Getting the certificate, please wait...'))
 
-    const ws = websocket(`/api/domain/${config_name}/cert`, false)
+    const { ws } = useWebSocket(`/api/domain/${config_name}/cert`, false)
+    const socket = ws.value!
 
-    ws.onopen = () => {
-      ws.send(JSON.stringify({
+    socket.onopen = () => {
+      socket.send(JSON.stringify({
         server_name,
         ...props.options,
         key_type,
       }))
     }
 
-    ws.onmessage = async m => {
+    socket.onmessage = async m => {
       const r = JSON.parse(m.data)
 
       log(T(r))

+ 8 - 7
app/src/views/system/Upgrade.vue

@@ -5,7 +5,7 @@ import dayjs from 'dayjs'
 
 import { marked } from 'marked'
 import upgrade from '@/api/upgrade'
-import websocket from '@/lib/websocket'
+import { useWebSocket } from '@/lib/websocket'
 import version from '@/version.json'
 
 const route = useRoute()
@@ -79,12 +79,13 @@ async function performUpgrade() {
 
   log($gettext('Upgrading Nginx UI, please wait...'))
 
-  const ws = websocket('/api/upgrade/perform', false)
+  const { ws } = useWebSocket('/api/upgrade/perform', false)
+  const socket = ws.value!
 
   let last = 0
 
-  ws.onopen = () => {
-    ws.send(JSON.stringify({
+  socket.onopen = () => {
+    socket.send(JSON.stringify({
       dry_run: dryRun.value,
       channel: channel.value,
       test_commit_and_restart: testCommitAndRestart.value,
@@ -93,7 +94,7 @@ async function performUpgrade() {
 
   let isFailed = false
 
-  ws.onmessage = async m => {
+  socket.onmessage = async m => {
     const r = JSON.parse(m.data)
     if (r.message)
       log(r.message)
@@ -116,13 +117,13 @@ async function performUpgrade() {
     }
   }
 
-  ws.onerror = () => {
+  socket.onerror = () => {
     isFailed = true
     progressStatus.value = 'exception'
     modalClosable.value = true
   }
 
-  ws.onclose = async () => {
+  socket.onclose = async () => {
     if (isFailed)
       return
 

+ 3 - 2
app/src/views/terminal/components/TerminalStatusBar.vue

@@ -2,6 +2,7 @@
 import type { DiskStat, LoadStat, MemStat } from '@/api/analytic'
 import analytic from '@/api/analytic'
 import { formatDateTime } from '@/lib/helper'
+import { useWebSocket } from '@/lib/websocket'
 
 interface StatusData {
   version: string
@@ -118,8 +119,8 @@ async function initializeData() {
 // Connect to WebSocket for real-time updates
 function connectWebSocket() {
   try {
-    const ws = analytic.server()
-    websocket.value = ws as WebSocket
+    const { ws } = useWebSocket(analytic.serverWebSocketUrl)
+    websocket.value = ws.value!
 
     if (websocket.value) {
       websocket.value.onmessage = event => {

+ 1 - 1
go.mod

@@ -93,7 +93,7 @@ require (
 	github.com/alibabacloud-go/tea v1.3.13 // indirect
 	github.com/alibabacloud-go/tea-utils/v2 v2.0.7 // indirect
 	github.com/aliyun/aliyun-log-go-sdk v0.1.111 // indirect
-	github.com/aliyun/credentials-go v1.4.8 // indirect
+	github.com/aliyun/credentials-go v1.4.7 // indirect
 	github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect
 	github.com/aws/aws-sdk-go-v2/config v1.31.17 // indirect
 	github.com/aws/aws-sdk-go-v2/credentials v1.18.21 // indirect

+ 2 - 2
go.sum

@@ -750,8 +750,8 @@ github.com/aliyun/credentials-go v1.1.2/go.mod h1:ozcZaMR5kLM7pwtCMEpVmQ242suV6q
 github.com/aliyun/credentials-go v1.3.1/go.mod h1:8jKYhQuDawt8x2+fusqa1Y6mPxemTsBEN04dgcAcYz0=
 github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
 github.com/aliyun/credentials-go v1.4.5/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
-github.com/aliyun/credentials-go v1.4.8 h1:MEfZGWGC3L1icM1nGcYF8rWdQBG2k1Sya2pq9uRwd30=
-github.com/aliyun/credentials-go v1.4.8/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
+github.com/aliyun/credentials-go v1.4.7 h1:T17dLqEtPUFvjDRRb5giVvLh6dFT8IcNFJJb7MeyCxw=
+github.com/aliyun/credentials-go v1.4.7/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
 github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=

+ 0 - 1
internal/analytic/node.go

@@ -38,7 +38,6 @@ type NodeStat struct {
 }
 
 type Node struct {
-	NodeID int `json:"node_id,omitempty"`
 	*model.Node
 	NodeStat
 	NodeInfo

+ 0 - 8
pnpm-lock.yaml

@@ -100,9 +100,6 @@ importers:
       pinia-plugin-persistedstate:
         specifier: ^4.7.1
         version: 4.7.1(pinia@3.0.4(typescript@5.9.3)(vue@3.5.24(typescript@5.9.3)))
-      reconnecting-websocket:
-        specifier: ^4.4.0
-        version: 4.4.0
       sortablejs:
         specifier: ^1.15.6
         version: 1.15.6
@@ -3906,9 +3903,6 @@ packages:
     resolution: {integrity: sha512-GDhwkLfywWL2s6vEjyhri+eXmfH6j1L7JE27WhqLeYzoh/A3DBaYGEj2H/HFZCn/kMfim73FXxEJTw06WtxQwg==}
     engines: {node: '>= 14.18.0'}
 
-  reconnecting-websocket@4.4.0:
-    resolution: {integrity: sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng==}
-
   refa@0.12.1:
     resolution: {integrity: sha512-J8rn6v4DBb2nnFqkqwy6/NnTYMcgLA+sLr0iIO41qpv0n+ngb7ksag2tMRl0inb1bbO/esUwzW1vbJi7K0sI0g==}
     engines: {node: ^12.0.0 || ^14.0.0 || >=16.0.0}
@@ -8753,8 +8747,6 @@ snapshots:
 
   readdirp@4.1.2: {}
 
-  reconnecting-websocket@4.4.0: {}
-
   refa@0.12.1:
     dependencies:
       '@eslint-community/regexpp': 4.12.2