semaphores.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package semaphores
  2. import (
  3. "context"
  4. "github.com/imgproxy/imgproxy/v3/monitoring"
  5. "golang.org/x/sync/semaphore"
  6. )
  7. // Semaphores is a container for the queue and processing semaphores
  8. type Semaphores struct {
  9. // queueSize semaphore: limits the queueSize size
  10. queueSize *semaphore.Weighted
  11. // processing semaphore: limits the number of concurrent image processings
  12. processing *semaphore.Weighted
  13. }
  14. // New creates new semaphores instance
  15. func New(config *Config) (*Semaphores, error) {
  16. if err := config.Validate(); err != nil {
  17. return nil, err
  18. }
  19. var queue *semaphore.Weighted
  20. if config.RequestsQueueSize > 0 {
  21. queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers))
  22. }
  23. processing := semaphore.NewWeighted(int64(config.Workers))
  24. return &Semaphores{
  25. queueSize: queue,
  26. processing: processing,
  27. }, nil
  28. }
  29. // AcquireQueue acquires the queue semaphore and returns release function and error.
  30. // if queue semaphore is not configured, it returns a noop anonymous function to make
  31. // semaphore usage transparent.
  32. func (s *Semaphores) AcquireQueue() (context.CancelFunc, error) {
  33. if s.queueSize == nil {
  34. return func() {}, nil // return no-op cancel function if semaphore is disabled
  35. }
  36. acquired := s.queueSize.TryAcquire(1)
  37. if !acquired {
  38. return nil, newTooManyRequestsError()
  39. }
  40. return func() { s.queueSize.Release(1) }, nil
  41. }
  42. // AcquireProcessing acquires the processing semaphore
  43. func (s *Semaphores) AcquireProcessing(ctx context.Context) (context.CancelFunc, error) {
  44. defer monitoring.StartQueueSegment(ctx)()
  45. err := s.processing.Acquire(ctx, 1)
  46. if err != nil {
  47. return nil, err
  48. }
  49. return func() { s.processing.Release(1) }, nil
  50. }