node_record.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package analytic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/0xJacky/Nginx-UI/internal/logger"
  6. "github.com/0xJacky/Nginx-UI/model"
  7. "github.com/0xJacky/Nginx-UI/query"
  8. "github.com/gorilla/websocket"
  9. "net/http"
  10. "time"
  11. )
  12. var stopNodeRecordChan = make(chan struct{})
  13. func RestartRetrieveNodesStatus() {
  14. stopNodeRecordChan <- struct{}{}
  15. time.Sleep(10 * time.Second)
  16. go RetrieveNodesStatus()
  17. }
  18. func RetrieveNodesStatus() {
  19. NodeMap = make(TNodeMap)
  20. errChan := make(chan error)
  21. ctx, cancel := context.WithCancel(context.Background())
  22. defer cancel()
  23. env := query.Environment
  24. envs, err := env.Where(env.Enabled.Is(true)).Find()
  25. if err != nil {
  26. logger.Error(err)
  27. return
  28. }
  29. for _, v := range envs {
  30. go nodeAnalyticLive(v, errChan, ctx)
  31. }
  32. for {
  33. select {
  34. case err = <-errChan:
  35. logger.Error(err)
  36. case <-stopNodeRecordChan:
  37. logger.Info("RetrieveNodesStatus exited normally")
  38. return // will execute defer cancel()
  39. }
  40. }
  41. }
  42. func nodeAnalyticLive(env *model.Environment, errChan chan error, ctx context.Context) {
  43. for {
  44. err := nodeAnalyticRecord(env, ctx)
  45. if err != nil {
  46. // set node offline
  47. if NodeMap[env.ID] != nil {
  48. mutex.Lock()
  49. NodeMap[env.ID].Status = false
  50. mutex.Unlock()
  51. }
  52. logger.Error(err)
  53. errChan <- err
  54. // wait 5s then reconnect
  55. time.Sleep(5 * time.Second)
  56. }
  57. }
  58. }
  59. func nodeAnalyticRecord(env *model.Environment, ctx context.Context) (err error) {
  60. mutex.Lock()
  61. NodeMap[env.ID] = InitNode(env)
  62. mutex.Unlock()
  63. u, err := env.GetWebSocketURL("/api/analytic/intro")
  64. if err != nil {
  65. return
  66. }
  67. header := http.Header{}
  68. header.Set("X-Node-Secret", env.Token)
  69. c, _, err := websocket.DefaultDialer.Dial(u, header)
  70. if err != nil {
  71. return
  72. }
  73. defer c.Close()
  74. var nodeStat NodeStat
  75. go func() {
  76. // shutdown
  77. <-ctx.Done()
  78. _ = c.Close()
  79. }()
  80. for {
  81. _, message, err := c.ReadMessage()
  82. if err != nil || websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
  83. websocket.CloseNormalClosure) {
  84. return err
  85. }
  86. err = json.Unmarshal(message, &nodeStat)
  87. if err != nil {
  88. return err
  89. }
  90. // set online
  91. nodeStat.Status = true
  92. nodeStat.ResponseAt = time.Now()
  93. mutex.Lock()
  94. NodeMap[env.ID].NodeStat = nodeStat
  95. mutex.Unlock()
  96. }
  97. }