Browse Source

feat(nginx-performance-matrix): implement WebSocket support for detailed Nginx status monitoring

Jacky 1 day ago
parent
commit
1c29693107

+ 2 - 1
api/nginx/router.go

@@ -16,7 +16,8 @@ func InitRouter(r *gin.RouterGroup) {
 	// Get detailed Nginx status information, including connection count, process information, etc. (Issue #850)
 	// Get detailed Nginx status information, including connection count, process information, etc. (Issue #850)
 	r.GET("nginx/detail_status", GetDetailStatus)
 	r.GET("nginx/detail_status", GetDetailStatus)
 	// Use SSE to push detailed Nginx status information
 	// Use SSE to push detailed Nginx status information
-	r.GET("nginx/detail_status/stream", StreamDetailStatus)
+	// Use WebSocket to push detailed Nginx status information
+	r.GET("nginx/detail_status/ws", StreamDetailStatusWS)
 	// Get stub_status module status
 	// Get stub_status module status
 	r.GET("nginx/stub_status", CheckStubStatus)
 	r.GET("nginx/stub_status", CheckStubStatus)
 	// Enable or disable stub_status module
 	// Enable or disable stub_status module

+ 0 - 43
api/nginx/status.go

@@ -6,14 +6,11 @@ package nginx
 import (
 import (
 	"net/http"
 	"net/http"
 	"strings"
 	"strings"
-	"time"
 
 
-	"github.com/0xJacky/Nginx-UI/api"
 	"github.com/0xJacky/Nginx-UI/internal/nginx"
 	"github.com/0xJacky/Nginx-UI/internal/nginx"
 	"github.com/0xJacky/Nginx-UI/internal/performance"
 	"github.com/0xJacky/Nginx-UI/internal/performance"
 	"github.com/gin-gonic/gin"
 	"github.com/gin-gonic/gin"
 	"github.com/uozi-tech/cosy"
 	"github.com/uozi-tech/cosy"
-	"github.com/uozi-tech/cosy/logger"
 )
 )
 
 
 // NginxPerformanceInfo stores Nginx performance-related information
 // NginxPerformanceInfo stores Nginx performance-related information
@@ -34,46 +31,6 @@ func GetDetailStatus(c *gin.Context) {
 	c.JSON(http.StatusOK, response)
 	c.JSON(http.StatusOK, response)
 }
 }
 
 
-// StreamDetailStatus streams Nginx detailed status information using SSE
-func StreamDetailStatus(c *gin.Context) {
-	// Set SSE response headers
-	api.SetSSEHeaders(c)
-
-	// Create context that cancels when client disconnects
-	ctx := c.Request.Context()
-
-	// Create a ticker channel to prevent goroutine leaks
-	ticker := time.NewTicker(5 * time.Second)
-	defer ticker.Stop()
-
-	// Send initial data immediately
-	sendPerformanceData(c)
-
-	// Use goroutine to send data periodically
-	for {
-		select {
-		case <-ticker.C:
-			// Send performance data
-			sendPerformanceData(c)
-		case <-ctx.Done():
-			// Client closed connection or request canceled
-			logger.Debug("Client closed connection")
-			return
-		}
-	}
-}
-
-// sendPerformanceData sends performance data once
-func sendPerformanceData(c *gin.Context) {
-	response := performance.GetPerformanceData()
-
-	// Send SSE event
-	c.SSEvent("message", response)
-
-	// Flush buffer to ensure data is sent immediately
-	c.Writer.Flush()
-}
-
 // CheckStubStatus gets Nginx stub_status module status
 // CheckStubStatus gets Nginx stub_status module status
 func CheckStubStatus(c *gin.Context) {
 func CheckStubStatus(c *gin.Context) {
 	stubStatus := performance.GetStubStatus()
 	stubStatus := performance.GetStubStatus()

+ 227 - 0
api/nginx/websocket.go

@@ -0,0 +1,227 @@
+package nginx
+
+import (
+	"context"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/internal/helper"
+	"github.com/0xJacky/Nginx-UI/internal/kernel"
+	"github.com/0xJacky/Nginx-UI/internal/performance"
+	"github.com/gin-gonic/gin"
+	"github.com/gorilla/websocket"
+	"github.com/uozi-tech/cosy/logger"
+)
+
+// NginxPerformanceClient represents a WebSocket client for Nginx performance monitoring
+type NginxPerformanceClient struct {
+	conn   *websocket.Conn
+	send   chan interface{}
+	ctx    context.Context
+	cancel context.CancelFunc
+	mutex  sync.RWMutex
+}
+
+// NginxPerformanceHub manages WebSocket connections for Nginx performance monitoring
+type NginxPerformanceHub struct {
+	clients    map[*NginxPerformanceClient]bool
+	register   chan *NginxPerformanceClient
+	unregister chan *NginxPerformanceClient
+	mutex      sync.RWMutex
+	ticker     *time.Ticker
+}
+
+var (
+	performanceHub     *NginxPerformanceHub
+	performanceHubOnce sync.Once
+)
+
+// GetNginxPerformanceHub returns the singleton hub instance
+func GetNginxPerformanceHub() *NginxPerformanceHub {
+	performanceHubOnce.Do(func() {
+		performanceHub = &NginxPerformanceHub{
+			clients:    make(map[*NginxPerformanceClient]bool),
+			register:   make(chan *NginxPerformanceClient),
+			unregister: make(chan *NginxPerformanceClient),
+			ticker:     time.NewTicker(5 * time.Second),
+		}
+		go performanceHub.run()
+	})
+	return performanceHub
+}
+
+// run handles the main hub loop
+func (h *NginxPerformanceHub) run() {
+	defer h.ticker.Stop()
+
+	for {
+		select {
+		case client := <-h.register:
+			h.mutex.Lock()
+			h.clients[client] = true
+			h.mutex.Unlock()
+			logger.Debug("Nginx performance client connected, total clients:", len(h.clients))
+
+			// Send initial data to the new client
+			go h.sendPerformanceDataToClient(client)
+
+		case client := <-h.unregister:
+			h.mutex.Lock()
+			if _, ok := h.clients[client]; ok {
+				delete(h.clients, client)
+				close(client.send)
+			}
+			h.mutex.Unlock()
+			logger.Debug("Nginx performance client disconnected, total clients:", len(h.clients))
+
+		case <-h.ticker.C:
+			// Send performance data to all connected clients
+			h.broadcastPerformanceData()
+
+		case <-kernel.Context.Done():
+			// Shutdown all clients
+			h.mutex.Lock()
+			for client := range h.clients {
+				close(client.send)
+				delete(h.clients, client)
+			}
+			h.mutex.Unlock()
+			return
+		}
+	}
+}
+
+// sendPerformanceDataToClient sends performance data to a specific client
+func (h *NginxPerformanceHub) sendPerformanceDataToClient(client *NginxPerformanceClient) {
+	response := performance.GetPerformanceData()
+
+	select {
+	case client.send <- response:
+	default:
+		// Channel is full, remove client
+		h.unregister <- client
+	}
+}
+
+// broadcastPerformanceData sends performance data to all connected clients
+func (h *NginxPerformanceHub) broadcastPerformanceData() {
+	response := performance.GetPerformanceData()
+
+	h.mutex.RLock()
+	for client := range h.clients {
+		select {
+		case client.send <- response:
+		default:
+			// Channel is full, remove client
+			close(client.send)
+			delete(h.clients, client)
+		}
+	}
+	h.mutex.RUnlock()
+}
+
+// WebSocket upgrader configuration
+var nginxPerformanceUpgrader = websocket.Upgrader{
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+}
+
+// StreamDetailStatusWS handles WebSocket connection for Nginx performance monitoring
+func StreamDetailStatusWS(c *gin.Context) {
+	ws, err := nginxPerformanceUpgrader.Upgrade(c.Writer, c.Request, nil)
+	if err != nil {
+		logger.Error("Failed to upgrade connection:", err)
+		return
+	}
+	defer ws.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := &NginxPerformanceClient{
+		conn:   ws,
+		send:   make(chan interface{}, 256),
+		ctx:    ctx,
+		cancel: cancel,
+	}
+
+	hub := GetNginxPerformanceHub()
+	hub.register <- client
+
+	// Start write and read pumps
+	go client.writePump()
+	client.readPump()
+}
+
+// writePump pumps messages from the hub to the websocket connection
+func (c *NginxPerformanceClient) writePump() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer func() {
+		ticker.Stop()
+		c.conn.Close()
+	}()
+
+	for {
+		select {
+		case message, ok := <-c.send:
+			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			if !ok {
+				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
+				return
+			}
+
+			if err := c.conn.WriteJSON(message); err != nil {
+				logger.Error("Failed to write message:", err)
+				if helper.IsUnexpectedWebsocketError(err) {
+					return
+				}
+			}
+
+		case <-ticker.C:
+			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
+				logger.Error("Failed to write ping:", err)
+				return
+			}
+
+		case <-c.ctx.Done():
+			return
+
+		case <-kernel.Context.Done():
+			return
+		}
+	}
+}
+
+// readPump pumps messages from the websocket connection to the hub
+func (c *NginxPerformanceClient) readPump() {
+	defer func() {
+		hub := GetNginxPerformanceHub()
+		hub.unregister <- c
+		c.conn.Close()
+		c.cancel()
+	}()
+
+	c.conn.SetReadLimit(512)
+	c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+	c.conn.SetPongHandler(func(string) error {
+		c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+		return nil
+	})
+
+	for {
+		_, _, err := c.conn.ReadMessage()
+		if err != nil {
+			if helper.IsUnexpectedWebsocketError(err) {
+				logger.Error("Unexpected WebSocket error:", err)
+			}
+			break
+		}
+		// Handle incoming messages if needed
+		// For now, this is a one-way communication (server to client)
+	}
+}

+ 126 - 0
app/src/composables/useNginxWebSocket.ts

@@ -0,0 +1,126 @@
+import ws from '@/lib/websocket'
+import type ReconnectingWebSocket from 'reconnecting-websocket'
+
+export interface NginxWebSocketOptions {
+  url: string
+  onMessage?: (data: any) => void
+  onError?: () => void
+  reconnectInterval?: number
+}
+
+/**
+ * Nginx WebSocket Composable
+ * Provide the ability to create, manage, and automatically clean up WebSocket connections for Nginx performance monitoring
+ */
+export function useNginxWebSocket() {
+  const wsInstance = shallowRef<ReconnectingWebSocket | WebSocket>()
+  const isConnected = ref(false)
+  const isReconnecting = ref(false)
+  const currentOptions = shallowRef<NginxWebSocketOptions>()
+
+  /**
+   * Connect to WebSocket service
+   */
+  function connect(options: NginxWebSocketOptions) {
+    const {
+      url,
+      onMessage,
+      onError,
+    } = options
+
+    // Store current options for reconnection
+    currentOptions.value = options
+
+    // Disconnect existing connection before creating new one
+    if (wsInstance.value) {
+      disconnect()
+    }
+
+    try {
+      const wsConnection = ws(url, true) as ReconnectingWebSocket
+
+      // Handle connection open
+      wsConnection.onopen = () => {
+        isConnected.value = true
+        isReconnecting.value = false
+        console.log('WebSocket connected')
+      }
+
+      // Handle messages
+      wsConnection.onmessage = (event) => {
+        if (!event.data) {
+          return
+        }
+
+        // Reset reconnecting state on successful message
+        isReconnecting.value = false
+
+        try {
+          const parsedData = JSON.parse(event.data)
+          onMessage?.(parsedData)
+        }
+        catch (error) {
+          console.error('Error parsing WebSocket message:', error)
+        }
+      }
+
+      // Handle errors and connection close
+      wsConnection.onerror = (error) => {
+        console.error('WebSocket error:', error)
+        isConnected.value = false
+        isReconnecting.value = true
+        onError?.()
+      }
+
+      wsConnection.onclose = () => {
+        isConnected.value = false
+        console.log('WebSocket disconnected')
+      }
+
+      wsInstance.value = wsConnection
+      return wsConnection
+    }
+    catch (error) {
+      console.error('Failed to create WebSocket connection:', error)
+      onError?.()
+    }
+  }
+
+  /**
+   * Disconnect WebSocket connection
+   */
+  function disconnect() {
+    if (wsInstance.value) {
+      wsInstance.value.close()
+      wsInstance.value = undefined
+    }
+    isConnected.value = false
+    isReconnecting.value = false
+    currentOptions.value = undefined
+  }
+
+  /**
+   * Send message to WebSocket
+   */
+  function send(data: any) {
+    if (wsInstance.value && isConnected.value) {
+      wsInstance.value.send(JSON.stringify(data))
+    }
+  }
+
+  // Automatically disconnect when the component is unmounted
+  if (getCurrentInstance()) {
+    onUnmounted(() => {
+      disconnect()
+    })
+  }
+
+  return {
+    connect,
+    disconnect,
+    send,
+    wsInstance,
+    isConnected: readonly(isConnected),
+    isReconnecting: readonly(isReconnecting),
+  }
+} 

+ 1 - 1
app/src/version.json

@@ -1 +1 @@
-{"version":"2.1.10","build_id":5,"total_build":447}
+{"version":"2.1.10","build_id":6,"total_build":448}

+ 10 - 9
app/src/views/dashboard/NginxDashBoard.vue

@@ -1,8 +1,9 @@
 <script setup lang="ts">
 <script setup lang="ts">
 import { ClockCircleOutlined, ReloadOutlined } from '@ant-design/icons-vue'
 import { ClockCircleOutlined, ReloadOutlined } from '@ant-design/icons-vue'
+import { storeToRefs } from 'pinia'
 import ngx from '@/api/ngx'
 import ngx from '@/api/ngx'
 import { useNginxPerformance } from '@/composables/useNginxPerformance'
 import { useNginxPerformance } from '@/composables/useNginxPerformance'
-import { useSSE } from '@/composables/useSSE'
+import { useNginxWebSocket } from '@/composables/useNginxWebSocket'
 import { NginxStatus } from '@/constants'
 import { NginxStatus } from '@/constants'
 import { useGlobalStore } from '@/pinia'
 import { useGlobalStore } from '@/pinia'
 import ConnectionMetricsCard from './components/ConnectionMetricsCard.vue'
 import ConnectionMetricsCard from './components/ConnectionMetricsCard.vue'
@@ -29,8 +30,8 @@ const {
   stubStatusError,
   stubStatusError,
 } = useNginxPerformance()
 } = useNginxPerformance()
 
 
-// SSE connection
-const { connect, disconnect } = useSSE()
+// WebSocket connection
+const { connect, disconnect } = useNginxWebSocket()
 
 
 // Toggle stub_status module status
 // Toggle stub_status module status
 async function toggleStubStatus() {
 async function toggleStubStatus() {
@@ -47,7 +48,7 @@ async function toggleStubStatus() {
       stubStatusError.value = response.error
       stubStatusError.value = response.error
     }
     }
     else {
     else {
-      fetchInitialData().then(connectSSE)
+      fetchInitialData().then(connectWebSocket)
     }
     }
   }
   }
   catch (err) {
   catch (err) {
@@ -59,13 +60,13 @@ async function toggleStubStatus() {
   }
   }
 }
 }
 
 
-// Connect SSE
-function connectSSE() {
+// Connect WebSocket
+function connectWebSocket() {
   disconnect()
   disconnect()
   loading.value = true
   loading.value = true
 
 
   connect({
   connect({
-    url: 'api/nginx/detail_status/stream',
+    url: 'api/nginx/detail_status/ws',
     onMessage: data => {
     onMessage: data => {
       loading.value = false
       loading.value = false
 
 
@@ -96,12 +97,12 @@ function connectSSE() {
 
 
 // Manually refresh data
 // Manually refresh data
 function refreshData() {
 function refreshData() {
-  fetchInitialData().then(connectSSE)
+  fetchInitialData().then(connectWebSocket)
 }
 }
 
 
 // Initialize connection when the component is mounted
 // Initialize connection when the component is mounted
 onMounted(() => {
 onMounted(() => {
-  fetchInitialData().then(connectSSE)
+  fetchInitialData().then(connectWebSocket)
 })
 })
 </script>
 </script>