workers.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package workers
  2. import (
  3. "context"
  4. "github.com/imgproxy/imgproxy/v3/monitoring"
  5. "golang.org/x/sync/semaphore"
  6. )
  7. // Workers controls how many concurrent image processings are allowed.
  8. // Requests exceeding this limit will be queued.
  9. //
  10. // It can also optionally limit the number of requests in the queue.
  11. type Workers struct {
  12. // queue semaphore: limits the queue size
  13. queue *semaphore.Weighted
  14. // workers semaphore: limits the number of concurrent image processings
  15. workers *semaphore.Weighted
  16. }
  17. // New creates new semaphores instance
  18. func New(config *Config) (*Workers, error) {
  19. if err := config.Validate(); err != nil {
  20. return nil, err
  21. }
  22. var queue *semaphore.Weighted
  23. if config.RequestsQueueSize > 0 {
  24. queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.WorkersNumber))
  25. }
  26. workers := semaphore.NewWeighted(int64(config.WorkersNumber))
  27. return &Workers{
  28. queue: queue,
  29. workers: workers,
  30. }, nil
  31. }
  32. // Acquire acquires a worker.
  33. // It returns a worker release function and an error if any.
  34. func (s *Workers) Acquire(ctx context.Context) (context.CancelFunc, error) {
  35. defer monitoring.StartQueueSegment(ctx)()
  36. // First, try to acquire the queue semaphore if configured.
  37. // If the queue is full, return an error immediately.
  38. releaseQueue, err := s.acquireQueue()
  39. if err != nil {
  40. return nil, err
  41. }
  42. // Next, acquire the workers semaphore.
  43. err = s.workers.Acquire(ctx, 1)
  44. if err != nil {
  45. releaseQueue()
  46. return nil, err
  47. }
  48. release := func() {
  49. s.workers.Release(1)
  50. releaseQueue()
  51. }
  52. return release, nil
  53. }
  54. // acquireQueue acquires the queue semaphore and returns release function and error.
  55. // If queue semaphore is not configured, it returns a noop anonymous function to make
  56. // semaphore usage transparent.
  57. func (s *Workers) acquireQueue() (context.CancelFunc, error) {
  58. if s.queue == nil {
  59. return func() {}, nil // return no-op cancel function if semaphore is disabled
  60. }
  61. acquired := s.queue.TryAcquire(1)
  62. if !acquired {
  63. return nil, newTooManyRequestsError()
  64. }
  65. return func() { s.queue.Release(1) }, nil
  66. }