node_record.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. package analytic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/0xJacky/Nginx-UI/internal/helper"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/0xJacky/Nginx-UI/query"
  11. "github.com/gorilla/websocket"
  12. "github.com/uozi-tech/cosy/logger"
  13. )
  14. // NodeRecordManager manages the node status retrieval process
  15. type NodeRecordManager struct {
  16. ctx context.Context
  17. cancel context.CancelFunc
  18. wg sync.WaitGroup
  19. mu sync.Mutex
  20. }
  21. type RetryConfig struct {
  22. BaseInterval time.Duration
  23. MaxInterval time.Duration
  24. MaxRetries int
  25. BackoffMultiple float64
  26. }
  27. var defaultRetryConfig = RetryConfig{
  28. BaseInterval: 5 * time.Second,
  29. MaxInterval: 5 * time.Minute,
  30. MaxRetries: 10,
  31. BackoffMultiple: 1.5,
  32. }
  33. type NodeRetryState struct {
  34. FailureCount int
  35. LastSuccess time.Time
  36. NextRetry time.Time
  37. }
  38. var (
  39. retryStates = make(map[uint64]*NodeRetryState)
  40. retryMutex sync.Mutex
  41. )
  42. func getRetryState(envID uint64) *NodeRetryState {
  43. retryMutex.Lock()
  44. defer retryMutex.Unlock()
  45. if state, exists := retryStates[envID]; exists {
  46. return state
  47. }
  48. state := &NodeRetryState{LastSuccess: time.Now(), NextRetry: time.Now()}
  49. retryStates[envID] = state
  50. return state
  51. }
  52. // updateNodeStatus directly updates node status without condition checks
  53. func updateNodeStatus(envID uint64, status bool, reason string) {
  54. mutex.Lock()
  55. defer mutex.Unlock()
  56. now := time.Now()
  57. if NodeMap[envID] == nil {
  58. NodeMap[envID] = &Node{NodeStat: NodeStat{}}
  59. }
  60. NodeMap[envID].Status = status
  61. NodeMap[envID].ResponseAt = now
  62. }
  63. func calculateNextRetryInterval(failureCount int) time.Duration {
  64. if failureCount == 0 {
  65. return defaultRetryConfig.BaseInterval
  66. }
  67. interval := defaultRetryConfig.BaseInterval
  68. for i := 1; i < failureCount; i++ {
  69. interval = time.Duration(float64(interval) * defaultRetryConfig.BackoffMultiple)
  70. if interval > defaultRetryConfig.MaxInterval {
  71. return defaultRetryConfig.MaxInterval
  72. }
  73. }
  74. return interval
  75. }
  76. func shouldRetry(envID uint64) bool {
  77. state := getRetryState(envID)
  78. now := time.Now()
  79. if state.FailureCount >= defaultRetryConfig.MaxRetries {
  80. if now.Sub(state.LastSuccess) < 30*time.Second {
  81. state.FailureCount = 0
  82. state.NextRetry = now
  83. return true
  84. }
  85. if now.Before(state.NextRetry) {
  86. return false
  87. }
  88. state.FailureCount = defaultRetryConfig.MaxRetries / 2
  89. state.NextRetry = now
  90. return true
  91. }
  92. return !now.Before(state.NextRetry)
  93. }
  94. func markConnectionFailure(envID uint64, err error) {
  95. state := getRetryState(envID)
  96. state.FailureCount++
  97. state.NextRetry = time.Now().Add(calculateNextRetryInterval(state.FailureCount))
  98. updateNodeStatus(envID, false, "connection_failed")
  99. }
  100. func markConnectionSuccess(envID uint64) {
  101. state := getRetryState(envID)
  102. state.FailureCount = 0
  103. state.LastSuccess = time.Now()
  104. state.NextRetry = time.Now()
  105. }
  106. func logCurrentNodeStatus(prefix string) {
  107. mutex.Lock()
  108. defer mutex.Unlock()
  109. if NodeMap != nil {
  110. logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
  111. }
  112. }
  113. func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
  114. ctx, cancel := context.WithCancel(parentCtx)
  115. return &NodeRecordManager{ctx: ctx, cancel: cancel}
  116. }
  117. func (m *NodeRecordManager) Start() {
  118. m.mu.Lock()
  119. defer m.mu.Unlock()
  120. m.wg.Add(1)
  121. go func() {
  122. defer m.wg.Done()
  123. RetrieveNodesStatus(m.ctx)
  124. }()
  125. }
  126. func (m *NodeRecordManager) Stop() {
  127. m.mu.Lock()
  128. defer m.mu.Unlock()
  129. m.cancel()
  130. m.wg.Wait()
  131. }
  132. func (m *NodeRecordManager) Restart() {
  133. m.Stop()
  134. m.ctx, m.cancel = context.WithCancel(context.Background())
  135. m.Start()
  136. }
  137. var (
  138. defaultManager *NodeRecordManager
  139. restartMu sync.Mutex
  140. )
  141. func InitDefaultManager() {
  142. if defaultManager != nil {
  143. defaultManager.Stop()
  144. }
  145. defaultManager = NewNodeRecordManager(context.Background())
  146. defaultManager.Start()
  147. }
  148. func RestartRetrieveNodesStatus() {
  149. restartMu.Lock()
  150. defer restartMu.Unlock()
  151. if defaultManager == nil {
  152. InitDefaultManager()
  153. } else {
  154. defaultManager.Restart()
  155. }
  156. }
  157. func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
  158. manager := NewNodeRecordManager(ctx)
  159. manager.Start()
  160. return manager
  161. }
  162. func StartDefaultManager() {
  163. restartMu.Lock()
  164. defer restartMu.Unlock()
  165. if defaultManager != nil {
  166. defaultManager.Restart()
  167. } else {
  168. InitDefaultManager()
  169. }
  170. }
  171. func cleanupDisabledNodes(enabledEnvIDs []uint64) {
  172. enabledMap := make(map[uint64]bool)
  173. for _, id := range enabledEnvIDs {
  174. enabledMap[id] = true
  175. }
  176. retryMutex.Lock()
  177. for envID := range retryStates {
  178. if !enabledMap[envID] {
  179. delete(retryStates, envID)
  180. }
  181. }
  182. retryMutex.Unlock()
  183. mutex.Lock()
  184. for envID := range NodeMap {
  185. if !enabledMap[envID] {
  186. delete(NodeMap, envID)
  187. }
  188. }
  189. mutex.Unlock()
  190. }
  191. func checkEnvironmentStillEnabled(envID uint64) bool {
  192. env := query.Environment
  193. environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
  194. return err == nil && environment != nil
  195. }
  196. func RetrieveNodesStatus(ctx context.Context) {
  197. logger.Info("RetrieveNodesStatus start")
  198. defer logger.Info("RetrieveNodesStatus exited")
  199. mutex.Lock()
  200. if NodeMap == nil {
  201. NodeMap = make(TNodeMap)
  202. }
  203. mutex.Unlock()
  204. envCheckTicker := time.NewTicker(30 * time.Second)
  205. defer envCheckTicker.Stop()
  206. timeoutCheckTicker := time.NewTicker(10 * time.Second)
  207. defer timeoutCheckTicker.Stop()
  208. env := query.Environment
  209. envs, err := env.Where(env.Enabled.Is(true)).Find()
  210. if err != nil {
  211. logger.Error(err)
  212. return
  213. }
  214. var enabledEnvIDs []uint64
  215. for _, e := range envs {
  216. enabledEnvIDs = append(enabledEnvIDs, e.ID)
  217. }
  218. cleanupDisabledNodes(enabledEnvIDs)
  219. var wg sync.WaitGroup
  220. defer wg.Wait()
  221. // Channel to signal when environment list changes
  222. envUpdateChan := make(chan []uint64, 1)
  223. wg.Add(1)
  224. go func() {
  225. defer wg.Done()
  226. for {
  227. select {
  228. case <-ctx.Done():
  229. return
  230. case <-timeoutCheckTicker.C:
  231. checkNodeTimeouts(2 * time.Minute)
  232. case <-envCheckTicker.C:
  233. currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
  234. if err != nil {
  235. logger.Error("Failed to re-query environments:", err)
  236. continue
  237. }
  238. var currentEnabledIDs []uint64
  239. for _, e := range currentEnvs {
  240. currentEnabledIDs = append(currentEnabledIDs, e.ID)
  241. }
  242. if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
  243. cleanupDisabledNodes(currentEnabledIDs)
  244. enabledEnvIDs = currentEnabledIDs
  245. select {
  246. case envUpdateChan <- currentEnabledIDs:
  247. default:
  248. }
  249. }
  250. }
  251. }
  252. }()
  253. for _, env := range envs {
  254. wg.Add(1)
  255. go func(e *model.Environment) {
  256. defer wg.Done()
  257. retryTicker := time.NewTicker(1 * time.Second)
  258. defer retryTicker.Stop()
  259. for {
  260. select {
  261. case <-ctx.Done():
  262. return
  263. case newEnabledIDs := <-envUpdateChan:
  264. found := false
  265. for _, id := range newEnabledIDs {
  266. if id == e.ID {
  267. found = true
  268. break
  269. }
  270. }
  271. if !found {
  272. return
  273. }
  274. case <-retryTicker.C:
  275. if !checkEnvironmentStillEnabled(e.ID) {
  276. retryMutex.Lock()
  277. delete(retryStates, e.ID)
  278. retryMutex.Unlock()
  279. return
  280. }
  281. if !shouldRetry(e.ID) {
  282. continue
  283. }
  284. if err := nodeAnalyticRecord(e, ctx); err != nil {
  285. logger.Error(err)
  286. markConnectionFailure(e.ID, err)
  287. } else {
  288. markConnectionSuccess(e.ID)
  289. }
  290. }
  291. }
  292. }(env)
  293. }
  294. }
  295. func checkNodeTimeouts(timeout time.Duration) {
  296. mutex.Lock()
  297. defer mutex.Unlock()
  298. now := time.Now()
  299. for _, node := range NodeMap {
  300. if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
  301. node.Status = false
  302. node.ResponseAt = now
  303. }
  304. }
  305. }
  306. // equalUint64Slices compares two uint64 slices for equality
  307. func equalUint64Slices(a, b []uint64) bool {
  308. if len(a) != len(b) {
  309. return false
  310. }
  311. // Create maps for comparison
  312. mapA := make(map[uint64]bool)
  313. mapB := make(map[uint64]bool)
  314. for _, v := range a {
  315. mapA[v] = true
  316. }
  317. for _, v := range b {
  318. mapB[v] = true
  319. }
  320. // Compare maps
  321. for k := range mapA {
  322. if !mapB[k] {
  323. return false
  324. }
  325. }
  326. for k := range mapB {
  327. if !mapA[k] {
  328. return false
  329. }
  330. }
  331. return true
  332. }
  333. func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
  334. scopeCtx, cancel := context.WithCancel(ctx)
  335. defer cancel()
  336. node, err := InitNode(env)
  337. if err != nil {
  338. mutex.Lock()
  339. if NodeMap[env.ID] == nil {
  340. NodeMap[env.ID] = &Node{
  341. Environment: env,
  342. NodeStat: NodeStat{Status: false, ResponseAt: time.Now()},
  343. }
  344. } else {
  345. NodeMap[env.ID].Status = false
  346. NodeMap[env.ID].ResponseAt = time.Now()
  347. }
  348. mutex.Unlock()
  349. return err
  350. }
  351. mutex.Lock()
  352. NodeMap[env.ID] = node
  353. mutex.Unlock()
  354. u, err := env.GetWebSocketURL("/api/analytic/intro")
  355. if err != nil {
  356. return err
  357. }
  358. header := http.Header{}
  359. header.Set("X-Node-Secret", env.Token)
  360. dial := &websocket.Dialer{
  361. Proxy: http.ProxyFromEnvironment,
  362. HandshakeTimeout: 5 * time.Second,
  363. }
  364. c, _, err := dial.Dial(u, header)
  365. if err != nil {
  366. updateNodeStatus(env.ID, false, "websocket_dial_failed")
  367. return err
  368. }
  369. defer func() {
  370. c.Close()
  371. updateNodeStatus(env.ID, false, "websocket_connection_closed")
  372. }()
  373. go func() {
  374. select {
  375. case <-scopeCtx.Done():
  376. _ = c.Close()
  377. case <-ctx.Done():
  378. _ = c.Close()
  379. }
  380. }()
  381. for {
  382. select {
  383. case <-scopeCtx.Done():
  384. return ctx.Err()
  385. case <-ctx.Done():
  386. return ctx.Err()
  387. default:
  388. }
  389. var rawMsg json.RawMessage
  390. err = c.ReadJSON(&rawMsg)
  391. if err != nil {
  392. if helper.IsUnexpectedWebsocketError(err) {
  393. updateNodeStatus(env.ID, false, "websocket_error")
  394. return err
  395. }
  396. return nil
  397. }
  398. mutex.Lock()
  399. if NodeMap[env.ID] == nil {
  400. NodeMap[env.ID] = &Node{
  401. Environment: env,
  402. NodeStat: NodeStat{Status: true, ResponseAt: time.Now()},
  403. }
  404. } else {
  405. var fullNode Node
  406. if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
  407. NodeMap[env.ID].NodeInfo = fullNode.NodeInfo
  408. NodeMap[env.ID].NodeStat = fullNode.NodeStat
  409. } else {
  410. var nodeStat NodeStat
  411. if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
  412. NodeMap[env.ID].NodeStat = nodeStat
  413. }
  414. }
  415. NodeMap[env.ID].Status = true
  416. NodeMap[env.ID].ResponseAt = time.Now()
  417. }
  418. mutex.Unlock()
  419. }
  420. }