1
0

upstream.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package upstream
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/0xJacky/Nginx-UI/internal/helper"
  8. "github.com/0xJacky/Nginx-UI/internal/kernel"
  9. "github.com/0xJacky/Nginx-UI/internal/upstream"
  10. "github.com/gin-gonic/gin"
  11. "github.com/gorilla/websocket"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
  14. // GetAvailability returns cached upstream availability results via HTTP GET
  15. func GetAvailability(c *gin.Context) {
  16. service := upstream.GetUpstreamService()
  17. result := gin.H{
  18. "results": service.GetAvailabilityMap(),
  19. "targets": service.GetTargetInfos(),
  20. "last_update_time": service.GetLastUpdateTime(),
  21. "target_count": service.GetTargetCount(),
  22. }
  23. c.JSON(http.StatusOK, result)
  24. }
  25. // GetUpstreamDefinitions returns all upstream definitions for debugging
  26. func GetUpstreamDefinitions(c *gin.Context) {
  27. service := upstream.GetUpstreamService()
  28. result := gin.H{
  29. "upstreams": service.GetAllUpstreamDefinitions(),
  30. "last_update_time": service.GetLastUpdateTime(),
  31. }
  32. c.JSON(http.StatusOK, result)
  33. }
  34. // AvailabilityWebSocket handles WebSocket connections for real-time availability monitoring
  35. func AvailabilityWebSocket(c *gin.Context) {
  36. var upGrader = websocket.Upgrader{
  37. CheckOrigin: func(r *http.Request) bool {
  38. return true
  39. },
  40. }
  41. // Upgrade HTTP to WebSocket
  42. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  43. if err != nil {
  44. logger.Error(err)
  45. return
  46. }
  47. defer ws.Close()
  48. // Use context to manage goroutine lifecycle
  49. ctx, cancel := context.WithCancel(context.Background())
  50. defer cancel()
  51. // Register this connection and increase check frequency
  52. registerWebSocketConnection()
  53. defer unregisterWebSocketConnection()
  54. // Send initial results immediately
  55. service := upstream.GetUpstreamService()
  56. initialResults := service.GetAvailabilityMap()
  57. if err := ws.WriteJSON(initialResults); err != nil {
  58. logger.Error("Failed to send initial results:", err)
  59. return
  60. }
  61. // Create ticker for periodic updates (every 5 seconds when WebSocket is connected)
  62. ticker := time.NewTicker(5 * time.Second)
  63. defer ticker.Stop()
  64. // Monitor for incoming messages (ping/pong or close)
  65. go func() {
  66. defer cancel()
  67. for {
  68. // Read message (we don't expect any specific data, just use it for connection health)
  69. _, _, err := ws.ReadMessage()
  70. if err != nil {
  71. if helper.IsUnexpectedWebsocketError(err) {
  72. logger.Error("WebSocket read error:", err)
  73. }
  74. return
  75. }
  76. }
  77. }()
  78. // Main loop to send periodic updates
  79. for {
  80. select {
  81. case <-ctx.Done():
  82. logger.Debug("WebSocket connection closed")
  83. return
  84. case <-ticker.C:
  85. // Get latest results from service
  86. results := service.GetAvailabilityMap()
  87. // Send results via WebSocket
  88. if err := ws.WriteJSON(results); err != nil {
  89. logger.Error("Failed to send WebSocket update:", err)
  90. if helper.IsUnexpectedWebsocketError(err) {
  91. return
  92. }
  93. }
  94. case <-kernel.Context.Done():
  95. logger.Debug("AvailabilityWebSocket: Context cancelled, closing WebSocket")
  96. return
  97. }
  98. }
  99. }
  100. // WebSocket connection tracking for managing check frequency
  101. var (
  102. wsConnections int
  103. wsConnectionMutex sync.Mutex
  104. )
  105. // registerWebSocketConnection increments the WebSocket connection counter
  106. func registerWebSocketConnection() {
  107. wsConnectionMutex.Lock()
  108. defer wsConnectionMutex.Unlock()
  109. wsConnections++
  110. logger.Debug("WebSocket connection registered, total connections:", wsConnections)
  111. // Trigger immediate check when first connection is established
  112. if wsConnections == 1 {
  113. service := upstream.GetUpstreamService()
  114. go service.PerformAvailabilityTest()
  115. }
  116. }
  117. // unregisterWebSocketConnection decrements the WebSocket connection counter
  118. func unregisterWebSocketConnection() {
  119. wsConnectionMutex.Lock()
  120. defer wsConnectionMutex.Unlock()
  121. if wsConnections > 0 {
  122. wsConnections--
  123. }
  124. logger.Debug("WebSocket connection unregistered, remaining connections:", wsConnections)
  125. }