mutex.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package cert
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. var (
  7. // mutex is used to control access to certificate operations
  8. mutex sync.Mutex
  9. // statusChan is the channel to broadcast certificate status changes
  10. statusChan = make(chan bool, 10)
  11. // subscribers is a map of channels that are subscribed to certificate status changes
  12. subscribers = make(map[chan bool]struct{})
  13. // subscriberMux protects the subscribers map from concurrent access
  14. subscriberMux sync.RWMutex
  15. // isProcessing indicates whether a certificate operation is in progress
  16. isProcessing bool
  17. // processingMutex protects the isProcessing flag
  18. processingMutex sync.RWMutex
  19. )
  20. func initBroadcastStatus(ctx context.Context) {
  21. // Start broadcasting goroutine
  22. go broadcastStatus(ctx)
  23. }
  24. // broadcastStatus listens for status changes and broadcasts to all subscribers
  25. func broadcastStatus(ctx context.Context) {
  26. for {
  27. select {
  28. case <-ctx.Done():
  29. // Context cancelled, clean up resources and exit
  30. close(statusChan)
  31. return
  32. case status, ok := <-statusChan:
  33. if !ok {
  34. // Channel closed, exit
  35. return
  36. }
  37. subscriberMux.RLock()
  38. for ch := range subscribers {
  39. // Non-blocking send to prevent slow subscribers from blocking others
  40. select {
  41. case ch <- status:
  42. default:
  43. // Skip if channel buffer is full
  44. }
  45. }
  46. subscriberMux.RUnlock()
  47. }
  48. }
  49. }
  50. // SubscribeProcessingStatus allows a client to subscribe to certificate processing status changes
  51. func SubscribeProcessingStatus() chan bool {
  52. ch := make(chan bool, 5) // Buffer to prevent blocking
  53. // Add to subscribers
  54. subscriberMux.Lock()
  55. subscribers[ch] = struct{}{}
  56. subscriberMux.Unlock()
  57. // Send current status immediately
  58. processingMutex.RLock()
  59. currentStatus := isProcessing
  60. processingMutex.RUnlock()
  61. // Non-blocking send
  62. select {
  63. case ch <- currentStatus:
  64. default:
  65. }
  66. return ch
  67. }
  68. // UnsubscribeProcessingStatus removes a subscriber from receiving status updates
  69. func UnsubscribeProcessingStatus(ch chan bool) {
  70. subscriberMux.Lock()
  71. delete(subscribers, ch)
  72. subscriberMux.Unlock()
  73. // Close the channel so the client knows it's unsubscribed
  74. close(ch)
  75. }
  76. // lock acquires the certificate mutex
  77. func lock() {
  78. mutex.Lock()
  79. setProcessingStatus(true)
  80. }
  81. // unlock releases the certificate mutex
  82. func unlock() {
  83. setProcessingStatus(false)
  84. mutex.Unlock()
  85. }
  86. // IsProcessing returns whether a certificate operation is currently in progress
  87. func IsProcessing() bool {
  88. processingMutex.RLock()
  89. defer processingMutex.RUnlock()
  90. return isProcessing
  91. }
  92. // setProcessingStatus updates the processing status and broadcasts the change
  93. func setProcessingStatus(status bool) {
  94. processingMutex.Lock()
  95. if isProcessing != status {
  96. isProcessing = status
  97. statusChan <- status
  98. }
  99. processingMutex.Unlock()
  100. }