1
0

mutex.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package cert
  2. import (
  3. "context"
  4. "sync"
  5. )
  // Package-level state shared by the certificate locking and status
  // broadcasting helpers in this file.
  var (
  	// mutex serializes certificate operations; lock and unlock wrap it.
  	mutex sync.Mutex

  	// isProcessing reports whether a certificate operation is currently in
  	// progress. It is guarded by processingMutex.
  	isProcessing    bool
  	processingMutex sync.RWMutex

  	// statusChan carries processing-status changes to the broadcasting
  	// goroutine.
  	statusChan chan bool

  	// subscribers is the set of channels that receive status broadcasts.
  	// It is guarded by subscriberMux.
  	subscribers   map[chan bool]struct{}
  	subscriberMux sync.RWMutex
  )
  20. func initBroadcastStatus(ctx context.Context) {
  21. // Initialize channels and maps
  22. statusChan = make(chan bool, 10) // Buffer to prevent blocking
  23. subscribers = make(map[chan bool]struct{})
  24. // Start broadcasting goroutine
  25. go broadcastStatus(ctx)
  26. }
  27. // broadcastStatus listens for status changes and broadcasts to all subscribers
  28. func broadcastStatus(ctx context.Context) {
  29. for {
  30. select {
  31. case <-ctx.Done():
  32. // Context cancelled, clean up resources and exit
  33. close(statusChan)
  34. return
  35. case status, ok := <-statusChan:
  36. if !ok {
  37. // Channel closed, exit
  38. return
  39. }
  40. subscriberMux.RLock()
  41. for ch := range subscribers {
  42. // Non-blocking send to prevent slow subscribers from blocking others
  43. select {
  44. case ch <- status:
  45. default:
  46. // Skip if channel buffer is full
  47. }
  48. }
  49. subscriberMux.RUnlock()
  50. }
  51. }
  52. }
  53. // SubscribeProcessingStatus allows a client to subscribe to certificate processing status changes
  54. func SubscribeProcessingStatus() chan bool {
  55. ch := make(chan bool, 5) // Buffer to prevent blocking
  56. // Add to subscribers
  57. subscriberMux.Lock()
  58. subscribers[ch] = struct{}{}
  59. subscriberMux.Unlock()
  60. // Send current status immediately
  61. processingMutex.RLock()
  62. currentStatus := isProcessing
  63. processingMutex.RUnlock()
  64. // Non-blocking send
  65. select {
  66. case ch <- currentStatus:
  67. default:
  68. }
  69. return ch
  70. }
  71. // UnsubscribeProcessingStatus removes a subscriber from receiving status updates
  72. func UnsubscribeProcessingStatus(ch chan bool) {
  73. subscriberMux.Lock()
  74. delete(subscribers, ch)
  75. subscriberMux.Unlock()
  76. // Close the channel so the client knows it's unsubscribed
  77. close(ch)
  78. }
  79. // lock acquires the certificate mutex
  80. func lock() {
  81. mutex.Lock()
  82. setProcessingStatus(true)
  83. }
  84. // unlock releases the certificate mutex
  85. func unlock() {
  86. setProcessingStatus(false)
  87. mutex.Unlock()
  88. }
  89. // IsProcessing returns whether a certificate operation is currently in progress
  90. func IsProcessing() bool {
  91. processingMutex.RLock()
  92. defer processingMutex.RUnlock()
  93. return isProcessing
  94. }
  95. // setProcessingStatus updates the processing status and broadcasts the change
  96. func setProcessingStatus(status bool) {
  97. processingMutex.Lock()
  98. if isProcessing != status {
  99. isProcessing = status
  100. statusChan <- status
  101. }
  102. processingMutex.Unlock()
  103. }