123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- package analytic
- import (
- "context"
- "encoding/json"
- "net/http"
- "sync"
- "time"
- "github.com/0xJacky/Nginx-UI/internal/helper"
- "github.com/0xJacky/Nginx-UI/model"
- "github.com/0xJacky/Nginx-UI/query"
- "github.com/gorilla/websocket"
- "github.com/uozi-tech/cosy/logger"
- )
// NodeRecordManager owns the lifecycle of one background RetrieveNodesStatus
// run: it starts the run, cancels it and waits for it to finish.
type NodeRecordManager struct {
	// ctx is the context handed to RetrieveNodesStatus.
	ctx context.Context
	// cancel cancels ctx; Stop calls it to end the run.
	cancel context.CancelFunc
	// wg tracks the goroutine launched by Start so Stop can wait for it.
	wg sync.WaitGroup
	// mu serializes Start and Stop.
	mu sync.Mutex
}
// RetryConfig describes the backoff policy used when a node connection fails.
type RetryConfig struct {
	// BaseInterval is the delay used for the first retry.
	BaseInterval time.Duration
	// MaxInterval is the upper bound the growing delay is clamped to.
	MaxInterval time.Duration
	// MaxRetries is the failure count at which shouldRetry switches to its
	// "too many failures" handling.
	MaxRetries int
	// BackoffMultiple is the factor the delay is multiplied by per extra failure.
	BackoffMultiple float64
}

// defaultRetryConfig is the policy used by every retry helper in this file:
// start at 5s, grow by 1.5x per failure, never wait longer than 5 minutes.
var defaultRetryConfig = RetryConfig{
	BaseInterval:    5 * time.Second,
	MaxInterval:     5 * time.Minute,
	MaxRetries:      10,
	BackoffMultiple: 1.5,
}
// NodeRetryState is the retry bookkeeping kept for a single node.
type NodeRetryState struct {
	// FailureCount is the number of failures recorded since the last success.
	FailureCount int
	// LastSuccess is when the node was last marked as successfully connected.
	LastSuccess time.Time
	// NextRetry is the earliest moment another attempt is allowed.
	NextRetry time.Time
}
- var (
- retryStates = make(map[uint64]*NodeRetryState)
- retryMutex sync.Mutex
- )
- func getRetryState(envID uint64) *NodeRetryState {
- retryMutex.Lock()
- defer retryMutex.Unlock()
- if state, exists := retryStates[envID]; exists {
- return state
- }
- state := &NodeRetryState{LastSuccess: time.Now(), NextRetry: time.Now()}
- retryStates[envID] = state
- return state
- }
- // updateNodeStatus directly updates node status without condition checks
- func updateNodeStatus(envID uint64, status bool, reason string) {
- mutex.Lock()
- defer mutex.Unlock()
- now := time.Now()
- if NodeMap[envID] == nil {
- NodeMap[envID] = &Node{NodeStat: NodeStat{}}
- }
- NodeMap[envID].Status = status
- NodeMap[envID].ResponseAt = now
- }
- func calculateNextRetryInterval(failureCount int) time.Duration {
- if failureCount == 0 {
- return defaultRetryConfig.BaseInterval
- }
- interval := defaultRetryConfig.BaseInterval
- for i := 1; i < failureCount; i++ {
- interval = time.Duration(float64(interval) * defaultRetryConfig.BackoffMultiple)
- if interval > defaultRetryConfig.MaxInterval {
- return defaultRetryConfig.MaxInterval
- }
- }
- return interval
- }
- func shouldRetry(envID uint64) bool {
- state := getRetryState(envID)
- now := time.Now()
- if state.FailureCount >= defaultRetryConfig.MaxRetries {
- if now.Sub(state.LastSuccess) < 30*time.Second {
- state.FailureCount = 0
- state.NextRetry = now
- return true
- }
- if now.Before(state.NextRetry) {
- return false
- }
- state.FailureCount = defaultRetryConfig.MaxRetries / 2
- state.NextRetry = now
- return true
- }
- return !now.Before(state.NextRetry)
- }
- func markConnectionFailure(envID uint64, err error) {
- state := getRetryState(envID)
- state.FailureCount++
- state.NextRetry = time.Now().Add(calculateNextRetryInterval(state.FailureCount))
- updateNodeStatus(envID, false, "connection_failed")
- }
- func markConnectionSuccess(envID uint64) {
- state := getRetryState(envID)
- state.FailureCount = 0
- state.LastSuccess = time.Now()
- state.NextRetry = time.Now()
- }
- func logCurrentNodeStatus(prefix string) {
- mutex.Lock()
- defer mutex.Unlock()
- if NodeMap != nil {
- logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
- }
- }
- func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
- ctx, cancel := context.WithCancel(parentCtx)
- return &NodeRecordManager{ctx: ctx, cancel: cancel}
- }
- func (m *NodeRecordManager) Start() {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
- RetrieveNodesStatus(m.ctx)
- }()
- }
- func (m *NodeRecordManager) Stop() {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.cancel()
- m.wg.Wait()
- }
- func (m *NodeRecordManager) Restart() {
- m.Stop()
- m.ctx, m.cancel = context.WithCancel(context.Background())
- m.Start()
- }
- var (
- defaultManager *NodeRecordManager
- restartMu sync.Mutex
- )
- func InitDefaultManager() {
- if defaultManager != nil {
- defaultManager.Stop()
- }
- defaultManager = NewNodeRecordManager(context.Background())
- defaultManager.Start()
- }
- func RestartRetrieveNodesStatus() {
- restartMu.Lock()
- defer restartMu.Unlock()
- if defaultManager == nil {
- InitDefaultManager()
- } else {
- defaultManager.Restart()
- }
- }
- func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
- manager := NewNodeRecordManager(ctx)
- manager.Start()
- return manager
- }
- func StartDefaultManager() {
- restartMu.Lock()
- defer restartMu.Unlock()
- if defaultManager != nil {
- defaultManager.Restart()
- } else {
- InitDefaultManager()
- }
- }
- func cleanupDisabledNodes(enabledEnvIDs []uint64) {
- enabledMap := make(map[uint64]bool)
- for _, id := range enabledEnvIDs {
- enabledMap[id] = true
- }
- retryMutex.Lock()
- for envID := range retryStates {
- if !enabledMap[envID] {
- delete(retryStates, envID)
- }
- }
- retryMutex.Unlock()
- mutex.Lock()
- for envID := range NodeMap {
- if !enabledMap[envID] {
- delete(NodeMap, envID)
- }
- }
- mutex.Unlock()
- }
- func checkEnvironmentStillEnabled(envID uint64) bool {
- env := query.Environment
- environment, err := env.Where(env.ID.Eq(envID), env.Enabled.Is(true)).First()
- return err == nil && environment != nil
- }
- func RetrieveNodesStatus(ctx context.Context) {
- logger.Info("RetrieveNodesStatus start")
- defer logger.Info("RetrieveNodesStatus exited")
- mutex.Lock()
- if NodeMap == nil {
- NodeMap = make(TNodeMap)
- }
- mutex.Unlock()
- envCheckTicker := time.NewTicker(30 * time.Second)
- defer envCheckTicker.Stop()
- timeoutCheckTicker := time.NewTicker(10 * time.Second)
- defer timeoutCheckTicker.Stop()
- env := query.Environment
- envs, err := env.Where(env.Enabled.Is(true)).Find()
- if err != nil {
- logger.Error(err)
- return
- }
- var enabledEnvIDs []uint64
- for _, e := range envs {
- enabledEnvIDs = append(enabledEnvIDs, e.ID)
- }
- cleanupDisabledNodes(enabledEnvIDs)
- var wg sync.WaitGroup
- defer wg.Wait()
- // Channel to signal when environment list changes
- envUpdateChan := make(chan []uint64, 1)
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- select {
- case <-ctx.Done():
- return
- case <-timeoutCheckTicker.C:
- checkNodeTimeouts(2 * time.Minute)
- case <-envCheckTicker.C:
- currentEnvs, err := env.Where(env.Enabled.Is(true)).Find()
- if err != nil {
- logger.Error("Failed to re-query environments:", err)
- continue
- }
- var currentEnabledIDs []uint64
- for _, e := range currentEnvs {
- currentEnabledIDs = append(currentEnabledIDs, e.ID)
- }
- if !equalUint64Slices(enabledEnvIDs, currentEnabledIDs) {
- cleanupDisabledNodes(currentEnabledIDs)
- enabledEnvIDs = currentEnabledIDs
- select {
- case envUpdateChan <- currentEnabledIDs:
- default:
- }
- }
- }
- }
- }()
- for _, env := range envs {
- wg.Add(1)
- go func(e *model.Environment) {
- defer wg.Done()
- retryTicker := time.NewTicker(1 * time.Second)
- defer retryTicker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case newEnabledIDs := <-envUpdateChan:
- found := false
- for _, id := range newEnabledIDs {
- if id == e.ID {
- found = true
- break
- }
- }
- if !found {
- return
- }
- case <-retryTicker.C:
- if !checkEnvironmentStillEnabled(e.ID) {
- retryMutex.Lock()
- delete(retryStates, e.ID)
- retryMutex.Unlock()
- return
- }
- if !shouldRetry(e.ID) {
- continue
- }
- if err := nodeAnalyticRecord(e, ctx); err != nil {
- logger.Error(err)
- markConnectionFailure(e.ID, err)
- } else {
- markConnectionSuccess(e.ID)
- }
- }
- }
- }(env)
- }
- }
- func checkNodeTimeouts(timeout time.Duration) {
- mutex.Lock()
- defer mutex.Unlock()
- now := time.Now()
- for _, node := range NodeMap {
- if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
- node.Status = false
- node.ResponseAt = now
- }
- }
- }
// equalUint64Slices reports whether a and b contain the same values the same
// number of times, regardless of order. A nil slice and an empty slice are
// considered equal.
func equalUint64Slices(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}

	// Count the occurrences in a, then consume them with b. Because both
	// slices have the same length, b matches exactly when it never needs a
	// value whose count is already exhausted.
	remaining := make(map[uint64]int, len(a))
	for _, v := range a {
		remaining[v]++
	}
	for _, v := range b {
		if remaining[v] == 0 {
			return false
		}
		remaining[v]--
	}
	return true
}
- func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
- scopeCtx, cancel := context.WithCancel(ctx)
- defer cancel()
- node, err := InitNode(env)
- if err != nil {
- mutex.Lock()
- if NodeMap[env.ID] == nil {
- NodeMap[env.ID] = &Node{
- Environment: env,
- NodeStat: NodeStat{Status: false, ResponseAt: time.Now()},
- }
- } else {
- NodeMap[env.ID].Status = false
- NodeMap[env.ID].ResponseAt = time.Now()
- }
- mutex.Unlock()
- return err
- }
- mutex.Lock()
- NodeMap[env.ID] = node
- mutex.Unlock()
- u, err := env.GetWebSocketURL("/api/analytic/intro")
- if err != nil {
- return err
- }
- header := http.Header{}
- header.Set("X-Node-Secret", env.Token)
- dial := &websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: 5 * time.Second,
- }
- c, _, err := dial.Dial(u, header)
- if err != nil {
- updateNodeStatus(env.ID, false, "websocket_dial_failed")
- return err
- }
- defer func() {
- c.Close()
- updateNodeStatus(env.ID, false, "websocket_connection_closed")
- }()
- go func() {
- select {
- case <-scopeCtx.Done():
- _ = c.Close()
- case <-ctx.Done():
- _ = c.Close()
- }
- }()
- for {
- select {
- case <-scopeCtx.Done():
- return ctx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- var rawMsg json.RawMessage
- err = c.ReadJSON(&rawMsg)
- if err != nil {
- if helper.IsUnexpectedWebsocketError(err) {
- updateNodeStatus(env.ID, false, "websocket_error")
- return err
- }
- return nil
- }
- mutex.Lock()
- if NodeMap[env.ID] == nil {
- NodeMap[env.ID] = &Node{
- Environment: env,
- NodeStat: NodeStat{Status: true, ResponseAt: time.Now()},
- }
- } else {
- var fullNode Node
- if err := json.Unmarshal(rawMsg, &fullNode); err == nil && fullNode.Version != "" {
- NodeMap[env.ID].NodeInfo = fullNode.NodeInfo
- NodeMap[env.ID].NodeStat = fullNode.NodeStat
- } else {
- var nodeStat NodeStat
- if err := json.Unmarshal(rawMsg, &nodeStat); err == nil {
- NodeMap[env.ID].NodeStat = nodeStat
- }
- }
- NodeMap[env.ID].Status = true
- NodeMap[env.ID].ResponseAt = time.Now()
- }
- mutex.Unlock()
- }
- }
|