node_record.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. package analytic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/helper"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/0xJacky/Nginx-UI/query"
  11. "github.com/gorilla/websocket"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
// NodeRecordManager manages the node status retrieval process.
// It owns the cancellable context handed to RetrieveNodesStatus and tracks the
// background goroutine so that Stop can wait for it to finish.
type NodeRecordManager struct {
	ctx    context.Context    // context passed to RetrieveNodesStatus by Start
	cancel context.CancelFunc // cancels ctx; invoked by Stop
	wg     sync.WaitGroup     // tracks the goroutine launched by Start
	mu     sync.Mutex         // held by Start and Stop for their whole duration
}
// RetryConfig holds the tunables of the reconnect backoff used when a node's
// connection attempt fails.
type RetryConfig struct {
	BaseInterval    time.Duration // delay before the first retry
	MaxInterval     time.Duration // upper bound for the computed retry delay
	MaxRetries      int           // failure count at which shouldRetry switches to its capped handling
	BackoffMultiple float64       // factor applied to the delay for each additional failure
}
// defaultRetryConfig is the backoff configuration read by
// calculateNextRetryInterval and shouldRetry: start at 5s, grow by 1.5x per
// failure, cap at 30s, and treat 10 consecutive failures as the retry limit.
var defaultRetryConfig = RetryConfig{
	BaseInterval:    5 * time.Second,
	MaxInterval:     30 * time.Second,
	MaxRetries:      10,
	BackoffMultiple: 1.5,
}
// NodeRetryState is the per-node retry bookkeeping stored in retryStates.
type NodeRetryState struct {
	FailureCount int       // consecutive failures; reset to 0 by markConnectionSuccess
	LastSuccess  time.Time // time of the last success (or of state creation)
	NextRetry    time.Time // earliest time at which shouldRetry allows another attempt
}
  38. var (
  39. retryStates = make(map[uint64]*NodeRetryState)
  40. retryMutex sync.Mutex
  41. )
  42. func getRetryState(nodeID uint64) *NodeRetryState {
  43. retryMutex.Lock()
  44. defer retryMutex.Unlock()
  45. if state, exists := retryStates[nodeID]; exists {
  46. return state
  47. }
  48. state := &NodeRetryState{LastSuccess: time.Now(), NextRetry: time.Now()}
  49. retryStates[nodeID] = state
  50. return state
  51. }
  52. // updateNodeStatus directly updates node status without condition checks
  53. func updateNodeStatus(nodeID uint64, status bool, reason string) {
  54. mutex.Lock()
  55. defer mutex.Unlock()
  56. now := time.Now()
  57. if NodeMap[nodeID] == nil {
  58. NodeMap[nodeID] = &Node{NodeStat: NodeStat{}}
  59. }
  60. NodeMap[nodeID].Status = status
  61. NodeMap[nodeID].ResponseAt = now
  62. }
  63. func calculateNextRetryInterval(failureCount int) time.Duration {
  64. if failureCount == 0 {
  65. return defaultRetryConfig.BaseInterval
  66. }
  67. interval := defaultRetryConfig.BaseInterval
  68. for i := 1; i < failureCount; i++ {
  69. interval = time.Duration(float64(interval) * defaultRetryConfig.BackoffMultiple)
  70. if interval > defaultRetryConfig.MaxInterval {
  71. return defaultRetryConfig.MaxInterval
  72. }
  73. }
  74. return interval
  75. }
  76. func shouldRetry(nodeID uint64) bool {
  77. state := getRetryState(nodeID)
  78. now := time.Now()
  79. if state.FailureCount >= defaultRetryConfig.MaxRetries {
  80. if now.Sub(state.LastSuccess) < 30*time.Second {
  81. state.FailureCount = 0
  82. state.NextRetry = now
  83. return true
  84. }
  85. if now.Before(state.NextRetry) {
  86. return false
  87. }
  88. state.FailureCount = defaultRetryConfig.MaxRetries / 2
  89. state.NextRetry = now
  90. return true
  91. }
  92. return !now.Before(state.NextRetry)
  93. }
  94. func markConnectionFailure(nodeID uint64, err error) {
  95. state := getRetryState(nodeID)
  96. state.FailureCount++
  97. state.NextRetry = time.Now().Add(calculateNextRetryInterval(state.FailureCount))
  98. updateNodeStatus(nodeID, false, "connection_failed")
  99. }
  100. func markConnectionSuccess(nodeID uint64) {
  101. state := getRetryState(nodeID)
  102. state.FailureCount = 0
  103. state.LastSuccess = time.Now()
  104. state.NextRetry = time.Now()
  105. updateNodeStatus(nodeID, true, "connection_success")
  106. }
  107. func logCurrentNodeStatus(prefix string) {
  108. mutex.Lock()
  109. defer mutex.Unlock()
  110. if NodeMap != nil {
  111. logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
  112. }
  113. }
  114. func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
  115. ctx, cancel := context.WithCancel(parentCtx)
  116. return &NodeRecordManager{ctx: ctx, cancel: cancel}
  117. }
  118. func (m *NodeRecordManager) Start() {
  119. m.mu.Lock()
  120. defer m.mu.Unlock()
  121. m.wg.Add(1)
  122. go func() {
  123. defer m.wg.Done()
  124. RetrieveNodesStatus(m.ctx)
  125. }()
  126. }
// Stop cancels the manager's context and blocks until every goroutine
// registered by Start has returned. m.mu is held for the entire wait.
func (m *NodeRecordManager) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cancel() // ends RetrieveNodesStatus through its context
	m.wg.Wait()
}
  133. func (m *NodeRecordManager) Restart() {
  134. m.Stop()
  135. m.ctx, m.cancel = context.WithCancel(context.Background())
  136. m.Start()
  137. }
  138. var (
  139. defaultManager *NodeRecordManager
  140. restartMu sync.Mutex
  141. )
  142. func InitDefaultManager() {
  143. if defaultManager != nil {
  144. defaultManager.Stop()
  145. }
  146. defaultManager = NewNodeRecordManager(context.Background())
  147. defaultManager.Start()
  148. }
  149. func RestartRetrieveNodesStatus() {
  150. restartMu.Lock()
  151. defer restartMu.Unlock()
  152. if defaultManager == nil {
  153. InitDefaultManager()
  154. } else {
  155. defaultManager.Restart()
  156. }
  157. }
  158. func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
  159. manager := NewNodeRecordManager(ctx)
  160. manager.Start()
  161. return manager
  162. }
  163. func StartDefaultManager() {
  164. restartMu.Lock()
  165. defer restartMu.Unlock()
  166. if defaultManager != nil {
  167. defaultManager.Restart()
  168. } else {
  169. InitDefaultManager()
  170. }
  171. }
  172. func cleanupDisabledNodes(enabledEnvIDs []uint64) {
  173. enabledMap := make(map[uint64]bool)
  174. for _, id := range enabledEnvIDs {
  175. enabledMap[id] = true
  176. }
  177. retryMutex.Lock()
  178. for envID := range retryStates {
  179. if !enabledMap[envID] {
  180. delete(retryStates, envID)
  181. }
  182. }
  183. retryMutex.Unlock()
  184. mutex.Lock()
  185. for envID := range NodeMap {
  186. if !enabledMap[envID] {
  187. delete(NodeMap, envID)
  188. }
  189. }
  190. mutex.Unlock()
  191. }
  192. func checkNodeStillEnabled(nodeID uint64) bool {
  193. nodeQuery := query.Node
  194. node, err := nodeQuery.Where(nodeQuery.ID.Eq(nodeID), nodeQuery.Enabled.Is(true)).First()
  195. return err == nil && node != nil
  196. }
// RetrieveNodesStatus loads the enabled nodes once, then keeps their status
// up to date until ctx is cancelled.
//
// It starts one supervisor goroutine (periodic timeout checks and periodic
// re-query of the enabled node list) and one goroutine per node found by the
// initial query. The function itself returns right after spawning them, but
// the deferred wg.Wait() makes it block until all of them have exited.
//
// If the initial query fails, the error is logged and the function returns
// without starting any goroutine.
//
// NOTE(review): per-node goroutines are only created for the nodes returned by
// the initial query; the periodic re-query does not start new ones.
func RetrieveNodesStatus(ctx context.Context) {
	logger.Info("RetrieveNodesStatus start")
	defer logger.Info("RetrieveNodesStatus exited")

	// Lazily create the shared node map.
	mutex.Lock()
	if NodeMap == nil {
		NodeMap = make(TNodeMap)
	}
	mutex.Unlock()

	// Re-query the enabled node list every 30s.
	envCheckTicker := time.NewTicker(30 * time.Second)
	defer envCheckTicker.Stop()

	// Look for nodes that stopped reporting every 10s.
	timeoutCheckTicker := time.NewTicker(10 * time.Second)
	defer timeoutCheckTicker.Stop()

	nodeQuery := query.Node
	nodes, err := nodeQuery.Where(nodeQuery.Enabled.Is(true)).Find()
	if err != nil {
		logger.Error(err)
		return
	}

	var enabledNodeIDs []uint64
	for _, n := range nodes {
		enabledNodeIDs = append(enabledNodeIDs, n.ID)
	}

	// Drop retry state and NodeMap entries of nodes that are no longer enabled.
	cleanupDisabledNodes(enabledNodeIDs)

	// Deferred after the ticker Stops, so it runs first: the tickers stay
	// alive until every goroutine below has returned.
	var wg sync.WaitGroup
	defer wg.Wait()

	// Carries the new list of enabled node IDs whenever it changes.
	// NOTE(review): capacity is 1 and the send below is non-blocking, so an
	// update is dropped while a previous one is still queued, and each queued
	// value is received by only one of the per-node goroutines.
	nodeUpdateChan := make(chan []uint64, 1)

	// Supervisor goroutine: timeout checks and enabled-list refresh.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timeoutCheckTicker.C:
				// Mark nodes offline when they have not responded for 2 minutes.
				checkNodeTimeouts(2 * time.Minute)
			case <-envCheckTicker.C:
				currentNodes, err := nodeQuery.Where(nodeQuery.Enabled.Is(true)).Find()
				if err != nil {
					logger.Error("Failed to re-query nodes:", err)
					continue
				}
				var currentEnabledIDs []uint64
				for _, n := range currentNodes {
					currentEnabledIDs = append(currentEnabledIDs, n.ID)
				}
				if !equalUint64Slices(enabledNodeIDs, currentEnabledIDs) {
					cleanupDisabledNodes(currentEnabledIDs)
					// enabledNodeIDs is only touched by this goroutine from here on.
					enabledNodeIDs = currentEnabledIDs
					// Non-blocking notify; skipped when the buffer is full.
					select {
					case nodeUpdateChan <- currentEnabledIDs:
					default:
					}
				}
			}
		}
	}()

	// One worker goroutine per node from the initial query.
	for _, node := range nodes {
		wg.Add(1)
		go func(n *model.Node) {
			defer wg.Done()
			// Wake up every second; shouldRetry decides whether an attempt is due.
			retryTicker := time.NewTicker(1 * time.Second)
			defer retryTicker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case newEnabledIDs := <-nodeUpdateChan:
					// Exit when this node is no longer in the enabled list.
					found := false
					for _, id := range newEnabledIDs {
						if id == n.ID {
							found = true
							break
						}
					}
					if !found {
						return
					}
				case <-retryTicker.C:
					// Queries the database on every tick.
					if !checkNodeStillEnabled(n.ID) {
						retryMutex.Lock()
						delete(retryStates, n.ID)
						retryMutex.Unlock()
						return
					}
					if !shouldRetry(n.ID) {
						continue
					}
					// Blocks for as long as the node's connection stays open.
					if err := nodeAnalyticRecord(n, ctx); err != nil {
						logger.Error(err)
						markConnectionFailure(n.ID, err)
					} else {
						markConnectionSuccess(n.ID)
					}
				}
			}
		}(node)
	}
}
  296. func checkNodeTimeouts(timeout time.Duration) {
  297. mutex.Lock()
  298. defer mutex.Unlock()
  299. now := time.Now()
  300. for _, node := range NodeMap {
  301. if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
  302. node.Status = false
  303. node.ResponseAt = now
  304. }
  305. }
  306. }
  307. // equalUint64Slices compares two uint64 slices for equality
  308. func equalUint64Slices(a, b []uint64) bool {
  309. if len(a) != len(b) {
  310. return false
  311. }
  312. // Create maps for comparison
  313. mapA := make(map[uint64]bool)
  314. mapB := make(map[uint64]bool)
  315. for _, v := range a {
  316. mapA[v] = true
  317. }
  318. for _, v := range b {
  319. mapB[v] = true
  320. }
  321. // Compare maps
  322. for k := range mapA {
  323. if !mapB[k] {
  324. return false
  325. }
  326. }
  327. for k := range mapB {
  328. if !mapA[k] {
  329. return false
  330. }
  331. }
  332. return true
  333. }
  334. func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
  335. scopeCtx, cancel := context.WithCancel(ctx)
  336. defer cancel()
  337. node, err := InitNode(nodeModel)
  338. if err != nil {
  339. mutex.Lock()
  340. if NodeMap[nodeModel.ID] == nil {
  341. NodeMap[nodeModel.ID] = &Node{
  342. Node: nodeModel,
  343. NodeStat: NodeStat{Status: false, ResponseAt: time.Now()},
  344. }
  345. } else {
  346. NodeMap[nodeModel.ID].Status = false
  347. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  348. }
  349. mutex.Unlock()
  350. return err
  351. }
  352. mutex.Lock()
  353. NodeMap[nodeModel.ID] = node
  354. mutex.Unlock()
  355. u, err := nodeModel.GetWebSocketURL("/api/analytic/intro")
  356. if err != nil {
  357. return err
  358. }
  359. header := http.Header{}
  360. header.Set("X-Node-Secret", nodeModel.Token)
  361. dial := &websocket.Dialer{
  362. Proxy: http.ProxyFromEnvironment,
  363. HandshakeTimeout: 5 * time.Second,
  364. }
  365. c, _, err := dial.Dial(u, header)
  366. if err != nil {
  367. updateNodeStatus(nodeModel.ID, false, "websocket_dial_failed")
  368. return err
  369. }
  370. defer func() {
  371. c.Close()
  372. updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
  373. }()
  374. go func() {
  375. select {
  376. case <-scopeCtx.Done():
  377. _ = c.Close()
  378. case <-ctx.Done():
  379. _ = c.Close()
  380. }
  381. }()
  382. for {
  383. select {
  384. case <-scopeCtx.Done():
  385. return ctx.Err()
  386. case <-ctx.Done():
  387. return ctx.Err()
  388. default:
  389. }
  390. var rawMsg json.RawMessage
  391. err = c.ReadJSON(&rawMsg)
  392. if err != nil {
  393. if helper.IsUnexpectedWebsocketError(err) {
  394. updateNodeStatus(nodeModel.ID, false, "websocket_error")
  395. return err
  396. }
  397. return nil
  398. }
  399. mutex.Lock()
  400. if NodeMap[nodeModel.ID] == nil {
  401. NodeMap[nodeModel.ID] = &Node{
  402. Node: nodeModel,
  403. NodeStat: NodeStat{Status: true, ResponseAt: time.Now()},
  404. }
  405. } else {
  406. var fullNode Node
  407. if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
  408. NodeMap[nodeModel.ID].NodeInfo = fullNode.NodeInfo
  409. NodeMap[nodeModel.ID].NodeStat = fullNode.NodeStat
  410. } else {
  411. var nodeStat NodeStat
  412. if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
  413. NodeMap[nodeModel.ID].NodeStat = nodeStat
  414. }
  415. }
  416. NodeMap[nodeModel.ID].Status = true
  417. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  418. }
  419. mutex.Unlock()
  420. }
  421. }