bus.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package event
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/uozi-tech/cosy/logger"
  6. )
// WebSocketHub is the sink the event bus forwards events to.
//
// BroadcastMessage is invoked by the bus with the WebSocket event name from
// the matching WebSocketEventConfig and the event data after the configured
// transform has been applied.
type WebSocketHub interface {
	BroadcastMessage(event string, data interface{})
}
// WebSocketEventConfig describes how one event type is forwarded to
// WebSocket clients.
type WebSocketEventConfig struct {
	// EventType is the event type this configuration applies to; the bus
	// uses it as the lookup key for incoming events.
	EventType EventType

	// WSEventName is the event name handed to WebSocketHub.BroadcastMessage.
	WSEventName string

	// DataTransform is applied to the event's data before it is broadcast.
	DataTransform func(data interface{}) interface{}
}
// EventBus manages event publishing and WebSocket forwarding.
//
// Published events are looked up by type in wsConfigs and, when both a
// configuration and a hub are present, broadcast through wsHub.
type EventBus struct {
	// wsHub is the broadcast target; nil until SetWebSocketHub is called
	// and reset to nil by Shutdown.
	wsHub WebSocketHub

	// wsConfigs maps an event type to its forwarding configuration.
	wsConfigs map[EventType]*WebSocketEventConfig

	// wsMutex guards wsHub and wsConfigs.
	wsMutex sync.RWMutex

	// ctx is the bus context returned by Context; it is cancelled by Shutdown.
	ctx context.Context

	// cancel cancels ctx.
	cancel context.CancelFunc
}
var (
	// globalBus is the shared EventBus instance returned by GetEventBus.
	globalBus *EventBus

	// busOnce ensures globalBus is initialized exactly once.
	busOnce sync.Once
)
  29. // GetEventBus returns the global event bus instance
  30. func GetEventBus() *EventBus {
  31. busOnce.Do(func() {
  32. ctx, cancel := context.WithCancel(context.Background())
  33. globalBus = &EventBus{
  34. wsConfigs: make(map[EventType]*WebSocketEventConfig),
  35. ctx: ctx,
  36. cancel: cancel,
  37. }
  38. })
  39. return globalBus
  40. }
  41. // SetWebSocketHub sets the WebSocket hub for direct event forwarding
  42. func (eb *EventBus) SetWebSocketHub(hub WebSocketHub) {
  43. eb.wsMutex.Lock()
  44. defer eb.wsMutex.Unlock()
  45. eb.wsHub = hub
  46. logger.Info("WebSocket hub registered with event bus")
  47. }
  48. // RegisterWebSocketEventForwarding registers an event type to be forwarded to WebSocket clients
  49. func (eb *EventBus) RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
  50. eb.RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, func(data interface{}) interface{} {
  51. return data // Default: no transformation
  52. })
  53. }
  54. // RegisterWebSocketEventForwardingWithTransform registers an event type with custom data transformation
  55. func (eb *EventBus) RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
  56. eb.wsMutex.Lock()
  57. defer eb.wsMutex.Unlock()
  58. // Only register if not already registered
  59. if _, exists := eb.wsConfigs[eventType]; !exists {
  60. config := &WebSocketEventConfig{
  61. EventType: eventType,
  62. WSEventName: wsEventName,
  63. DataTransform: transform,
  64. }
  65. eb.wsConfigs[eventType] = config
  66. logger.Debugf("Registered WebSocket event forwarding: %s -> %s", eventType, wsEventName)
  67. }
  68. }
// Publish forwards event to WebSocket clients via forwardToWebSocket.
// The event is dropped silently when its type has no registered forwarding
// configuration or when no hub has been set.
func (eb *EventBus) Publish(event Event) {
	eb.forwardToWebSocket(event)
}
  73. // forwardToWebSocket forwards an event to WebSocket clients if configured
  74. func (eb *EventBus) forwardToWebSocket(event Event) {
  75. eb.wsMutex.RLock()
  76. config, exists := eb.wsConfigs[event.Type]
  77. hub := eb.wsHub
  78. eb.wsMutex.RUnlock()
  79. if !exists || hub == nil {
  80. return
  81. }
  82. // Apply data transformation
  83. wsData := config.DataTransform(event.Data)
  84. hub.BroadcastMessage(config.WSEventName, wsData)
  85. }
  86. // Shutdown gracefully shuts down the event bus
  87. func (eb *EventBus) Shutdown() {
  88. eb.cancel()
  89. eb.wsMutex.Lock()
  90. defer eb.wsMutex.Unlock()
  91. // Clear all configurations
  92. eb.wsConfigs = make(map[EventType]*WebSocketEventConfig)
  93. eb.wsHub = nil
  94. logger.Info("Event bus shutdown completed")
  95. }
// Context returns the event bus context. The returned context is the one
// cancelled by Shutdown.
func (eb *EventBus) Context() context.Context {
	return eb.ctx
}
  100. // Convenience functions for global event bus
  101. // Publish forwards an event to WebSocket clients on the global bus
  102. func Publish(event Event) {
  103. GetEventBus().Publish(event)
  104. }
  105. // SetWebSocketHub sets the WebSocket hub for the global event bus
  106. func SetWebSocketHub(hub WebSocketHub) {
  107. GetEventBus().SetWebSocketHub(hub)
  108. }
  109. // RegisterWebSocketEventForwarding registers WebSocket event forwarding on the global bus
  110. func RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
  111. GetEventBus().RegisterWebSocketEventForwarding(eventType, wsEventName)
  112. }
  113. // RegisterWebSocketEventForwardingWithTransform registers WebSocket event forwarding with transform on the global bus
  114. func RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
  115. GetEventBus().RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, transform)
  116. }
  117. // RegisterWebSocketEventConfigs registers multiple WebSocket event configurations
  118. func RegisterWebSocketEventConfigs(configs []WebSocketEventConfig) {
  119. bus := GetEventBus()
  120. for _, config := range configs {
  121. bus.RegisterWebSocketEventForwardingWithTransform(config.EventType, config.WSEventName, config.DataTransform)
  122. }
  123. }
  124. // GetDefaultWebSocketEventConfigs returns the default WebSocket event configurations
  125. func GetDefaultWebSocketEventConfigs() []WebSocketEventConfig {
  126. return []WebSocketEventConfig{
  127. {
  128. EventType: EventTypeIndexScanning,
  129. WSEventName: "index_scanning",
  130. DataTransform: func(data interface{}) interface{} {
  131. return data
  132. },
  133. },
  134. {
  135. EventType: EventTypeAutoCertProcessing,
  136. WSEventName: "auto_cert_processing",
  137. DataTransform: func(data interface{}) interface{} {
  138. return data
  139. },
  140. },
  141. {
  142. EventType: EventTypeProcessingStatus,
  143. WSEventName: "processing_status",
  144. DataTransform: func(data interface{}) interface{} {
  145. return data
  146. },
  147. },
  148. {
  149. EventType: EventTypeNginxLogStatus,
  150. WSEventName: "nginx_log_status",
  151. DataTransform: func(data interface{}) interface{} {
  152. return data
  153. },
  154. },
  155. {
  156. EventType: EventTypeNotification,
  157. WSEventName: "notification",
  158. DataTransform: func(data interface{}) interface{} {
  159. return data
  160. },
  161. },
  162. }
  163. }