processing_status.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package event
  2. import (
  3. "sync"
  4. "github.com/uozi-tech/cosy/logger"
  5. )
  6. // ProcessingStatusManager manages the global processing status
  7. type ProcessingStatusManager struct {
  8. mu sync.RWMutex
  9. status ProcessingStatusData
  10. }
  11. var (
  12. processingManager *ProcessingStatusManager
  13. processingOnce sync.Once
  14. )
  15. // GetProcessingStatusManager returns the singleton instance of ProcessingStatusManager
  16. func GetProcessingStatusManager() *ProcessingStatusManager {
  17. processingOnce.Do(func() {
  18. processingManager = &ProcessingStatusManager{
  19. status: ProcessingStatusData{
  20. IndexScanning: false,
  21. AutoCertProcessing: false,
  22. NginxLogIndexing: false,
  23. },
  24. }
  25. })
  26. return processingManager
  27. }
  28. // GetCurrentStatus returns the current processing status
  29. func (m *ProcessingStatusManager) GetCurrentStatus() ProcessingStatusData {
  30. m.mu.RLock()
  31. defer m.mu.RUnlock()
  32. return m.status
  33. }
  34. // UpdateIndexScanning updates the index scanning status
  35. func (m *ProcessingStatusManager) UpdateIndexScanning(scanning bool) {
  36. m.mu.Lock()
  37. defer m.mu.Unlock()
  38. if m.status.IndexScanning != scanning {
  39. m.status.IndexScanning = scanning
  40. logger.Infof("Index scanning status changed to: %t", scanning)
  41. m.publishStatus()
  42. }
  43. }
  44. // UpdateAutoCertProcessing updates the auto cert processing status
  45. func (m *ProcessingStatusManager) UpdateAutoCertProcessing(processing bool) {
  46. m.mu.Lock()
  47. defer m.mu.Unlock()
  48. if m.status.AutoCertProcessing != processing {
  49. m.status.AutoCertProcessing = processing
  50. logger.Infof("Auto cert processing status changed to: %t", processing)
  51. m.publishStatus()
  52. }
  53. }
  54. // UpdateNginxLogIndexing updates the nginx log indexing status
  55. func (m *ProcessingStatusManager) UpdateNginxLogIndexing(indexing bool) {
  56. m.mu.Lock()
  57. defer m.mu.Unlock()
  58. if m.status.NginxLogIndexing != indexing {
  59. m.status.NginxLogIndexing = indexing
  60. logger.Infof("Nginx log indexing status changed to: %t", indexing)
  61. m.publishStatus()
  62. // Also publish legacy nginx_log_status for backward compatibility
  63. Publish(Event{
  64. Type: TypeNginxLogStatus,
  65. Data: NginxLogStatusData{
  66. Indexing: indexing,
  67. },
  68. })
  69. }
  70. }
  71. // publishStatus publishes the current processing status
  72. func (m *ProcessingStatusManager) publishStatus() {
  73. Publish(Event{
  74. Type: TypeProcessingStatus,
  75. Data: m.status,
  76. })
  77. }
  78. // BroadcastCurrentStatus broadcasts the current status (used when clients connect)
  79. func (m *ProcessingStatusManager) BroadcastCurrentStatus() {
  80. m.mu.RLock()
  81. defer m.mu.RUnlock()
  82. logger.Info("Broadcasting current processing status to new client")
  83. Publish(Event{
  84. Type: TypeProcessingStatus,
  85. Data: m.status,
  86. })
  87. }