1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package semaphores
- import (
- "context"
- "github.com/imgproxy/imgproxy/v3/monitoring"
- "golang.org/x/sync/semaphore"
- )
- // Semaphores is a container for the queue and processing semaphores
- type Semaphores struct {
- // queueSize semaphore: limits the queueSize size
- queueSize *semaphore.Weighted
- // processing semaphore: limits the number of concurrent image processings
- processing *semaphore.Weighted
- }
- // New creates new semaphores instance
- func New(config *Config) (*Semaphores, error) {
- if err := config.Validate(); err != nil {
- return nil, err
- }
- var queue *semaphore.Weighted
- if config.RequestsQueueSize > 0 {
- queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers))
- }
- processing := semaphore.NewWeighted(int64(config.Workers))
- return &Semaphores{
- queueSize: queue,
- processing: processing,
- }, nil
- }
- // AcquireQueue acquires the queue semaphore and returns release function and error.
- // if queue semaphore is not configured, it returns a noop anonymous function to make
- // semaphore usage transparent.
- func (s *Semaphores) AcquireQueue() (context.CancelFunc, error) {
- if s.queueSize == nil {
- return func() {}, nil // return no-op cancel function if semaphore is disabled
- }
- acquired := s.queueSize.TryAcquire(1)
- if !acquired {
- return nil, newTooManyRequestsError()
- }
- return func() { s.queueSize.Release(1) }, nil
- }
- // AcquireProcessing acquires the processing semaphore
- func (s *Semaphores) AcquireProcessing(ctx context.Context) (context.CancelFunc, error) {
- defer monitoring.StartQueueSegment(ctx)()
- err := s.processing.Acquire(ctx, 1)
- if err != nil {
- return nil, err
- }
- return func() { s.processing.Release(1) }, nil
- }
|