bus.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package event
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/uozi-tech/cosy/logger"
  6. )
  // WebSocketHub is the minimal broadcasting surface the event bus depends on.
  //
  // BroadcastMessage receives an event name together with its payload; how the
  // message reaches connected clients is left to the implementation.
  type WebSocketHub interface {
  	BroadcastMessage(event string, data interface{})
  }
  11. // EventBus manages event publishing and WebSocket forwarding
  12. type EventBus struct {
  13. wsHub WebSocketHub
  14. wsMutex sync.RWMutex
  15. ctx context.Context
  16. cancel context.CancelFunc
  17. }
  18. var (
  19. globalBus *EventBus
  20. busOnce sync.Once
  21. )
  22. // GetEventBus returns the global event bus instance
  23. func GetEventBus() *EventBus {
  24. busOnce.Do(func() {
  25. ctx, cancel := context.WithCancel(context.Background())
  26. globalBus = &EventBus{
  27. ctx: ctx,
  28. cancel: cancel,
  29. }
  30. })
  31. return globalBus
  32. }
  33. // SetWebSocketHub sets the WebSocket hub for direct event forwarding
  34. func (eb *EventBus) SetWebSocketHub(hub WebSocketHub) {
  35. eb.wsMutex.Lock()
  36. defer eb.wsMutex.Unlock()
  37. eb.wsHub = hub
  38. logger.Info("WebSocket hub registered with event bus")
  39. }
  40. // Publish forwards an event directly to WebSocket clients
  41. func (eb *EventBus) Publish(event Event) {
  42. eb.wsMutex.RLock()
  43. hub := eb.wsHub
  44. eb.wsMutex.RUnlock()
  45. if hub == nil {
  46. return
  47. }
  48. // Directly broadcast the event using its type as the event name
  49. hub.BroadcastMessage(string(event.Type), event.Data)
  50. }
  51. // Shutdown gracefully shuts down the event bus
  52. func (eb *EventBus) Shutdown() {
  53. eb.cancel()
  54. eb.wsMutex.Lock()
  55. defer eb.wsMutex.Unlock()
  56. eb.wsHub = nil
  57. logger.Info("Event bus shutdown completed")
  58. }
  59. // Context returns the event bus context
  60. func (eb *EventBus) Context() context.Context {
  61. return eb.ctx
  62. }
  63. // Convenience functions for global event bus
  64. // Publish forwards an event to WebSocket clients on the global bus
  65. func Publish(event Event) {
  66. GetEventBus().Publish(event)
  67. }
  68. // SetWebSocketHub sets the WebSocket hub for the global event bus
  69. func SetWebSocketHub(hub WebSocketHub) {
  70. GetEventBus().SetWebSocketHub(hub)
  71. }