node_record.go 11 KB


  1. package analytic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/0xJacky/Nginx-UI/internal/helper"
  10. "github.com/0xJacky/Nginx-UI/model"
  11. "github.com/0xJacky/Nginx-UI/query"
  12. "github.com/gorilla/websocket"
  13. "github.com/uozi-tech/cosy/logger"
  14. )
  15. // nodeCache contains both slice and map for efficient access
  16. type nodeCache struct {
  17. Nodes []*model.Node // For iteration
  18. NodeMap map[uint64]*model.Node // For fast lookup by ID
  19. }
  // NodeRecordManager owns one background run of RetrieveNodesStatus and lets
  // callers start, stop and restart it.
  type NodeRecordManager struct {
  	// ctx is handed to RetrieveNodesStatus; cancel ends that run.
  	ctx    context.Context
  	cancel context.CancelFunc

  	// wg tracks the goroutine launched by Start so Stop can wait for it.
  	wg sync.WaitGroup
  	// mu serializes Start and Stop.
  	mu sync.Mutex
  }
  // RetryConfig parameterizes the backoff between connection attempts to a node.
  type RetryConfig struct {
  	BaseInterval    time.Duration // delay used for the first attempts
  	MaxInterval     time.Duration // upper bound on any computed delay
  	MaxRetries      int           // failure count at which shouldRetry switches to its capped mode
  	BackoffMultiple float64       // factor applied to the delay for each further failure
  }
  33. var defaultRetryConfig = RetryConfig{
  34. BaseInterval: 5 * time.Second,
  35. MaxInterval: 30 * time.Second,
  36. MaxRetries: 10,
  37. BackoffMultiple: 1.5,
  38. }
  // NodeRetryState is the per-node bookkeeping used to pace reconnect attempts.
  type NodeRetryState struct {
  	// FailureCount counts failed attempts since the last success; shouldRetry
  	// may lower it once the MaxRetries threshold is reached.
  	FailureCount int
  	// LastSuccess is when markConnectionSuccess last ran for the node, or when
  	// the state was created.
  	LastSuccess time.Time
  	// NextRetry is the earliest time another attempt is allowed.
  	NextRetry time.Time
  }
  44. var (
  45. retryStates = make(map[uint64]*NodeRetryState)
  46. retryMutex sync.Mutex
  47. )
  48. func getRetryState(nodeID uint64) *NodeRetryState {
  49. retryMutex.Lock()
  50. defer retryMutex.Unlock()
  51. if state, exists := retryStates[nodeID]; exists {
  52. return state
  53. }
  54. state := &NodeRetryState{LastSuccess: time.Now(), NextRetry: time.Now()}
  55. retryStates[nodeID] = state
  56. return state
  57. }
  58. // updateNodeStatus directly updates node status without condition checks
  59. func updateNodeStatus(nodeID uint64, status bool, reason string) {
  60. mutex.Lock()
  61. defer mutex.Unlock()
  62. now := time.Now()
  63. if NodeMap[nodeID] == nil {
  64. NodeMap[nodeID] = &Node{NodeStat: NodeStat{}}
  65. }
  66. NodeMap[nodeID].Status = status
  67. NodeMap[nodeID].ResponseAt = now
  68. }
  69. func calculateNextRetryInterval(failureCount int) time.Duration {
  70. if failureCount == 0 {
  71. return defaultRetryConfig.BaseInterval
  72. }
  73. interval := defaultRetryConfig.BaseInterval
  74. for i := 1; i < failureCount; i++ {
  75. interval = time.Duration(float64(interval) * defaultRetryConfig.BackoffMultiple)
  76. if interval > defaultRetryConfig.MaxInterval {
  77. return defaultRetryConfig.MaxInterval
  78. }
  79. }
  80. return interval
  81. }
  82. func shouldRetry(nodeID uint64) bool {
  83. state := getRetryState(nodeID)
  84. now := time.Now()
  85. if state.FailureCount >= defaultRetryConfig.MaxRetries {
  86. if now.Sub(state.LastSuccess) < 30*time.Second {
  87. state.FailureCount = 0
  88. state.NextRetry = now
  89. return true
  90. }
  91. if now.Before(state.NextRetry) {
  92. return false
  93. }
  94. state.FailureCount = defaultRetryConfig.MaxRetries / 2
  95. state.NextRetry = now
  96. return true
  97. }
  98. return !now.Before(state.NextRetry)
  99. }
  100. func markConnectionFailure(nodeID uint64, err error) {
  101. state := getRetryState(nodeID)
  102. state.FailureCount++
  103. state.NextRetry = time.Now().Add(calculateNextRetryInterval(state.FailureCount))
  104. updateNodeStatus(nodeID, false, "connection_failed")
  105. }
  106. func markConnectionSuccess(nodeID uint64) {
  107. state := getRetryState(nodeID)
  108. state.FailureCount = 0
  109. state.LastSuccess = time.Now()
  110. state.NextRetry = time.Now()
  111. updateNodeStatus(nodeID, true, "connection_success")
  112. }
  113. func logCurrentNodeStatus(prefix string) {
  114. mutex.Lock()
  115. defer mutex.Unlock()
  116. if NodeMap != nil {
  117. logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
  118. }
  119. }
  120. func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
  121. ctx, cancel := context.WithCancel(parentCtx)
  122. return &NodeRecordManager{ctx: ctx, cancel: cancel}
  123. }
  124. func (m *NodeRecordManager) Start() {
  125. m.mu.Lock()
  126. defer m.mu.Unlock()
  127. m.wg.Add(1)
  128. go func() {
  129. defer m.wg.Done()
  130. RetrieveNodesStatus(m.ctx)
  131. }()
  132. }
  133. func (m *NodeRecordManager) Stop() {
  134. m.mu.Lock()
  135. defer m.mu.Unlock()
  136. m.cancel()
  137. m.wg.Wait()
  138. }
  139. func (m *NodeRecordManager) Restart() {
  140. m.Stop()
  141. m.ctx, m.cancel = context.WithCancel(context.Background())
  142. m.Start()
  143. }
  144. var (
  145. defaultManager *NodeRecordManager
  146. restartMu sync.Mutex
  147. )
  148. func InitDefaultManager() {
  149. if defaultManager != nil {
  150. defaultManager.Stop()
  151. }
  152. defaultManager = NewNodeRecordManager(context.Background())
  153. defaultManager.Start()
  154. }
  155. func RestartRetrieveNodesStatus() {
  156. restartMu.Lock()
  157. defer restartMu.Unlock()
  158. if defaultManager == nil {
  159. InitDefaultManager()
  160. } else {
  161. defaultManager.Restart()
  162. }
  163. }
  164. func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
  165. manager := NewNodeRecordManager(ctx)
  166. manager.Start()
  167. return manager
  168. }
  169. func StartDefaultManager() {
  170. restartMu.Lock()
  171. defer restartMu.Unlock()
  172. if defaultManager != nil {
  173. defaultManager.Restart()
  174. } else {
  175. InitDefaultManager()
  176. }
  177. }
  178. func cleanupDisabledNodes(enabledEnvIDs []uint64) {
  179. enabledMap := make(map[uint64]bool)
  180. for _, id := range enabledEnvIDs {
  181. enabledMap[id] = true
  182. }
  183. retryMutex.Lock()
  184. for envID := range retryStates {
  185. if !enabledMap[envID] {
  186. delete(retryStates, envID)
  187. }
  188. }
  189. retryMutex.Unlock()
  190. mutex.Lock()
  191. for envID := range NodeMap {
  192. if !enabledMap[envID] {
  193. delete(NodeMap, envID)
  194. }
  195. }
  196. mutex.Unlock()
  197. }
  198. // getEnabledNodes retrieves enabled nodes from cache or database
  199. func getEnabledNodes() ([]*model.Node, error) {
  200. if cached, found := cache.GetCachedNodes(); found {
  201. if nc, ok := cached.(*nodeCache); ok {
  202. return nc.Nodes, nil
  203. }
  204. }
  205. nodeQuery := query.Node
  206. nodes, err := nodeQuery.Where(nodeQuery.Enabled.Is(true)).Find()
  207. if err != nil {
  208. logger.Error("Failed to query enabled nodes:", err)
  209. return nil, err
  210. }
  211. // Create cache with both slice and map
  212. nodeMap := make(map[uint64]*model.Node, len(nodes))
  213. for _, node := range nodes {
  214. nodeMap[node.ID] = node
  215. }
  216. nc := &nodeCache{
  217. Nodes: nodes,
  218. NodeMap: nodeMap,
  219. }
  220. cache.SetCachedNodes(nc)
  221. return nodes, nil
  222. }
  223. // isNodeEnabled checks if a node is enabled using cached map for O(1) lookup
  224. func isNodeEnabled(nodeID uint64) bool {
  225. if cached, found := cache.GetCachedNodes(); found {
  226. if nc, ok := cached.(*nodeCache); ok {
  227. _, exists := nc.NodeMap[nodeID]
  228. return exists
  229. }
  230. }
  231. // Fallback: load cache and check again
  232. _, err := getEnabledNodes()
  233. if err != nil {
  234. return false
  235. }
  236. if cached, found := cache.GetCachedNodes(); found {
  237. if nc, ok := cached.(*nodeCache); ok {
  238. _, exists := nc.NodeMap[nodeID]
  239. return exists
  240. }
  241. }
  242. return false
  243. }
  244. func RetrieveNodesStatus(ctx context.Context) {
  245. logger.Info("RetrieveNodesStatus start")
  246. defer logger.Info("RetrieveNodesStatus exited")
  247. mutex.Lock()
  248. if NodeMap == nil {
  249. NodeMap = make(TNodeMap)
  250. }
  251. mutex.Unlock()
  252. envCheckTicker := time.NewTicker(30 * time.Second)
  253. defer envCheckTicker.Stop()
  254. timeoutCheckTicker := time.NewTicker(10 * time.Second)
  255. defer timeoutCheckTicker.Stop()
  256. nodes, err := getEnabledNodes()
  257. if err != nil {
  258. logger.Error(err)
  259. return
  260. }
  261. var enabledNodeIDs []uint64
  262. for _, n := range nodes {
  263. enabledNodeIDs = append(enabledNodeIDs, n.ID)
  264. }
  265. cleanupDisabledNodes(enabledNodeIDs)
  266. var wg sync.WaitGroup
  267. defer wg.Wait()
  268. // Channel to signal when nodes list changes
  269. nodeUpdateChan := make(chan []uint64, 1)
  270. wg.Add(1)
  271. go func() {
  272. defer wg.Done()
  273. for {
  274. select {
  275. case <-ctx.Done():
  276. return
  277. case <-timeoutCheckTicker.C:
  278. checkNodeTimeouts(2 * time.Minute)
  279. case <-envCheckTicker.C:
  280. currentNodes, err := getEnabledNodes()
  281. if err != nil {
  282. logger.Error("Failed to re-query nodes:", err)
  283. continue
  284. }
  285. var currentEnabledIDs []uint64
  286. for _, n := range currentNodes {
  287. currentEnabledIDs = append(currentEnabledIDs, n.ID)
  288. }
  289. if !equalUint64Slices(enabledNodeIDs, currentEnabledIDs) {
  290. cleanupDisabledNodes(currentEnabledIDs)
  291. enabledNodeIDs = currentEnabledIDs
  292. select {
  293. case nodeUpdateChan <- currentEnabledIDs:
  294. default:
  295. }
  296. }
  297. }
  298. }
  299. }()
  300. for _, node := range nodes {
  301. wg.Add(1)
  302. go func(n *model.Node) {
  303. defer wg.Done()
  304. retryTicker := time.NewTicker(1 * time.Second)
  305. defer retryTicker.Stop()
  306. for {
  307. select {
  308. case <-ctx.Done():
  309. return
  310. case newEnabledIDs := <-nodeUpdateChan:
  311. found := false
  312. for _, id := range newEnabledIDs {
  313. if id == n.ID {
  314. found = true
  315. break
  316. }
  317. }
  318. if !found {
  319. return
  320. }
  321. case <-retryTicker.C:
  322. if !isNodeEnabled(n.ID) {
  323. retryMutex.Lock()
  324. delete(retryStates, n.ID)
  325. retryMutex.Unlock()
  326. return
  327. }
  328. if !shouldRetry(n.ID) {
  329. continue
  330. }
  331. if err := nodeAnalyticRecord(n, ctx); err != nil {
  332. if helper.IsUnexpectedWebsocketError(err) {
  333. logger.Error(err)
  334. }
  335. markConnectionFailure(n.ID, err)
  336. } else {
  337. markConnectionSuccess(n.ID)
  338. }
  339. }
  340. }
  341. }(node)
  342. }
  343. }
  344. func checkNodeTimeouts(timeout time.Duration) {
  345. mutex.Lock()
  346. defer mutex.Unlock()
  347. now := time.Now()
  348. for _, node := range NodeMap {
  349. if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
  350. node.Status = false
  351. node.ResponseAt = now
  352. }
  353. }
  354. }
  // equalUint64Slices reports whether a and b have the same length and the same
  // set of distinct values, ignoring order. Multiplicity is only checked through
  // the length, so {1, 1, 2} and {1, 2, 2} count as equal.
  func equalUint64Slices(a, b []uint64) bool {
  	if len(a) != len(b) {
  		return false
  	}

  	toSet := func(s []uint64) map[uint64]struct{} {
  		set := make(map[uint64]struct{}, len(s))
  		for _, v := range s {
  			set[v] = struct{}{}
  		}
  		return set
  	}
  	subsetOf := func(x, y map[uint64]struct{}) bool {
  		for k := range x {
  			if _, ok := y[k]; !ok {
  				return false
  			}
  		}
  		return true
  	}

  	setA, setB := toSet(a), toSet(b)
  	return subsetOf(setA, setB) && subsetOf(setB, setA)
  }
  382. func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
  383. scopeCtx, cancel := context.WithCancel(ctx)
  384. defer cancel()
  385. node, err := InitNode(nodeModel)
  386. if err != nil {
  387. mutex.Lock()
  388. if NodeMap[nodeModel.ID] == nil {
  389. NodeMap[nodeModel.ID] = &Node{
  390. Node: nodeModel,
  391. NodeStat: NodeStat{Status: false, ResponseAt: time.Now()},
  392. }
  393. } else {
  394. NodeMap[nodeModel.ID].Status = false
  395. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  396. }
  397. mutex.Unlock()
  398. return err
  399. }
  400. mutex.Lock()
  401. NodeMap[nodeModel.ID] = node
  402. mutex.Unlock()
  403. u, err := nodeModel.GetWebSocketURL("/api/analytic/intro")
  404. if err != nil {
  405. return err
  406. }
  407. header := http.Header{}
  408. header.Set("X-Node-Secret", nodeModel.Token)
  409. dial := &websocket.Dialer{
  410. Proxy: http.ProxyFromEnvironment,
  411. HandshakeTimeout: 5 * time.Second,
  412. }
  413. c, _, err := dial.Dial(u, header)
  414. if err != nil {
  415. updateNodeStatus(nodeModel.ID, false, "websocket_dial_failed")
  416. return err
  417. }
  418. defer func() {
  419. c.Close()
  420. updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
  421. }()
  422. go func() {
  423. select {
  424. case <-scopeCtx.Done():
  425. _ = c.Close()
  426. case <-ctx.Done():
  427. _ = c.Close()
  428. }
  429. }()
  430. for {
  431. select {
  432. case <-scopeCtx.Done():
  433. return ctx.Err()
  434. case <-ctx.Done():
  435. return ctx.Err()
  436. default:
  437. }
  438. var rawMsg json.RawMessage
  439. err = c.ReadJSON(&rawMsg)
  440. if err != nil {
  441. if helper.IsUnexpectedWebsocketError(err) {
  442. updateNodeStatus(nodeModel.ID, false, "websocket_error")
  443. return err
  444. }
  445. return nil
  446. }
  447. mutex.Lock()
  448. if NodeMap[nodeModel.ID] == nil {
  449. NodeMap[nodeModel.ID] = &Node{
  450. Node: nodeModel,
  451. NodeStat: NodeStat{Status: true, ResponseAt: time.Now()},
  452. }
  453. } else {
  454. var fullNode Node
  455. if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
  456. NodeMap[nodeModel.ID].NodeInfo = fullNode.NodeInfo
  457. NodeMap[nodeModel.ID].NodeStat = fullNode.NodeStat
  458. } else {
  459. var nodeStat NodeStat
  460. if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
  461. NodeMap[nodeModel.ID].NodeStat = nodeStat
  462. }
  463. }
  464. NodeMap[nodeModel.ID].Status = true
  465. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  466. }
  467. mutex.Unlock()
  468. }
  469. }