| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 | package analyticimport (	"context"	"net/http"	"sync"	"time"	"github.com/0xJacky/Nginx-UI/model"	"github.com/0xJacky/Nginx-UI/query"	"github.com/gorilla/websocket"	"github.com/uozi-tech/cosy/logger")// NodeRecordManager manages the node status retrieval processtype NodeRecordManager struct {	ctx    context.Context	cancel context.CancelFunc	wg     sync.WaitGroup	mu     sync.Mutex}// NewNodeRecordManager creates a new NodeRecordManager with the provided contextfunc NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {	ctx, cancel := context.WithCancel(parentCtx)	return &NodeRecordManager{		ctx:    ctx,		cancel: cancel,	}}// Start begins retrieving node status using the manager's contextfunc (m *NodeRecordManager) Start() {	m.mu.Lock()	defer m.mu.Unlock()	m.wg.Add(1)	go func() {		defer m.wg.Done()		RetrieveNodesStatus(m.ctx)	}()}// Stop cancels the current context and waits for operations to completefunc (m *NodeRecordManager) Stop() {	m.mu.Lock()	defer m.mu.Unlock()	m.cancel()	m.wg.Wait()}// Restart stops and then restarts the node status retrievalfunc (m *NodeRecordManager) Restart() {	m.Stop()	// Create new context	m.ctx, m.cancel = context.WithCancel(context.Background())	// Start retrieval with new context	m.Start()}// For backward compatibilityvar (	defaultManager *NodeRecordManager	setupOnce      sync.Once	restartMu      sync.Mutex)// InitDefaultManager initializes the default NodeRecordManagerfunc InitDefaultManager() {	setupOnce.Do(func() {		defaultManager = NewNodeRecordManager(context.Background())	})}// RestartRetrieveNodesStatus restarts the node status retrieval process// Kept for backward compatibilityfunc RestartRetrieveNodesStatus() {	restartMu.Lock()	defer restartMu.Unlock()	if defaultManager == nil {		InitDefaultManager()	}	defaultManager.Restart()}// StartRetrieveNodesStatus starts the node status retrieval with a custom contextfunc StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {	manager := NewNodeRecordManager(ctx)	manager.Start()	return manager}func RetrieveNodesStatus(ctx context.Context) {	logger.Info("RetrieveNodesStatus start")	defer logger.Info("RetrieveNodesStatus exited")	mutex.Lock()	if NodeMap == nil {		NodeMap = make(TNodeMap)	}	mutex.Unlock()	env := query.Environment	envs, err := env.Where(env.Enabled.Is(true)).Find()	if err != nil {		logger.Error(err)		return	}	var wg sync.WaitGroup	defer wg.Wait()	for _, env := range envs {		wg.Add(1)		go func(e *model.Environment) {			defer wg.Done()			retryTicker := time.NewTicker(5 * time.Second)			defer retryTicker.Stop()			for {				select {				case <-ctx.Done():					return				default:					if err := nodeAnalyticRecord(e, ctx); err != nil {						logger.Error(err)						mutex.Lock()						if NodeMap[env.ID] != nil {							NodeMap[env.ID].Status = false						}						mutex.Unlock()						select {						case <-retryTicker.C:						case <-ctx.Done():							return						}					}				}			}		}(env)	}}func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {	scopeCtx, cancel := context.WithCancel(ctx)	defer cancel()	node, err := InitNode(env)	mutex.Lock()	NodeMap[env.ID] = node	mutex.Unlock()	if err != nil {		return err	}	u, err := env.GetWebSocketURL("/api/analytic/intro")	if err != nil {		return err	}	header := http.Header{}	header.Set("X-Node-Secret", env.Token)	dial := &websocket.Dialer{		Proxy:            http.ProxyFromEnvironment,		HandshakeTimeout: 5 * time.Second,	}	c, _, err := dial.Dial(u, header)	if err != nil {		return err	}	defer c.Close()	go func() {		<-scopeCtx.Done()		_ = c.Close()	}()	var nodeStat NodeStat	for {		err = c.ReadJSON(&nodeStat)		if err != nil {			return err		}		// set online		nodeStat.Status = true		nodeStat.ResponseAt = time.Now()		mutex.Lock()		NodeMap[env.ID].NodeStat = nodeStat		mutex.Unlock()	}}
 |