123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package upstream
- import (
- "context"
- "net/http"
- "sync"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/helper"
- "github.com/0xJacky/Nginx-UI/internal/upstream"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "github.com/uozi-tech/cosy/logger"
- )
// wsMessage is one unit of work handed to the WebSocket writer goroutine.
type wsMessage struct {
	// data is the value the writer passes to the connection's WriteJSON.
	data interface{}
	// done, when non-nil, receives the result of the write and is then
	// closed by the writer.
	done chan error
}
- func AvailabilityTest(c *gin.Context) {
- var upGrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- // upgrade http to websocket
- ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
- if err != nil {
- logger.Error(err)
- return
- }
- defer ws.Close()
- var currentTargets []string
- var targetsMutex sync.RWMutex
- // Use context to manage goroutine lifecycle
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- // Use channel to serialize WebSocket write operations, avoiding concurrent conflicts
- writeChan := make(chan wsMessage, 10)
- testChan := make(chan bool, 1) // Immediate test signal
- // Create debouncer for test execution
- testDebouncer := helper.NewDebouncer(300 * time.Millisecond)
- // WebSocket writer goroutine - serialize all write operations
- go func() {
- defer logger.Debug("WebSocket writer goroutine stopped")
- for {
- select {
- case <-ctx.Done():
- return
- case msg := <-writeChan:
- err := ws.WriteJSON(msg.data)
- if msg.done != nil {
- msg.done <- err
- close(msg.done)
- }
- if err != nil {
- logger.Error("Failed to send WebSocket message:", err)
- if helper.IsUnexpectedWebsocketError(err) {
- cancel() // Cancel all goroutines
- }
- }
- }
- }
- }()
- // Safe WebSocket write function
- writeJSON := func(data interface{}) error {
- done := make(chan error, 1)
- msg := wsMessage{data: data, done: done}
- select {
- case writeChan <- msg:
- return <-done
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(5 * time.Second): // Prevent write blocking
- return context.DeadlineExceeded
- }
- }
- // Function to perform availability test
- performTest := func() {
- targetsMutex.RLock()
- targets := make([]string, len(currentTargets))
- copy(targets, currentTargets)
- targetsMutex.RUnlock()
- logger.Debug("Performing availability test for targets:", targets)
- if len(targets) > 0 {
- logger.Debug("Starting upstream.AvailabilityTest...")
- result := upstream.AvailabilityTest(targets)
- logger.Debug("Test completed, results:", result)
- logger.Debug("Sending results via WebSocket...")
- if err := writeJSON(result); err != nil {
- logger.Error("Failed to send WebSocket message:", err)
- if helper.IsUnexpectedWebsocketError(err) {
- cancel() // Cancel all goroutines
- }
- } else {
- logger.Debug("Results sent successfully")
- }
- } else {
- logger.Debug("No targets to test")
- // Send empty result even if no targets
- emptyResult := make(map[string]interface{})
- if err := writeJSON(emptyResult); err != nil {
- logger.Error("Failed to send empty result:", err)
- } else {
- logger.Debug("Empty result sent successfully")
- }
- }
- }
- // Goroutine to handle incoming messages (target updates)
- go func() {
- defer logger.Debug("WebSocket reader goroutine stopped")
- for {
- select {
- case <-ctx.Done():
- return
- default:
- }
- var newTargets []string
- // Set read timeout to avoid blocking
- ws.SetReadDeadline(time.Now().Add(30 * time.Second))
- err := ws.ReadJSON(&newTargets)
- ws.SetReadDeadline(time.Time{}) // Clear deadline
- if err != nil {
- if helper.IsUnexpectedWebsocketError(err) {
- logger.Error(err)
- }
- cancel() // Cancel all goroutines
- return
- }
- logger.Debug("Received targets from frontend:", newTargets)
- targetsMutex.Lock()
- currentTargets = newTargets
- targetsMutex.Unlock()
- // Use debouncer to trigger test execution
- testDebouncer.Trigger(func() {
- select {
- case testChan <- true:
- default:
- }
- })
- }
- }()
- // Main testing loop
- ticker := time.NewTicker(10 * time.Second)
- defer ticker.Stop()
- logger.Debug("WebSocket connection established, waiting for messages...")
- for {
- select {
- case <-ctx.Done():
- testDebouncer.Stop()
- logger.Debug("WebSocket connection closed")
- return
- case <-testChan:
- // Debounce triggered test or first test
- go performTest() // Execute asynchronously to avoid blocking main loop
- case <-ticker.C:
- // Periodic test execution
- go performTest() // Execute asynchronously to avoid blocking main loop
- }
- }
- }
|