浏览代码

fix(websocket): increase buffer size for websocket channels and improve client handling

0xJacky 1 周之前
父节点
当前提交
a15ad68b99
共有 3 个文件被更改,包括 42 次插入12 次删除
  1. 36 8
      api/cluster/websocket.go
  2. 5 3
      api/event/websocket.go
  3. 1 1
      api/nginx/websocket.go

+ 36 - 8
api/cluster/websocket.go

@@ -10,8 +10,8 @@ import (
 	"time"
 
 	"github.com/0xJacky/Nginx-UI/internal/analytic"
-	"github.com/0xJacky/Nginx-UI/internal/kernel"
 	"github.com/0xJacky/Nginx-UI/internal/helper"
+	"github.com/0xJacky/Nginx-UI/internal/kernel"
 	"github.com/0xJacky/Nginx-UI/model"
 	"github.com/gin-gonic/gin"
 	"github.com/gorilla/websocket"
@@ -51,7 +51,7 @@ func GetHub() *Hub {
 	hubOnce.Do(func() {
 		hub = &Hub{
 			clients:    make(map[*Client]bool),
-			broadcast:  make(chan WebSocketMessage, 256),
+			broadcast:  make(chan WebSocketMessage, 1024), // Increased buffer size
 			register:   make(chan *Client),
 			unregister: make(chan *Client),
 		}
@@ -81,15 +81,35 @@ func (h *Hub) run() {
 
 		case message := <-h.broadcast:
 			h.mutex.RLock()
+			deadClients := make([]*Client, 0)
 			for client := range h.clients {
 				select {
 				case client.send <- message:
+				case <-time.After(100 * time.Millisecond):
+					// Client is too slow, mark for removal
+					logger.Debug("Client send channel timeout, marking for removal")
+					deadClients = append(deadClients, client)
 				default:
-					close(client.send)
-					delete(h.clients, client)
+					// Channel is full, mark for removal
+					logger.Debug("Client send channel full, marking for removal")
+					deadClients = append(deadClients, client)
 				}
 			}
 			h.mutex.RUnlock()
+
+			// Clean up dead clients
+			if len(deadClients) > 0 {
+				h.mutex.Lock()
+				for _, client := range deadClients {
+					if _, ok := h.clients[client]; ok {
+						close(client.send)
+						delete(h.clients, client)
+						client.cancel() // Trigger client cleanup
+					}
+				}
+				h.mutex.Unlock()
+				logger.Info("Cleaned up slow/unresponsive clients", "count", len(deadClients))
+			}
 		}
 	}
 }
@@ -135,7 +155,7 @@ func GetAllEnabledEnvironmentWS(c *gin.Context) {
 
 	client := &Client{
 		conn:   ws,
-		send:   make(chan WebSocketMessage, 256),
+		send:   make(chan WebSocketMessage, 1024), // Increased buffer size
 		ctx:    ctx,
 		cancel: cancel,
 	}
@@ -222,7 +242,7 @@ func (c *Client) handleEnvironmentMonitoring() {
 	}
 }
 
-// sendMessage sends a message to the client
+// sendMessage sends a message to the client with timeout and better error handling
 func (c *Client) sendMessage(event string, data any) {
 	message := WebSocketMessage{
 		Event: event,
@@ -231,8 +251,16 @@ func (c *Client) sendMessage(event string, data any) {
 
 	select {
 	case c.send <- message:
+	case <-time.After(5 * time.Second):
+		logger.Warn("Client send channel full, message dropped after timeout", "event", event)
+		// Force disconnect slow clients to prevent resource leakage
+		c.cancel()
 	default:
-		logger.Warn("Client send channel full, message dropped")
+		logger.Warn("Client send channel full, message dropped immediately", "event", event)
+		// For non-critical messages, we can drop them immediately
+		if event != "heartbeat" {
+			logger.Info("Dropping non-critical message due to full channel", "event", event)
+		}
 	}
 }
 
@@ -290,7 +318,7 @@ func (c *Client) readPump() {
 				if helper.IsUnexpectedWebsocketError(err) {
 					logger.Error("Websocket error:", err)
 				}
-				return 
+				return
 			}
 		}
 	}()

+ 5 - 3
api/event/websocket.go

@@ -49,7 +49,7 @@ func GetHub() *Hub {
 	hubOnce.Do(func() {
 		hub = &Hub{
 			clients:    make(map[*Client]bool),
-			broadcast:  make(chan WebSocketMessage, 256),
+			broadcast:  make(chan WebSocketMessage, 1024), // Increased buffer size
 			register:   make(chan *Client),
 			unregister: make(chan *Client),
 		}
@@ -69,8 +69,10 @@ func (h *Hub) BroadcastMessage(event string, data interface{}) {
 	}
 	select {
 	case h.broadcast <- message:
+	case <-time.After(1 * time.Second):
+		logger.Warn("Broadcast channel full, message dropped after timeout", "event", event)
 	default:
-		logger.Warn("Broadcast channel full, message dropped")
+		logger.Warn("Broadcast channel full, message dropped immediately", "event", event)
 	}
 }
 
@@ -131,7 +133,7 @@ func EventBus(c *gin.Context) {
 
 	client := &Client{
 		conn:   ws,
-		send:   make(chan WebSocketMessage, 256),
+		send:   make(chan WebSocketMessage, 1024), // Increased buffer size
 		ctx:    ctx,
 		cancel: cancel,
 	}

+ 1 - 1
api/nginx/websocket.go

@@ -153,7 +153,7 @@ func StreamDetailStatusWS(c *gin.Context) {
 
 	client := &NginxPerformanceClient{
 		conn:   ws,
-		send:   make(chan interface{}, 256),
+		send:   make(chan interface{}, 1024), // Increased buffer size
 		ctx:    ctx,
 		cancel: cancel,
 	}