node_record.go

package analytic

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/0xJacky/Nginx-UI/internal/helper"
	"github.com/0xJacky/Nginx-UI/model"
	"github.com/0xJacky/Nginx-UI/query"
	"github.com/gorilla/websocket"
	"github.com/uozi-tech/cosy/logger"
)

// NodeRecordManager manages the node status retrieval process
type NodeRecordManager struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
}

// RetryConfig holds configuration for retry logic
type RetryConfig struct {
	BaseInterval    time.Duration // Base retry interval
	MaxInterval     time.Duration // Maximum retry interval
	MaxRetries      int           // Maximum consecutive failures before giving up temporarily
	BackoffMultiple float64       // Multiplier for exponential backoff
	ResetAfter      time.Duration // Time to reset failure count if successful
}

// Default retry configuration
var defaultRetryConfig = RetryConfig{
	BaseInterval:    5 * time.Second,  // Start with 5 seconds
	MaxInterval:     5 * time.Minute,  // Max 5 minutes between retries
	MaxRetries:      10,               // Max 10 consecutive failures
	BackoffMultiple: 1.5,              // 1.5x backoff each time
	ResetAfter:      30 * time.Second, // Reset failure count after 30s of success
}

// NodeRetryState tracks retry state for each node
type NodeRetryState struct {
	FailureCount    int
	LastRetryTime   time.Time
	LastSuccessTime time.Time
	NextRetryTime   time.Time
}

var (
	retryStates = make(map[uint64]*NodeRetryState)
	retryMutex  sync.Mutex
)

// getRetryState gets or creates retry state for a node
func getRetryState(envID uint64) *NodeRetryState {
	retryMutex.Lock()
	defer retryMutex.Unlock()

	if state, exists := retryStates[envID]; exists {
		return state
	}

	state := &NodeRetryState{
		FailureCount:    0,
		LastSuccessTime: time.Now(),
		NextRetryTime:   time.Now(),
	}
	retryStates[envID] = state
	return state
}

// updateNodeStatus safely updates node status with proper timestamp
func updateNodeStatus(envID uint64, status bool, reason string) {
	mutex.Lock()
	defer mutex.Unlock()

	now := time.Now()
	if NodeMap[envID] != nil {
		NodeMap[envID].Status = status
		NodeMap[envID].ResponseAt = now
		logger.Debugf("updateNodeStatus: Node[%d] status updated to %t (%s) at %v",
			envID, status, reason, now)
	} else {
		logger.Debugf("updateNodeStatus: Warning - Node[%d] not found in NodeMap", envID)
	}
}

// calculateNextRetryInterval calculates the next retry interval using exponential backoff
func calculateNextRetryInterval(state *NodeRetryState, config RetryConfig) time.Duration {
	if state.FailureCount == 0 {
		return config.BaseInterval
	}

	interval := config.BaseInterval
	for i := 0; i < state.FailureCount-1; i++ {
		interval = time.Duration(float64(interval) * config.BackoffMultiple)
		if interval > config.MaxInterval {
			interval = config.MaxInterval
			break
		}
	}

	logger.Debugf("calculateNextRetryInterval: FailureCount=%d, NextInterval=%v",
		state.FailureCount, interval)
	return interval
}
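
// With defaultRetryConfig, the computed interval grows roughly as
// 5s, 7.5s, 11.25s, 16.875s, ... and is capped at MaxInterval (5 minutes).
// A minimal illustrative sketch of that progression (not part of the runtime logic):
//
//	state := &NodeRetryState{}
//	for i := 1; i <= 5; i++ {
//		state.FailureCount = i
//		fmt.Println(calculateNextRetryInterval(state, defaultRetryConfig))
//	}
//	// Output: 5s, 7.5s, 11.25s, 16.875s, 25.3125s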

// shouldRetry determines if we should retry connection for a node
func shouldRetry(envID uint64, config RetryConfig) bool {
	state := getRetryState(envID)
	now := time.Now()

	// Check if we've exceeded max retries
	if state.FailureCount >= config.MaxRetries {
		// If we've been successful recently, reset the failure count
		if now.Sub(state.LastSuccessTime) < config.ResetAfter {
			logger.Debugf("shouldRetry: Resetting failure count for node %d due to recent success", envID)
			state.FailureCount = 0
			state.NextRetryTime = now
			return true
		}
		// Too many failures, back off for a longer period
		if now.Before(state.NextRetryTime) {
			logger.Debugf("shouldRetry: Node %d in backoff period until %v (failures: %d)",
				envID, state.NextRetryTime, state.FailureCount)
			return false
		}
		// Reset after long backoff period
		logger.Debugf("shouldRetry: Resetting failure count for node %d after backoff period", envID)
		state.FailureCount = config.MaxRetries / 2 // Start from middle to avoid immediate max again
		state.NextRetryTime = now
		return true
	}

	// Normal retry logic
	if now.Before(state.NextRetryTime) {
		return false
	}
	return true
}

// markConnectionFailure marks a connection failure and calculates next retry time
func markConnectionFailure(envID uint64, config RetryConfig, err error) {
	state := getRetryState(envID)
	now := time.Now()

	state.FailureCount++
	state.LastRetryTime = now
	nextInterval := calculateNextRetryInterval(state, config)
	state.NextRetryTime = now.Add(nextInterval)

	logger.Debugf("markConnectionFailure: Node %d failed (count: %d), next retry at %v, error: %v",
		envID, state.FailureCount, state.NextRetryTime, err)

	// Update node status to offline
	updateNodeStatus(envID, false, "connection_failed")
}

// markConnectionSuccess marks a successful connection
func markConnectionSuccess(envID uint64) {
	state := getRetryState(envID)
	now := time.Now()

	state.FailureCount = 0
	state.LastSuccessTime = now
	state.NextRetryTime = now // Can retry immediately if connection drops

	logger.Debugf("markConnectionSuccess: Node %d connection successful, failure count reset", envID)
	// Status will be updated in nodeAnalyticRecord when we receive actual data
}

// logCurrentNodeStatus logs current node status for debugging
func logCurrentNodeStatus(prefix string) {
	mutex.Lock()
	defer mutex.Unlock()

	if NodeMap == nil {
		logger.Debugf("%s: NodeMap is nil", prefix)
		return
	}

	logger.Debugf("%s: Current NodeMap contains %d nodes", prefix, len(NodeMap))
	for envID, node := range NodeMap {
		if node == nil {
			logger.Debugf("%s: Node[%d] is nil", prefix, envID)
			continue
		}

		// Also log retry state
		retryMutex.Lock()
		state := retryStates[envID]
		retryMutex.Unlock()

		retryInfo := "no_retry_state"
		if state != nil {
			retryInfo = fmt.Sprintf("failures=%d,next_retry=%v",
				state.FailureCount, state.NextRetryTime)
		}

		logger.Debugf("%s: Node[%d] - Status: %t, ResponseAt: %v, RetryState: %s",
			prefix, envID, node.Status, node.ResponseAt, retryInfo)
	}
}

// NewNodeRecordManager creates a new NodeRecordManager with the provided context
func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
	logger.Debug("Creating new NodeRecordManager")
	ctx, cancel := context.WithCancel(parentCtx)
	return &NodeRecordManager{
		ctx:    ctx,
		cancel: cancel,
	}
}

// Start begins retrieving node status using the manager's context
func (m *NodeRecordManager) Start() {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.Debug("NodeRecordManager: Starting node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Start - Before start")

	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		RetrieveNodesStatus(m.ctx)
	}()

	logger.Debug("NodeRecordManager: Started successfully")
}

// Stop cancels the current context and waits for operations to complete
func (m *NodeRecordManager) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger.Debug("NodeRecordManager: Stopping node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Stop - Before stop")

	m.cancel()
	m.wg.Wait()

	logger.Debug("NodeRecordManager: Stopped successfully")
	logCurrentNodeStatus("NodeRecordManager.Stop - After stop")
}

// Restart stops and then restarts the node status retrieval
func (m *NodeRecordManager) Restart() {
	logger.Debug("NodeRecordManager: Restarting node status retrieval")
	logCurrentNodeStatus("NodeRecordManager.Restart - Before restart")

	m.Stop()

	logger.Debug("NodeRecordManager: Creating new context for restart")
	// Create new context
	m.ctx, m.cancel = context.WithCancel(context.Background())

	// Start retrieval with new context
	m.Start()

	logger.Debug("NodeRecordManager: Restart completed")
	logCurrentNodeStatus("NodeRecordManager.Restart - After restart")
}
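
// A minimal usage sketch for the manager lifecycle; the calling code and the
// background context shown here are illustrative, not part of this package:
//
//	manager := NewNodeRecordManager(context.Background())
//	manager.Start()   // runs RetrieveNodesStatus in a background goroutine
//	// ... later, e.g. after the environment list changes ...
//	manager.Restart() // cancels the old context and starts a fresh retrieval
//	// ... on shutdown ...
//	manager.Stop()    // cancels the context and waits for the goroutine to exit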

// For backward compatibility
var (
	defaultManager *NodeRecordManager
	restartMu      sync.Mutex
)

// InitDefaultManager initializes the default NodeRecordManager
func InitDefaultManager() {
	logger.Debug("Initializing default NodeRecordManager")
	logCurrentNodeStatus("InitDefaultManager - Before init")

	if defaultManager != nil {
		logger.Debug("Default manager exists, stopping it first")
		defaultManager.Stop()
	}

	defaultManager = NewNodeRecordManager(context.Background())
	defaultManager.Start()

	logger.Debug("Default NodeRecordManager initialized")
	logCurrentNodeStatus("InitDefaultManager - After init")
}

// RestartRetrieveNodesStatus restarts the node status retrieval process
// Kept for backward compatibility
func RestartRetrieveNodesStatus() {
	restartMu.Lock()
	defer restartMu.Unlock()

	logger.Debug("RestartRetrieveNodesStatus called")
	logCurrentNodeStatus("RestartRetrieveNodesStatus - Before restart")

	if defaultManager == nil {
		logger.Debug("Default manager is nil, initializing new one")
		InitDefaultManager()
		return
	}

	logger.Debug("Restarting existing default manager")
	defaultManager.Restart()

	logger.Debug("RestartRetrieveNodesStatus completed")
	logCurrentNodeStatus("RestartRetrieveNodesStatus - After restart")
}

// StartRetrieveNodesStatus starts the node status retrieval with a custom context
func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
	logger.Debug("Starting node status retrieval with custom context")
	logCurrentNodeStatus("StartRetrieveNodesStatus - Before start")

	manager := NewNodeRecordManager(ctx)
	manager.Start()

	logger.Debug("Custom NodeRecordManager started")
	logCurrentNodeStatus("StartRetrieveNodesStatus - After start")
	return manager
}

// StartDefaultManager starts the default node status retrieval manager
// This should be called at system startup
func StartDefaultManager() {
	restartMu.Lock()
	defer restartMu.Unlock()

	logger.Debug("StartDefaultManager called")
	logCurrentNodeStatus("StartDefaultManager - Before start")

	if defaultManager != nil {
		logger.Info("DefaultManager already running, restarting...")
		logger.Debug("Default manager exists, performing restart")
		defaultManager.Restart()
		return
	}

	logger.Info("Starting default NodeRecordManager...")
	logger.Debug("No default manager exists, initializing new one")
	InitDefaultManager()

	logger.Debug("StartDefaultManager completed")
	logCurrentNodeStatus("StartDefaultManager - After start")
}

// cleanupDisabledNodes removes retry states for environments that are no longer enabled
func cleanupDisabledNodes(enabledEnvIDs []uint64) {
	retryMutex.Lock()
	defer retryMutex.Unlock()

	// Create a map for quick lookup
	enabledMap := make(map[uint64]bool)
	for _, id := range enabledEnvIDs {
		enabledMap[id] = true
	}

	// Remove retry states for disabled environments
	var cleanedUp []uint64
	for envID := range retryStates {
		if !enabledMap[envID] {
			delete(retryStates, envID)
			cleanedUp = append(cleanedUp, envID)
		}
	}

	if len(cleanedUp) > 0 {
		logger.Debugf("cleanupDisabledNodes: Cleaned up retry states for disabled environments: %v", cleanedUp)
	}
}

// removeFromNodeMap removes disabled nodes from NodeMap
func removeFromNodeMap(enabledEnvIDs []uint64) {
	mutex.Lock()
	defer mutex.Unlock()

	// Create a map for quick lookup
	enabledMap := make(map[uint64]bool)
	for _, id := range enabledEnvIDs {
		enabledMap[id] = true
	}

	// Remove nodes for disabled environments
	var removed []uint64
	for envID := range NodeMap {
		if !enabledMap[envID] {
			delete(NodeMap, envID)
			removed = append(removed, envID)
		}
	}

	if len(removed) > 0 {
		logger.Debugf("removeFromNodeMap: Removed disabled nodes from NodeMap: %v", removed)
	}
}

// checkEnvironmentStillEnabled checks if an environment is still enabled
func checkEnvironmentStillEnabled(envID uint64) bool {
	env := query.Environment
	environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
	if err != nil {
		logger.Debugf("checkEnvironmentStillEnabled: Environment ID %d no longer enabled or not found", envID)
		return false
	}
	return environment != nil
}

func RetrieveNodesStatus(ctx context.Context) {
	logger.Info("RetrieveNodesStatus start")
	logger.Debug("RetrieveNodesStatus: Initializing node status retrieval")
	defer logger.Info("RetrieveNodesStatus exited")
	defer logger.Debug("RetrieveNodesStatus: Cleanup completed")

	mutex.Lock()
	if NodeMap == nil {
		logger.Debug("RetrieveNodesStatus: NodeMap is nil, creating new one")
		NodeMap = make(TNodeMap)
	} else {
		logger.Debugf("RetrieveNodesStatus: NodeMap already exists with %d nodes", len(NodeMap))
	}
	mutex.Unlock()

	logCurrentNodeStatus("RetrieveNodesStatus - Initial state")

	// Add periodic environment checking ticker
	envCheckTicker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer envCheckTicker.Stop()

	env := query.Environment
	envs, err := env.Where(env.Enabled.Is(true)).Find()
	if err != nil {
		logger.Error(err)
		logger.Debug("RetrieveNodesStatus: Failed to query enabled environments")
		return
	}

	logger.Debugf("RetrieveNodesStatus: Found %d enabled environments", len(envs))
	for i, e := range envs {
		logger.Debugf("RetrieveNodesStatus: Environment[%d] - ID: %d, Name: %s, Enabled: %t",
			i, e.ID, e.Name, e.Enabled)
	}

	// Get current enabled environment IDs
	var enabledEnvIDs []uint64
	for _, e := range envs {
		enabledEnvIDs = append(enabledEnvIDs, e.ID)
	}

	// Clean up disabled nodes
	cleanupDisabledNodes(enabledEnvIDs)
	removeFromNodeMap(enabledEnvIDs)

	var wg sync.WaitGroup
	defer wg.Wait()

	// Channel to signal when environment list changes
	envUpdateChan := make(chan []uint64, 1)

	// Start environment monitoring goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer logger.Debug("RetrieveNodesStatus: Environment monitor goroutine completed")
		for {
			select {
			case <-ctx.Done():
				logger.Debug("RetrieveNodesStatus: Environment monitor context cancelled")
				return
			case <-envCheckTicker.C:
				// Re-check enabled environments
				currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
				if err != nil {
					logger.Error("RetrieveNodesStatus: Failed to re-query environments:", err)
					continue
				}

				var currentEnabledIDs []uint64
				for _, e := range currentEnvs {
					currentEnabledIDs = append(currentEnabledIDs, e.ID)
				}

				// Check if environment list changed
				if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
					logger.Debugf("RetrieveNodesStatus: Environment list changed from %v to %v", enabledEnvIDs, currentEnabledIDs)
					cleanupDisabledNodes(currentEnabledIDs)
					removeFromNodeMap(currentEnabledIDs)

					// Update the list
					enabledEnvIDs = currentEnabledIDs

					// Notify about the change
					select {
					case envUpdateChan <- currentEnabledIDs:
					default:
						// Non-blocking send
					}
				}
			}
		}
	}()

	for _, env := range envs {
		wg.Add(1)
		logger.Debugf("RetrieveNodesStatus: Starting goroutine for environment ID: %d, Name: %s", env.ID, env.Name)
		go func(e *model.Environment) {
			defer wg.Done()
			defer logger.Debugf("RetrieveNodesStatus: Goroutine completed for environment ID: %d", e.ID)

			// Retry ticker - check every 1 second but use backoff logic to determine actual retry
			retryTicker := time.NewTicker(1 * time.Second)
			defer retryTicker.Stop()

			for {
				select {
				case <-ctx.Done():
					logger.Debugf("RetrieveNodesStatus: Context cancelled for environment ID: %d", e.ID)
					return
				case newEnabledIDs := <-envUpdateChan:
					// Check if this environment is still enabled
					found := false
					for _, id := range newEnabledIDs {
						if id == e.ID {
							found = true
							break
						}
					}
					if !found {
						logger.Debugf("RetrieveNodesStatus: Environment ID %d has been disabled, stopping goroutine", e.ID)
						return
					}
				case <-retryTicker.C:
					// Double-check if environment is still enabled before retrying
					if !checkEnvironmentStillEnabled(e.ID) {
						logger.Debugf("RetrieveNodesStatus: Environment ID %d no longer enabled, stopping goroutine", e.ID)
						// Clean up retry state
						retryMutex.Lock()
						delete(retryStates, e.ID)
						retryMutex.Unlock()
						return
					}

					// Check if we should retry based on backoff logic
					if !shouldRetry(e.ID, defaultRetryConfig) {
						continue // Skip this iteration
					}

					logger.Debugf("RetrieveNodesStatus: Attempting connection to environment ID: %d", e.ID)
					if err := nodeAnalyticRecord(e, ctx); err != nil {
						logger.Error(err)
						logger.Debugf("RetrieveNodesStatus: Connection failed for environment ID: %d, error: %v", e.ID, err)
						markConnectionFailure(e.ID, defaultRetryConfig, err)
					} else {
						logger.Debugf("RetrieveNodesStatus: Connection successful for environment ID: %d", e.ID)
						markConnectionSuccess(e.ID)
					}
				}
			}
		}(env)
	}

	logger.Debug("RetrieveNodesStatus: All goroutines started, waiting for completion")
}

// equalUint64Slices reports whether two uint64 slices contain the same set of IDs,
// regardless of order
func equalUint64Slices(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}

	// Create maps for comparison
	mapA := make(map[uint64]bool)
	mapB := make(map[uint64]bool)
	for _, v := range a {
		mapA[v] = true
	}
	for _, v := range b {
		mapB[v] = true
	}

	// Compare maps
	for k := range mapA {
		if !mapB[k] {
			return false
		}
	}
	for k := range mapB {
		if !mapA[k] {
			return false
		}
	}
	return true
}

func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
	logger.Debugf("nodeAnalyticRecord: Starting for environment ID: %d, Name: %s", env.ID, env.Name)

	scopeCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	node, err := InitNode(env)

	mutex.Lock()
	NodeMap[env.ID] = node
	mutex.Unlock()

	logger.Debugf("nodeAnalyticRecord: Node initialized for environment ID: %d", env.ID)

	if err != nil {
		logger.Debugf("nodeAnalyticRecord: InitNode failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}

	u, err := env.GetWebSocketURL("/api/analytic/intro")
	if err != nil {
		logger.Debugf("nodeAnalyticRecord: GetWebSocketURL failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}

	logger.Debugf("nodeAnalyticRecord: Connecting to WebSocket URL: %s for environment ID: %d", u, env.ID)

	header := http.Header{}
	header.Set("X-Node-Secret", env.Token)

	dial := &websocket.Dialer{
		Proxy:            http.ProxyFromEnvironment,
		HandshakeTimeout: 5 * time.Second,
	}

	c, _, err := dial.Dial(u, header)
	if err != nil {
		logger.Debugf("nodeAnalyticRecord: WebSocket dial failed for environment ID: %d, error: %v", env.ID, err)
		return err
	}
	defer c.Close()

	logger.Debugf("nodeAnalyticRecord: WebSocket connection established for environment ID: %d", env.ID)

	go func() {
		<-scopeCtx.Done()
		logger.Debugf("nodeAnalyticRecord: Context cancelled, closing WebSocket for environment ID: %d", env.ID)
		_ = c.Close()
	}()

	messageCount := 0
	for {
		// Use json.RawMessage to handle both NodeStat and Node types
		var rawMsg json.RawMessage
		err = c.ReadJSON(&rawMsg)
		if err != nil {
			if helper.IsUnexpectedWebsocketError(err) {
				logger.Debugf("nodeAnalyticRecord: Unexpected WebSocket error for environment ID: %d, error: %v", env.ID, err)
				return err
			}
			logger.Debugf("nodeAnalyticRecord: WebSocket read completed for environment ID: %d", env.ID)
			return nil
		}

		messageCount++
		logger.Debugf("nodeAnalyticRecord: Received message #%d from environment ID: %d", messageCount, env.ID)

		mutex.Lock()
		if NodeMap[env.ID] != nil {
			// Try to unmarshal as complete Node first (contains both NodeInfo and NodeStat)
			var fullNode Node
			if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
				// Check if version has changed
				oldVersion := NodeMap[env.ID].Version
				if oldVersion != "" && oldVersion != fullNode.Version {
					logger.Infof("nodeAnalyticRecord: Version updated for environment ID: %d, from %s to %s",
						env.ID, oldVersion, fullNode.Version)
				}

				// Update complete Node with version info
				NodeMap[env.ID].NodeInfo = fullNode.NodeInfo
				NodeMap[env.ID].NodeStat = fullNode.NodeStat
				// Ensure status and response time are set
				NodeMap[env.ID].NodeStat.Status = true
				NodeMap[env.ID].NodeStat.ResponseAt = time.Now()
			} else {
				// Fall back to NodeStat only
				var nodeStat NodeStat
				if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
					// Set node online status
					nodeStat.Status = true
					nodeStat.ResponseAt = time.Now()
					NodeMap[env.ID].NodeStat = nodeStat
				} else {
					logger.Debugf("nodeAnalyticRecord: Failed to unmarshal message for environment ID: %d, error: %v", env.ID, err)
				}
			}
		} else {
			logger.Debugf("nodeAnalyticRecord: Warning - Node not found in NodeMap for environment ID: %d", env.ID)
		}
		mutex.Unlock()
	}
}
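
// nodeAnalyticRecord is driven by the per-environment goroutines in
// RetrieveNodesStatus. A simplified sketch of that call pattern (the retry
// ticker and enable checks around it are omitted here for illustration):
//
//	if shouldRetry(e.ID, defaultRetryConfig) {
//		if err := nodeAnalyticRecord(e, ctx); err != nil {
//			markConnectionFailure(e.ID, defaultRetryConfig, err)
//		} else {
//			markConnectionSuccess(e.ID)
//		}
//	}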