Browse Source

feat: ws support for real-time env monitoring

Jacky 1 day ago
parent
commit
f29bb7bcad

+ 0 - 85
api/cluster/environment.go

@@ -2,14 +2,8 @@ package cluster
 
 import (
 	"context"
-	"crypto/sha256"
-	"encoding/hex"
-	"encoding/json"
-	"io"
 	"net/http"
-	"time"
 
-	"github.com/0xJacky/Nginx-UI/api"
 	"github.com/0xJacky/Nginx-UI/internal/analytic"
 	"github.com/0xJacky/Nginx-UI/internal/cluster"
 	"github.com/0xJacky/Nginx-UI/model"
@@ -60,85 +54,6 @@ func GetEnvironmentList(c *gin.Context) {
 	})
 }
 
-func GetAllEnabledEnvironment(c *gin.Context) {
-	api.SetSSEHeaders(c)
-	notify := c.Writer.CloseNotify()
-
-	interval := 10
-
-	type respEnvironment struct {
-		*model.Environment
-		Status bool `json:"status"`
-	}
-
-	f := func() (any, bool) {
-		return cosy.Core[model.Environment](c).
-			SetFussy("name").
-			SetTransformer(func(m *model.Environment) any {
-				resp := respEnvironment{
-					Environment: m,
-					Status:      analytic.GetNode(m).Status,
-				}
-				return resp
-			}).ListAllData()
-	}
-
-	getHash := func(data any) string {
-		bytes, _ := json.Marshal(data)
-		hash := sha256.New()
-		hash.Write(bytes)
-		hashSum := hash.Sum(nil)
-		return hex.EncodeToString(hashSum)
-	}
-
-	dataHash := ""
-
-	{
-		data, ok := f()
-		if !ok {
-			return
-		}
-
-		c.Stream(func(w io.Writer) bool {
-			c.SSEvent("message", data)
-			dataHash = getHash(data)
-			return false
-		})
-	}
-
-	for {
-		select {
-		case <-time.After(time.Duration(interval) * time.Second):
-			data, ok := f()
-			if !ok {
-				return
-			}
-			// if data is not changed, send heartbeat
-			if dataHash == getHash(data) {
-				c.Stream(func(w io.Writer) bool {
-					c.SSEvent("heartbeat", "")
-					return false
-				})
-				return
-			}
-
-			dataHash = getHash(data)
-
-			c.Stream(func(w io.Writer) bool {
-				c.SSEvent("message", data)
-				return false
-			})
-		case <-time.After(30 * time.Second):
-			c.Stream(func(w io.Writer) bool {
-				c.SSEvent("heartbeat", "")
-				return false
-			})
-		case <-notify:
-			return
-		}
-	}
-}
-
 func AddEnvironment(c *gin.Context) {
 	cosy.Core[model.Environment](c).SetValidRules(gin.H{
 		"name":    "required",

+ 1 - 1
api/cluster/router.go

@@ -5,7 +5,7 @@ import "github.com/gin-gonic/gin"
 func InitRouter(r *gin.RouterGroup) {
 	// Environment
 	r.GET("environments", GetEnvironmentList)
-	r.GET("environments/enabled", GetAllEnabledEnvironment)
+	r.GET("environments/enabled", GetAllEnabledEnvironmentWS)
 	r.POST("environments/load_from_settings", LoadEnvironmentFromSettings)
 	envGroup := r.Group("environments")
 	{

+ 293 - 0
api/cluster/websocket.go

@@ -0,0 +1,293 @@
+package cluster
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/internal/analytic"
+	"github.com/0xJacky/Nginx-UI/model"
+	"github.com/gin-gonic/gin"
+	"github.com/gorilla/websocket"
+	"github.com/uozi-tech/cosy/logger"
+)
+
+// WebSocketMessage represents the structure of messages sent to the client
+type WebSocketMessage struct {
+	Event string      `json:"event"`
+	Data  interface{} `json:"data"`
+}
+
+// Client represents a WebSocket client connection for cluster environment monitoring
+type Client struct {
+	conn   *websocket.Conn
+	send   chan WebSocketMessage
+	ctx    context.Context
+	cancel context.CancelFunc
+	mutex  sync.RWMutex
+}
+
+// Hub maintains the set of active clients and broadcasts messages to them
+type Hub struct {
+	clients    map[*Client]bool
+	broadcast  chan WebSocketMessage
+	register   chan *Client
+	unregister chan *Client
+	mutex      sync.RWMutex
+}
+
+var (
+	hub     *Hub
+	hubOnce sync.Once
+)
+
+// GetHub returns the singleton hub instance
+func GetHub() *Hub {
+	hubOnce.Do(func() {
+		hub = &Hub{
+			clients:    make(map[*Client]bool),
+			broadcast:  make(chan WebSocketMessage, 256),
+			register:   make(chan *Client),
+			unregister: make(chan *Client),
+		}
+		go hub.run()
+	})
+	return hub
+}
+
+// run handles the main hub loop
+func (h *Hub) run() {
+	for {
+		select {
+		case client := <-h.register:
+			h.mutex.Lock()
+			h.clients[client] = true
+			h.mutex.Unlock()
+			logger.Debug("Cluster environment client connected, total clients:", len(h.clients))
+
+		case client := <-h.unregister:
+			h.mutex.Lock()
+			if _, ok := h.clients[client]; ok {
+				delete(h.clients, client)
+				close(client.send)
+			}
+			h.mutex.Unlock()
+			logger.Debug("Cluster environment client disconnected, total clients:", len(h.clients))
+
+		case message := <-h.broadcast:
+			h.mutex.RLock()
+			for client := range h.clients {
+				select {
+				case client.send <- message:
+				default:
+					close(client.send)
+					delete(h.clients, client)
+				}
+			}
+			h.mutex.RUnlock()
+		}
+	}
+}
+
+// BroadcastMessage sends a message to all connected clients
+func (h *Hub) BroadcastMessage(event string, data any) {
+	message := WebSocketMessage{
+		Event: event,
+		Data:  data,
+	}
+	select {
+	case h.broadcast <- message:
+	default:
+		logger.Warn("Cluster environment broadcast channel full, message dropped")
+	}
+}
+
+// WebSocket upgrader configuration
+var upgrader = websocket.Upgrader{
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+}
+
+type respEnvironment struct {
+	*model.Environment
+	Status bool `json:"status"`
+}
+
+// GetAllEnabledEnvironmentWS handles WebSocket connections for real-time environment monitoring
+func GetAllEnabledEnvironmentWS(c *gin.Context) {
+	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
+	if err != nil {
+		logger.Error("Failed to upgrade connection:", err)
+		return
+	}
+	defer ws.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := &Client{
+		conn:   ws,
+		send:   make(chan WebSocketMessage, 256),
+		ctx:    ctx,
+		cancel: cancel,
+	}
+
+	hub := GetHub()
+	hub.register <- client
+
+	// Start goroutines for handling environment monitoring
+	go client.handleEnvironmentMonitoring()
+
+	// Start write and read pumps
+	go client.writePump()
+	client.readPump()
+}
+
+// handleEnvironmentMonitoring monitors environment status and sends updates
+func (c *Client) handleEnvironmentMonitoring() {
+	interval := 10 * time.Second
+	heartbeatInterval := 30 * time.Second
+
+	getEnvironmentData := func() (interface{}, bool) {
+		// Query environments directly from database
+		var environments []model.Environment
+		err := model.UseDB().Where("enabled = ?", true).Find(&environments).Error
+		if err != nil {
+			logger.Error("Failed to query environments:", err)
+			return nil, false
+		}
+
+		// Transform environments to response format
+		var result []respEnvironment
+		for _, env := range environments {
+			result = append(result, respEnvironment{
+				Environment: &env,
+				Status:      analytic.GetNode(&env).Status,
+			})
+		}
+
+		return result, true
+	}
+
+	getHash := func(data interface{}) string {
+		bytes, _ := json.Marshal(data)
+		hash := sha256.New()
+		hash.Write(bytes)
+		hashSum := hash.Sum(nil)
+		return hex.EncodeToString(hashSum)
+	}
+
+	var dataHash string
+
+	// Send initial data
+	data, ok := getEnvironmentData()
+	if ok {
+		dataHash = getHash(data)
+		c.sendMessage("message", data)
+	}
+
+	ticker := time.NewTicker(interval)
+	heartbeatTicker := time.NewTicker(heartbeatInterval)
+	defer ticker.Stop()
+	defer heartbeatTicker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			data, ok := getEnvironmentData()
+			if !ok {
+				return
+			}
+
+			newHash := getHash(data)
+			if dataHash != newHash {
+				dataHash = newHash
+				c.sendMessage("message", data)
+			}
+
+		case <-heartbeatTicker.C:
+			c.sendMessage("heartbeat", "")
+
+		case <-c.ctx.Done():
+			return
+		}
+	}
+}
+
+// sendMessage sends a message to the client
+func (c *Client) sendMessage(event string, data any) {
+	message := WebSocketMessage{
+		Event: event,
+		Data:  data,
+	}
+
+	select {
+	case c.send <- message:
+	default:
+		logger.Warn("Client send channel full, message dropped")
+	}
+}
+
+// writePump pumps messages from the hub to the websocket connection
+func (c *Client) writePump() {
+	ticker := time.NewTicker(54 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case message, ok := <-c.send:
+			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			if !ok {
+				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
+				return
+			}
+
+			if err := c.conn.WriteJSON(message); err != nil {
+				logger.Error("Error writing message to websocket:", err)
+				return
+			}
+
+		case <-ticker.C:
+			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
+				return
+			}
+
+		case <-c.ctx.Done():
+			return
+		}
+	}
+}
+
+// readPump pumps messages from the websocket connection to the hub
+func (c *Client) readPump() {
+	defer func() {
+		hub := GetHub()
+		hub.unregister <- c
+		c.conn.Close()
+		c.cancel()
+	}()
+
+	c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+	c.conn.SetPongHandler(func(string) error {
+		c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+		return nil
+	})
+
+	for {
+		_, _, err := c.conn.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+				logger.Error("Websocket error:", err)
+			}
+			break
+		}
+	}
+}

+ 44 - 20
app/src/components/EnvGroupTabs/EnvGroupTabs.vue

@@ -3,7 +3,7 @@ import type { EnvGroup } from '@/api/env_group'
 import type { Environment } from '@/api/environment'
 import { message } from 'ant-design-vue'
 import nodeApi from '@/api/node'
-import { useSSE } from '@/composables/useSSE'
+import ws from '@/lib/websocket'
 
 const props = defineProps<{
   envGroups: EnvGroup[]
@@ -18,36 +18,60 @@ const loading = ref({
   restart: false,
 })
 
-// Use SSE composable
-// Error handling is implemented internally by useSSE with automatic reconnection
-const { connect, disconnect } = useSSE()
+// WebSocket connection for environment monitoring
+let socket: WebSocket | null = null
 
 // Get node data when tab is not 'All'
 watch(modelValue, newVal => {
   if (newVal && newVal !== 0) {
-    connectSSE()
+    connectWebSocket()
   }
   else {
-    disconnect()
+    disconnectWebSocket()
   }
 }, { immediate: true })
 
-function connectSSE() {
-  connect({
-    url: 'api/environments/enabled',
-    onMessage: data => {
-      environments.value = data
-      environmentsMap.value = environments.value.reduce((acc, node) => {
-        acc[node.id] = node
-        return acc
-      }, {} as Record<number, Environment>)
-    },
-    onError: () => {
-      // 错误处理已由useSSE内部实现自动重连
-    },
-  })
+function connectWebSocket() {
+  if (socket) {
+    socket.close()
+  }
+
+  socket = ws('/api/cluster/environments/enabled/ws', true)
+
+  socket.onmessage = (event) => {
+    try {
+      const message = JSON.parse(event.data)
+      
+      if (message.event === 'message') {
+        const data: Environment[] = message.data
+        environments.value = data
+        environmentsMap.value = environments.value.reduce((acc, node) => {
+          acc[node.id] = node
+          return acc
+        }, {} as Record<number, Environment>)
+      }
+    } catch (error) {
+      console.error('Error parsing WebSocket message:', error)
+    }
+  }
+
+  socket.onerror = (error) => {
+    console.warn('Failed to connect to environments WebSocket endpoint', error)
+  }
 }
 
+function disconnectWebSocket() {
+  if (socket) {
+    socket.close()
+    socket = null
+  }
+}
+
+// Cleanup on unmount
+onUnmounted(() => {
+  disconnectWebSocket()
+})
+
 // Get the current Node Group data
 const currentEnvGroup = computed(() => {
   if (!modelValue.value || modelValue.value === 0)

+ 30 - 17
app/src/components/NodeSelector/NodeSelector.vue

@@ -1,6 +1,6 @@
 <script setup lang="ts">
 import type { Environment } from '@/api/environment'
-import { useSSE } from '@/composables/useSSE'
+import ws from '@/lib/websocket'
 
 const props = defineProps<{
   hiddenLocal?: boolean
@@ -12,23 +12,36 @@ const map = defineModel<Record<number, string>>('map')
 const data = ref<Environment[]>([])
 const data_map = ref<Record<number, Environment>>({})
 
-const { connect } = useSSE()
+// WebSocket connection for environment monitoring
+const socket = ws('/api/environments/enabled', true)
 
-// connect SSE and handle messages
-connect({
-  url: 'api/environments/enabled',
-  onMessage: (environments: Environment[]) => {
-    data.value = environments
-    nextTick(() => {
-      data_map.value = data.value.reduce((acc, node) => {
-        acc[node.id] = node
-        return acc
-      }, {} as Record<number, Environment>)
-    })
-  },
-  onError: () => {
-    console.warn('Failed to connect to environments SSE endpoint')
-  },
+socket.onmessage = event => {
+  try {
+    const message = JSON.parse(event.data)
+
+    if (message.event === 'message') {
+      const environments: Environment[] = message.data
+      data.value = environments
+      nextTick(() => {
+        data_map.value = data.value.reduce((acc, node) => {
+          acc[node.id] = node
+          return acc
+        }, {} as Record<number, Environment>)
+      })
+    }
+  }
+  catch (error) {
+    console.error('Error parsing WebSocket message:', error)
+  }
+}
+
+socket.onerror = error => {
+  console.warn('Failed to connect to environments WebSocket endpoint', error)
+}
+
+// Cleanup on unmount
+onUnmounted(() => {
+  socket.close()
 })
 
 const value = computed({

+ 1 - 1
app/src/version.json

@@ -1 +1 @@
-{"version":"2.1.10","build_id":6,"total_build":448}
+{"version":"2.1.10","build_id":7,"total_build":449}