upstream.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package upstream
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/helper"
  8. "github.com/0xJacky/Nginx-UI/internal/upstream"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // GetAvailability returns cached upstream availability results via HTTP GET
  14. func GetAvailability(c *gin.Context) {
  15. service := upstream.GetUpstreamService()
  16. result := gin.H{
  17. "results": service.GetAvailabilityMap(),
  18. "targets": service.GetTargetInfos(),
  19. "last_update_time": service.GetLastUpdateTime(),
  20. "target_count": service.GetTargetCount(),
  21. }
  22. c.JSON(http.StatusOK, result)
  23. }
  24. // AvailabilityWebSocket handles WebSocket connections for real-time availability monitoring
  25. func AvailabilityWebSocket(c *gin.Context) {
  26. var upGrader = websocket.Upgrader{
  27. CheckOrigin: func(r *http.Request) bool {
  28. return true
  29. },
  30. }
  31. // Upgrade HTTP to WebSocket
  32. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  33. if err != nil {
  34. logger.Error(err)
  35. return
  36. }
  37. defer ws.Close()
  38. // Use context to manage goroutine lifecycle
  39. ctx, cancel := context.WithCancel(context.Background())
  40. defer cancel()
  41. // Register this connection and increase check frequency
  42. registerWebSocketConnection()
  43. defer unregisterWebSocketConnection()
  44. // Send initial results immediately
  45. service := upstream.GetUpstreamService()
  46. initialResults := service.GetAvailabilityMap()
  47. if err := ws.WriteJSON(initialResults); err != nil {
  48. logger.Error("Failed to send initial results:", err)
  49. return
  50. }
  51. // Create ticker for periodic updates (every 5 seconds when WebSocket is connected)
  52. ticker := time.NewTicker(5 * time.Second)
  53. defer ticker.Stop()
  54. // Monitor for incoming messages (ping/pong or close)
  55. go func() {
  56. defer cancel()
  57. for {
  58. // Read message (we don't expect any specific data, just use it for connection health)
  59. _, _, err := ws.ReadMessage()
  60. if err != nil {
  61. if helper.IsUnexpectedWebsocketError(err) {
  62. logger.Error("WebSocket read error:", err)
  63. }
  64. return
  65. }
  66. }
  67. }()
  68. // Main loop to send periodic updates
  69. for {
  70. select {
  71. case <-ctx.Done():
  72. logger.Debug("WebSocket connection closed")
  73. return
  74. case <-ticker.C:
  75. // Get latest results from service
  76. results := service.GetAvailabilityMap()
  77. // Send results via WebSocket
  78. if err := ws.WriteJSON(results); err != nil {
  79. logger.Error("Failed to send WebSocket update:", err)
  80. if helper.IsUnexpectedWebsocketError(err) {
  81. return
  82. }
  83. }
  84. }
  85. }
  86. }
  87. // WebSocket connection tracking for managing check frequency
  88. var (
  89. wsConnections int
  90. wsConnectionMutex sync.Mutex
  91. )
  92. // registerWebSocketConnection increments the WebSocket connection counter
  93. func registerWebSocketConnection() {
  94. wsConnectionMutex.Lock()
  95. defer wsConnectionMutex.Unlock()
  96. wsConnections++
  97. logger.Debug("WebSocket connection registered, total connections:", wsConnections)
  98. // Trigger immediate check when first connection is established
  99. if wsConnections == 1 {
  100. service := upstream.GetUpstreamService()
  101. go service.PerformAvailabilityTest()
  102. }
  103. }
  104. // unregisterWebSocketConnection decrements the WebSocket connection counter
  105. func unregisterWebSocketConnection() {
  106. wsConnectionMutex.Lock()
  107. defer wsConnectionMutex.Unlock()
  108. if wsConnections > 0 {
  109. wsConnections--
  110. }
  111. logger.Debug("WebSocket connection unregistered, remaining connections:", wsConnections)
  112. }
  113. // HasActiveWebSocketConnections returns true if there are active WebSocket connections
  114. func HasActiveWebSocketConnections() bool {
  115. wsConnectionMutex.Lock()
  116. defer wsConnectionMutex.Unlock()
  117. return wsConnections > 0
  118. }