upstream.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package upstream
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/helper"
  8. "github.com/0xJacky/Nginx-UI/internal/upstream"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. type wsMessage struct {
  14. data interface{}
  15. done chan error
  16. }
  17. func AvailabilityTest(c *gin.Context) {
  18. var upGrader = websocket.Upgrader{
  19. CheckOrigin: func(r *http.Request) bool {
  20. return true
  21. },
  22. }
  23. // upgrade http to websocket
  24. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  25. if err != nil {
  26. logger.Error(err)
  27. return
  28. }
  29. defer ws.Close()
  30. var currentTargets []string
  31. var targetsMutex sync.RWMutex
  32. // Use context to manage goroutine lifecycle
  33. ctx, cancel := context.WithCancel(context.Background())
  34. defer cancel()
  35. // Use channel to serialize WebSocket write operations, avoiding concurrent conflicts
  36. writeChan := make(chan wsMessage, 10)
  37. testChan := make(chan bool, 1) // Immediate test signal
  38. // Create debouncer for test execution
  39. testDebouncer := helper.NewDebouncer(300 * time.Millisecond)
  40. // WebSocket writer goroutine - serialize all write operations
  41. go func() {
  42. defer logger.Debug("WebSocket writer goroutine stopped")
  43. for {
  44. select {
  45. case <-ctx.Done():
  46. return
  47. case msg := <-writeChan:
  48. err := ws.WriteJSON(msg.data)
  49. if msg.done != nil {
  50. msg.done <- err
  51. close(msg.done)
  52. }
  53. if err != nil {
  54. logger.Error("Failed to send WebSocket message:", err)
  55. if helper.IsUnexpectedWebsocketError(err) {
  56. cancel() // Cancel all goroutines
  57. }
  58. }
  59. }
  60. }
  61. }()
  62. // Safe WebSocket write function
  63. writeJSON := func(data interface{}) error {
  64. done := make(chan error, 1)
  65. msg := wsMessage{data: data, done: done}
  66. select {
  67. case writeChan <- msg:
  68. return <-done
  69. case <-ctx.Done():
  70. return ctx.Err()
  71. case <-time.After(5 * time.Second): // Prevent write blocking
  72. return context.DeadlineExceeded
  73. }
  74. }
  75. // Function to perform availability test
  76. performTest := func() {
  77. targetsMutex.RLock()
  78. targets := make([]string, len(currentTargets))
  79. copy(targets, currentTargets)
  80. targetsMutex.RUnlock()
  81. logger.Debug("Performing availability test for targets:", targets)
  82. if len(targets) > 0 {
  83. logger.Debug("Starting upstream.AvailabilityTest...")
  84. result := upstream.AvailabilityTest(targets)
  85. logger.Debug("Test completed, results:", result)
  86. logger.Debug("Sending results via WebSocket...")
  87. if err := writeJSON(result); err != nil {
  88. logger.Error("Failed to send WebSocket message:", err)
  89. if helper.IsUnexpectedWebsocketError(err) {
  90. cancel() // Cancel all goroutines
  91. }
  92. } else {
  93. logger.Debug("Results sent successfully")
  94. }
  95. } else {
  96. logger.Debug("No targets to test")
  97. // Send empty result even if no targets
  98. emptyResult := make(map[string]interface{})
  99. if err := writeJSON(emptyResult); err != nil {
  100. logger.Error("Failed to send empty result:", err)
  101. } else {
  102. logger.Debug("Empty result sent successfully")
  103. }
  104. }
  105. }
  106. // Goroutine to handle incoming messages (target updates)
  107. go func() {
  108. defer logger.Debug("WebSocket reader goroutine stopped")
  109. for {
  110. select {
  111. case <-ctx.Done():
  112. return
  113. default:
  114. }
  115. var newTargets []string
  116. // Set read timeout to avoid blocking
  117. ws.SetReadDeadline(time.Now().Add(30 * time.Second))
  118. err := ws.ReadJSON(&newTargets)
  119. ws.SetReadDeadline(time.Time{}) // Clear deadline
  120. if err != nil {
  121. if helper.IsUnexpectedWebsocketError(err) {
  122. logger.Error(err)
  123. }
  124. cancel() // Cancel all goroutines
  125. return
  126. }
  127. logger.Debug("Received targets from frontend:", newTargets)
  128. targetsMutex.Lock()
  129. currentTargets = newTargets
  130. targetsMutex.Unlock()
  131. // Use debouncer to trigger test execution
  132. testDebouncer.Trigger(func() {
  133. select {
  134. case testChan <- true:
  135. default:
  136. }
  137. })
  138. }
  139. }()
  140. // Main testing loop
  141. ticker := time.NewTicker(10 * time.Second)
  142. defer ticker.Stop()
  143. logger.Debug("WebSocket connection established, waiting for messages...")
  144. for {
  145. select {
  146. case <-ctx.Done():
  147. testDebouncer.Stop()
  148. logger.Debug("WebSocket connection closed")
  149. return
  150. case <-testChan:
  151. // Debounce triggered test or first test
  152. go performTest() // Execute asynchronously to avoid blocking main loop
  153. case <-ticker.C:
  154. // Periodic test execution
  155. go performTest() // Execute asynchronously to avoid blocking main loop
  156. }
  157. }
  158. }