upstream.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package upstream
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/helper"
  8. "github.com/0xJacky/Nginx-UI/internal/upstream"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "github.com/uozi-tech/cosy/logger"
  12. )
  13. // GetAvailability returns cached upstream availability results via HTTP GET
  14. func GetAvailability(c *gin.Context) {
  15. service := upstream.GetUpstreamService()
  16. result := gin.H{
  17. "results": service.GetAvailabilityMap(),
  18. "targets": service.GetTargetInfos(),
  19. "last_update_time": service.GetLastUpdateTime(),
  20. "target_count": service.GetTargetCount(),
  21. }
  22. c.JSON(http.StatusOK, result)
  23. }
  24. // GetUpstreamDefinitions returns all upstream definitions for debugging
  25. func GetUpstreamDefinitions(c *gin.Context) {
  26. service := upstream.GetUpstreamService()
  27. result := gin.H{
  28. "upstreams": service.GetAllUpstreamDefinitions(),
  29. "last_update_time": service.GetLastUpdateTime(),
  30. }
  31. c.JSON(http.StatusOK, result)
  32. }
  33. // AvailabilityWebSocket handles WebSocket connections for real-time availability monitoring
  34. func AvailabilityWebSocket(c *gin.Context) {
  35. var upGrader = websocket.Upgrader{
  36. CheckOrigin: func(r *http.Request) bool {
  37. return true
  38. },
  39. }
  40. // Upgrade HTTP to WebSocket
  41. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  42. if err != nil {
  43. logger.Error(err)
  44. return
  45. }
  46. defer ws.Close()
  47. // Use context to manage goroutine lifecycle
  48. ctx, cancel := context.WithCancel(context.Background())
  49. defer cancel()
  50. // Register this connection and increase check frequency
  51. registerWebSocketConnection()
  52. defer unregisterWebSocketConnection()
  53. // Send initial results immediately
  54. service := upstream.GetUpstreamService()
  55. initialResults := service.GetAvailabilityMap()
  56. if err := ws.WriteJSON(initialResults); err != nil {
  57. logger.Error("Failed to send initial results:", err)
  58. return
  59. }
  60. // Create ticker for periodic updates (every 5 seconds when WebSocket is connected)
  61. ticker := time.NewTicker(5 * time.Second)
  62. defer ticker.Stop()
  63. // Monitor for incoming messages (ping/pong or close)
  64. go func() {
  65. defer cancel()
  66. for {
  67. // Read message (we don't expect any specific data, just use it for connection health)
  68. _, _, err := ws.ReadMessage()
  69. if err != nil {
  70. if helper.IsUnexpectedWebsocketError(err) {
  71. logger.Error("WebSocket read error:", err)
  72. }
  73. return
  74. }
  75. }
  76. }()
  77. // Main loop to send periodic updates
  78. for {
  79. select {
  80. case <-ctx.Done():
  81. logger.Debug("WebSocket connection closed")
  82. return
  83. case <-ticker.C:
  84. // Get latest results from service
  85. results := service.GetAvailabilityMap()
  86. // Send results via WebSocket
  87. if err := ws.WriteJSON(results); err != nil {
  88. logger.Error("Failed to send WebSocket update:", err)
  89. if helper.IsUnexpectedWebsocketError(err) {
  90. return
  91. }
  92. }
  93. }
  94. }
  95. }
  96. // WebSocket connection tracking for managing check frequency
  97. var (
  98. wsConnections int
  99. wsConnectionMutex sync.Mutex
  100. )
  101. // registerWebSocketConnection increments the WebSocket connection counter
  102. func registerWebSocketConnection() {
  103. wsConnectionMutex.Lock()
  104. defer wsConnectionMutex.Unlock()
  105. wsConnections++
  106. logger.Debug("WebSocket connection registered, total connections:", wsConnections)
  107. // Trigger immediate check when first connection is established
  108. if wsConnections == 1 {
  109. service := upstream.GetUpstreamService()
  110. go service.PerformAvailabilityTest()
  111. }
  112. }
  113. // unregisterWebSocketConnection decrements the WebSocket connection counter
  114. func unregisterWebSocketConnection() {
  115. wsConnectionMutex.Lock()
  116. defer wsConnectionMutex.Unlock()
  117. if wsConnections > 0 {
  118. wsConnections--
  119. }
  120. logger.Debug("WebSocket connection unregistered, remaining connections:", wsConnections)
  121. }
  122. // HasActiveWebSocketConnections returns true if there are active WebSocket connections
  123. func HasActiveWebSocketConnections() bool {
  124. wsConnectionMutex.Lock()
  125. defer wsConnectionMutex.Unlock()
  126. return wsConnections > 0
  127. }