mutex.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package cert
  2. import (
  3. "sync"
  4. )
  var (
  	// mutex serializes certificate operations; lock and unlock wrap it.
  	mutex sync.Mutex

  	// isProcessing is true while a certificate operation is in progress.
  	isProcessing bool
  	// processingMutex guards every read and write of isProcessing.
  	processingMutex sync.RWMutex

  	// statusChan carries each processing-status change to the broadcaster
  	// goroutine, which forwards it to the subscribers.
  	statusChan chan bool

  	// subscribers is the set of channels registered for status changes.
  	subscribers map[chan bool]struct{}
  	// subscriberMux guards the subscribers set against concurrent access.
  	subscriberMux sync.RWMutex
  )
  19. func init() {
  20. // Initialize channels and maps
  21. statusChan = make(chan bool, 10) // Buffer to prevent blocking
  22. subscribers = make(map[chan bool]struct{})
  23. // Start broadcasting goroutine
  24. go broadcastStatus()
  25. }
  26. // broadcastStatus listens for status changes and broadcasts to all subscribers
  27. func broadcastStatus() {
  28. for status := range statusChan {
  29. subscriberMux.RLock()
  30. for ch := range subscribers {
  31. // Non-blocking send to prevent slow subscribers from blocking others
  32. select {
  33. case ch <- status:
  34. default:
  35. // Skip if channel buffer is full
  36. }
  37. }
  38. subscriberMux.RUnlock()
  39. }
  40. }
  41. // SubscribeProcessingStatus allows a client to subscribe to certificate processing status changes
  42. func SubscribeProcessingStatus() chan bool {
  43. ch := make(chan bool, 5) // Buffer to prevent blocking
  44. // Add to subscribers
  45. subscriberMux.Lock()
  46. subscribers[ch] = struct{}{}
  47. subscriberMux.Unlock()
  48. // Send current status immediately
  49. processingMutex.RLock()
  50. currentStatus := isProcessing
  51. processingMutex.RUnlock()
  52. // Non-blocking send
  53. select {
  54. case ch <- currentStatus:
  55. default:
  56. }
  57. return ch
  58. }
  59. // UnsubscribeProcessingStatus removes a subscriber from receiving status updates
  60. func UnsubscribeProcessingStatus(ch chan bool) {
  61. subscriberMux.Lock()
  62. delete(subscribers, ch)
  63. subscriberMux.Unlock()
  64. // Close the channel so the client knows it's unsubscribed
  65. close(ch)
  66. }
  67. // lock acquires the certificate mutex
  68. func lock() {
  69. mutex.Lock()
  70. setProcessingStatus(true)
  71. }
  72. // unlock releases the certificate mutex
  73. func unlock() {
  74. setProcessingStatus(false)
  75. mutex.Unlock()
  76. }
  77. // IsProcessing returns whether a certificate operation is currently in progress
  78. func IsProcessing() bool {
  79. processingMutex.RLock()
  80. defer processingMutex.RUnlock()
  81. return isProcessing
  82. }
  83. // setProcessingStatus updates the processing status and broadcasts the change
  84. func setProcessingStatus(status bool) {
  85. processingMutex.Lock()
  86. if isProcessing != status {
  87. isProcessing = status
  88. statusChan <- status
  89. }
  90. processingMutex.Unlock()
  91. }