workers.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package workers
import (
	"context"
	"sync"

	"golang.org/x/sync/semaphore"
)
  6. // Workers controls how many concurrent image processings are allowed.
  7. // Requests exceeding this limit will be queued.
  8. //
  9. // It can also optionally limit the number of requests in the queue.
  10. type Workers struct {
  11. // queue semaphore: limits the queue size
  12. queue *semaphore.Weighted
  13. // workers semaphore: limits the number of concurrent image processings
  14. workers *semaphore.Weighted
  15. }
  16. // New creates new semaphores instance
  17. func New(config *Config) (*Workers, error) {
  18. if err := config.Validate(); err != nil {
  19. return nil, err
  20. }
  21. var queue *semaphore.Weighted
  22. if config.RequestsQueueSize > 0 {
  23. queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.WorkersNumber))
  24. }
  25. workers := semaphore.NewWeighted(int64(config.WorkersNumber))
  26. return &Workers{
  27. queue: queue,
  28. workers: workers,
  29. }, nil
  30. }
  31. // Acquire acquires a worker.
  32. // It returns a worker release function and an error if any.
  33. func (s *Workers) Acquire(ctx context.Context) (context.CancelFunc, error) {
  34. // First, try to acquire the queue semaphore if configured.
  35. // If the queue is full, return an error immediately.
  36. releaseQueue, err := s.acquireQueue()
  37. if err != nil {
  38. return nil, err
  39. }
  40. // Next, acquire the workers semaphore.
  41. err = s.workers.Acquire(ctx, 1)
  42. if err != nil {
  43. releaseQueue()
  44. return nil, err
  45. }
  46. release := func() {
  47. s.workers.Release(1)
  48. releaseQueue()
  49. }
  50. return release, nil
  51. }
  52. // acquireQueue acquires the queue semaphore and returns release function and error.
  53. // If queue semaphore is not configured, it returns a noop anonymous function to make
  54. // semaphore usage transparent.
  55. func (s *Workers) acquireQueue() (context.CancelFunc, error) {
  56. if s.queue == nil {
  57. return func() {}, nil // return no-op cancel function if semaphore is disabled
  58. }
  59. acquired := s.queue.TryAcquire(1)
  60. if !acquired {
  61. return nil, newTooManyRequestsError()
  62. }
  63. return func() { s.queue.Release(1) }, nil
  64. }