node_record.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. package analytic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/cache"
  9. "github.com/0xJacky/Nginx-UI/internal/helper"
  10. "github.com/0xJacky/Nginx-UI/model"
  11. "github.com/0xJacky/Nginx-UI/query"
  12. "github.com/gorilla/websocket"
  13. "github.com/uozi-tech/cosy/logger"
  14. )
  15. // nodeCache contains both slice and map for efficient access
  16. type nodeCache struct {
  17. Nodes []*model.Node // For iteration
  18. NodeMap map[uint64]*model.Node // For fast lookup by ID
  19. }
  // NodeRecordManager owns the lifecycle of one node-status retrieval run:
  // it can start the background worker, cancel it, and wait for it to finish.
  type NodeRecordManager struct {
  	// mu serializes Start and Stop.
  	mu sync.Mutex
  	// wg tracks the background worker launched by Start.
  	wg sync.WaitGroup
  	// ctx is handed to the worker; cancel cancels it.
  	ctx    context.Context
  	cancel context.CancelFunc
  }
  // RetryConfig describes the back-off policy used when reconnecting to a node.
  type RetryConfig struct {
  	// BaseInterval is the delay used for the first retry.
  	BaseInterval time.Duration
  	// MaxInterval is the upper bound for the computed delay.
  	MaxInterval time.Duration
  	// MaxRetries is the failure count at which the retry logic switches to
  	// its "too many failures" handling.
  	MaxRetries int
  	// BackoffMultiple is the factor applied to the delay per extra failure.
  	BackoffMultiple float64
  }
  33. var defaultRetryConfig = RetryConfig{
  34. BaseInterval: 5 * time.Second,
  35. MaxInterval: 30 * time.Second,
  36. MaxRetries: 10,
  37. BackoffMultiple: 1.5,
  38. }
  // NodeRetryState is the per-node bookkeeping for connection retries.
  type NodeRetryState struct {
  	// FailureCount is the number of failures recorded since the last reset.
  	FailureCount int
  	// LastSuccess is when the node was last marked as successfully connected.
  	LastSuccess time.Time
  	// NextRetry is the earliest time at which another attempt is allowed.
  	NextRetry time.Time
  }
  44. var (
  45. retryStates = make(map[uint64]*NodeRetryState)
  46. retryMutex sync.Mutex
  47. )
  48. func getRetryState(nodeID uint64) *NodeRetryState {
  49. retryMutex.Lock()
  50. defer retryMutex.Unlock()
  51. if state, exists := retryStates[nodeID]; exists {
  52. return state
  53. }
  54. state := &NodeRetryState{LastSuccess: time.Now(), NextRetry: time.Now()}
  55. retryStates[nodeID] = state
  56. return state
  57. }
  58. // updateNodeStatus directly updates node status without condition checks
  59. func updateNodeStatus(nodeID uint64, status bool, reason string) {
  60. mutex.Lock()
  61. defer mutex.Unlock()
  62. now := time.Now()
  63. if NodeMap[nodeID] == nil {
  64. NodeMap[nodeID] = &Node{NodeStat: NodeStat{}}
  65. }
  66. NodeMap[nodeID].Status = status
  67. NodeMap[nodeID].ResponseAt = now
  68. }
  69. func calculateNextRetryInterval(failureCount int) time.Duration {
  70. if failureCount == 0 {
  71. return defaultRetryConfig.BaseInterval
  72. }
  73. interval := defaultRetryConfig.BaseInterval
  74. for i := 1; i < failureCount; i++ {
  75. interval = time.Duration(float64(interval) * defaultRetryConfig.BackoffMultiple)
  76. if interval > defaultRetryConfig.MaxInterval {
  77. return defaultRetryConfig.MaxInterval
  78. }
  79. }
  80. return interval
  81. }
  82. func shouldRetry(nodeID uint64) bool {
  83. state := getRetryState(nodeID)
  84. now := time.Now()
  85. if state.FailureCount >= defaultRetryConfig.MaxRetries {
  86. if now.Sub(state.LastSuccess) < 30*time.Second {
  87. state.FailureCount = 0
  88. state.NextRetry = now
  89. return true
  90. }
  91. if now.Before(state.NextRetry) {
  92. return false
  93. }
  94. state.FailureCount = defaultRetryConfig.MaxRetries / 2
  95. state.NextRetry = now
  96. return true
  97. }
  98. return !now.Before(state.NextRetry)
  99. }
  100. func markConnectionFailure(nodeID uint64, err error) {
  101. state := getRetryState(nodeID)
  102. state.FailureCount++
  103. state.NextRetry = time.Now().Add(calculateNextRetryInterval(state.FailureCount))
  104. updateNodeStatus(nodeID, false, "connection_failed")
  105. }
  106. func markConnectionSuccess(nodeID uint64) {
  107. state := getRetryState(nodeID)
  108. state.FailureCount = 0
  109. state.LastSuccess = time.Now()
  110. state.NextRetry = time.Now()
  111. updateNodeStatus(nodeID, true, "connection_success")
  112. }
  113. func logCurrentNodeStatus(prefix string) {
  114. mutex.Lock()
  115. defer mutex.Unlock()
  116. if NodeMap != nil {
  117. logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
  118. }
  119. }
  120. func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
  121. ctx, cancel := context.WithCancel(parentCtx)
  122. return &NodeRecordManager{ctx: ctx, cancel: cancel}
  123. }
  124. func (m *NodeRecordManager) Start() {
  125. m.mu.Lock()
  126. defer m.mu.Unlock()
  127. m.wg.Add(1)
  128. go func() {
  129. defer m.wg.Done()
  130. RetrieveNodesStatus(m.ctx)
  131. }()
  132. }
  133. func (m *NodeRecordManager) Stop() {
  134. m.mu.Lock()
  135. defer m.mu.Unlock()
  136. m.cancel()
  137. m.wg.Wait()
  138. }
  139. func (m *NodeRecordManager) Restart() {
  140. m.Stop()
  141. m.ctx, m.cancel = context.WithCancel(context.Background())
  142. m.Start()
  143. }
  144. var (
  145. defaultManager *NodeRecordManager
  146. restartMu sync.Mutex
  147. )
  148. func InitDefaultManager() {
  149. if defaultManager != nil {
  150. defaultManager.Stop()
  151. }
  152. defaultManager = NewNodeRecordManager(context.Background())
  153. defaultManager.Start()
  154. }
  155. func RestartRetrieveNodesStatus() {
  156. restartMu.Lock()
  157. defer restartMu.Unlock()
  158. if defaultManager == nil {
  159. InitDefaultManager()
  160. } else {
  161. defaultManager.Restart()
  162. }
  163. }
  164. func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
  165. manager := NewNodeRecordManager(ctx)
  166. manager.Start()
  167. return manager
  168. }
  169. func StartDefaultManager() {
  170. restartMu.Lock()
  171. defer restartMu.Unlock()
  172. if defaultManager != nil {
  173. defaultManager.Restart()
  174. } else {
  175. InitDefaultManager()
  176. }
  177. }
  178. func cleanupDisabledNodes(enabledEnvIDs []uint64) {
  179. enabledMap := make(map[uint64]bool)
  180. for _, id := range enabledEnvIDs {
  181. enabledMap[id] = true
  182. }
  183. retryMutex.Lock()
  184. for envID := range retryStates {
  185. if !enabledMap[envID] {
  186. delete(retryStates, envID)
  187. }
  188. }
  189. retryMutex.Unlock()
  190. mutex.Lock()
  191. for envID := range NodeMap {
  192. if !enabledMap[envID] {
  193. delete(NodeMap, envID)
  194. }
  195. }
  196. mutex.Unlock()
  197. }
  198. // getEnabledNodes retrieves enabled nodes from cache or database
  199. func getEnabledNodes() ([]*model.Node, error) {
  200. if cached, found := cache.GetCachedNodes(); found {
  201. if nc, ok := cached.(*nodeCache); ok {
  202. return nc.Nodes, nil
  203. }
  204. }
  205. nodeQuery := query.Node
  206. nodes, err := nodeQuery.Where(nodeQuery.Enabled.Is(true)).Find()
  207. if err != nil {
  208. logger.Error("Failed to query enabled nodes:", err)
  209. return nil, err
  210. }
  211. // Create cache with both slice and map
  212. nodeMap := make(map[uint64]*model.Node, len(nodes))
  213. for _, node := range nodes {
  214. nodeMap[node.ID] = node
  215. }
  216. nc := &nodeCache{
  217. Nodes: nodes,
  218. NodeMap: nodeMap,
  219. }
  220. cache.SetCachedNodes(nc)
  221. logger.Debug("Queried and cached %d enabled nodes", len(nodes))
  222. return nodes, nil
  223. }
  224. // isNodeEnabled checks if a node is enabled using cached map for O(1) lookup
  225. func isNodeEnabled(nodeID uint64) bool {
  226. if cached, found := cache.GetCachedNodes(); found {
  227. if nc, ok := cached.(*nodeCache); ok {
  228. _, exists := nc.NodeMap[nodeID]
  229. return exists
  230. }
  231. }
  232. // Fallback: load cache and check again
  233. _, err := getEnabledNodes()
  234. if err != nil {
  235. return false
  236. }
  237. if cached, found := cache.GetCachedNodes(); found {
  238. if nc, ok := cached.(*nodeCache); ok {
  239. _, exists := nc.NodeMap[nodeID]
  240. return exists
  241. }
  242. }
  243. return false
  244. }
  245. func RetrieveNodesStatus(ctx context.Context) {
  246. logger.Info("RetrieveNodesStatus start")
  247. defer logger.Info("RetrieveNodesStatus exited")
  248. mutex.Lock()
  249. if NodeMap == nil {
  250. NodeMap = make(TNodeMap)
  251. }
  252. mutex.Unlock()
  253. envCheckTicker := time.NewTicker(30 * time.Second)
  254. defer envCheckTicker.Stop()
  255. timeoutCheckTicker := time.NewTicker(10 * time.Second)
  256. defer timeoutCheckTicker.Stop()
  257. nodes, err := getEnabledNodes()
  258. if err != nil {
  259. logger.Error(err)
  260. return
  261. }
  262. var enabledNodeIDs []uint64
  263. for _, n := range nodes {
  264. enabledNodeIDs = append(enabledNodeIDs, n.ID)
  265. }
  266. cleanupDisabledNodes(enabledNodeIDs)
  267. var wg sync.WaitGroup
  268. defer wg.Wait()
  269. // Channel to signal when nodes list changes
  270. nodeUpdateChan := make(chan []uint64, 1)
  271. wg.Add(1)
  272. go func() {
  273. defer wg.Done()
  274. for {
  275. select {
  276. case <-ctx.Done():
  277. return
  278. case <-timeoutCheckTicker.C:
  279. checkNodeTimeouts(2 * time.Minute)
  280. case <-envCheckTicker.C:
  281. currentNodes, err := getEnabledNodes()
  282. if err != nil {
  283. logger.Error("Failed to re-query nodes:", err)
  284. continue
  285. }
  286. var currentEnabledIDs []uint64
  287. for _, n := range currentNodes {
  288. currentEnabledIDs = append(currentEnabledIDs, n.ID)
  289. }
  290. if !equalUint64Slices(enabledNodeIDs, currentEnabledIDs) {
  291. cleanupDisabledNodes(currentEnabledIDs)
  292. enabledNodeIDs = currentEnabledIDs
  293. select {
  294. case nodeUpdateChan <- currentEnabledIDs:
  295. default:
  296. }
  297. }
  298. }
  299. }
  300. }()
  301. for _, node := range nodes {
  302. wg.Add(1)
  303. go func(n *model.Node) {
  304. defer wg.Done()
  305. retryTicker := time.NewTicker(1 * time.Second)
  306. defer retryTicker.Stop()
  307. for {
  308. select {
  309. case <-ctx.Done():
  310. return
  311. case newEnabledIDs := <-nodeUpdateChan:
  312. found := false
  313. for _, id := range newEnabledIDs {
  314. if id == n.ID {
  315. found = true
  316. break
  317. }
  318. }
  319. if !found {
  320. return
  321. }
  322. case <-retryTicker.C:
  323. if !isNodeEnabled(n.ID) {
  324. retryMutex.Lock()
  325. delete(retryStates, n.ID)
  326. retryMutex.Unlock()
  327. return
  328. }
  329. if !shouldRetry(n.ID) {
  330. continue
  331. }
  332. if err := nodeAnalyticRecord(n, ctx); err != nil {
  333. if helper.IsUnexpectedWebsocketError(err) {
  334. logger.Error(err)
  335. }
  336. markConnectionFailure(n.ID, err)
  337. } else {
  338. markConnectionSuccess(n.ID)
  339. }
  340. }
  341. }
  342. }(node)
  343. }
  344. }
  345. func checkNodeTimeouts(timeout time.Duration) {
  346. mutex.Lock()
  347. defer mutex.Unlock()
  348. now := time.Now()
  349. for _, node := range NodeMap {
  350. if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
  351. node.Status = false
  352. node.ResponseAt = now
  353. }
  354. }
  355. }
  // equalUint64Slices reports whether a and b contain the same values with the
  // same multiplicities, regardless of order. Nil and empty slices are equal.
  func equalUint64Slices(a, b []uint64) bool {
  	if len(a) != len(b) {
  		return false
  	}

  	// Count occurrences in a, then consume them with b. Because the lengths
  	// match, b exhausts the counts exactly when the two multisets are equal.
  	counts := make(map[uint64]int, len(a))
  	for _, v := range a {
  		counts[v]++
  	}
  	for _, v := range b {
  		if counts[v] == 0 {
  			return false
  		}
  		counts[v]--
  	}
  	return true
  }
  383. func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
  384. scopeCtx, cancel := context.WithCancel(ctx)
  385. defer cancel()
  386. node, err := InitNode(nodeModel)
  387. if err != nil {
  388. mutex.Lock()
  389. if NodeMap[nodeModel.ID] == nil {
  390. NodeMap[nodeModel.ID] = &Node{
  391. Node: nodeModel,
  392. NodeStat: NodeStat{Status: false, ResponseAt: time.Now()},
  393. }
  394. } else {
  395. NodeMap[nodeModel.ID].Status = false
  396. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  397. }
  398. mutex.Unlock()
  399. return err
  400. }
  401. mutex.Lock()
  402. NodeMap[nodeModel.ID] = node
  403. mutex.Unlock()
  404. u, err := nodeModel.GetWebSocketURL("/api/analytic/intro")
  405. if err != nil {
  406. return err
  407. }
  408. header := http.Header{}
  409. header.Set("X-Node-Secret", nodeModel.Token)
  410. dial := &websocket.Dialer{
  411. Proxy: http.ProxyFromEnvironment,
  412. HandshakeTimeout: 5 * time.Second,
  413. }
  414. c, _, err := dial.Dial(u, header)
  415. if err != nil {
  416. updateNodeStatus(nodeModel.ID, false, "websocket_dial_failed")
  417. return err
  418. }
  419. defer func() {
  420. c.Close()
  421. updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
  422. }()
  423. go func() {
  424. select {
  425. case <-scopeCtx.Done():
  426. _ = c.Close()
  427. case <-ctx.Done():
  428. _ = c.Close()
  429. }
  430. }()
  431. for {
  432. select {
  433. case <-scopeCtx.Done():
  434. return ctx.Err()
  435. case <-ctx.Done():
  436. return ctx.Err()
  437. default:
  438. }
  439. var rawMsg json.RawMessage
  440. err = c.ReadJSON(&rawMsg)
  441. if err != nil {
  442. if helper.IsUnexpectedWebsocketError(err) {
  443. updateNodeStatus(nodeModel.ID, false, "websocket_error")
  444. return err
  445. }
  446. return nil
  447. }
  448. mutex.Lock()
  449. if NodeMap[nodeModel.ID] == nil {
  450. NodeMap[nodeModel.ID] = &Node{
  451. Node: nodeModel,
  452. NodeStat: NodeStat{Status: true, ResponseAt: time.Now()},
  453. }
  454. } else {
  455. var fullNode Node
  456. if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
  457. NodeMap[nodeModel.ID].NodeInfo = fullNode.NodeInfo
  458. NodeMap[nodeModel.ID].NodeStat = fullNode.NodeStat
  459. } else {
  460. var nodeStat NodeStat
  461. if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
  462. NodeMap[nodeModel.ID].NodeStat = nodeStat
  463. }
  464. }
  465. NodeMap[nodeModel.ID].Status = true
  466. NodeMap[nodeModel.ID].ResponseAt = time.Now()
  467. }
  468. mutex.Unlock()
  469. }
  470. }