123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package workers
- import (
- "context"
- "sync/atomic"
- "testing"
- "time"
- "github.com/imgproxy/imgproxy/v3/testutil"
- "github.com/stretchr/testify/suite"
- "golang.org/x/sync/errgroup"
- )
- type WorkersTestSuite struct {
- testutil.LazySuite
- config testutil.LazyObj[*Config]
- workers testutil.LazyObj[*Workers]
- }
- func (s *WorkersTestSuite) SetupSuite() {
- s.config, _ = testutil.NewLazySuiteObj(s, func() (*Config, error) {
- return &Config{RequestsQueueSize: 0, WorkersNumber: 1}, nil
- })
- s.workers, _ = testutil.NewLazySuiteObj(s, func() (*Workers, error) {
- return New(s.config())
- })
- }
- func (s *WorkersTestSuite) acquire(ctx context.Context, n int, delay time.Duration) (int, error) {
- errg := new(errgroup.Group)
- aquired := int64(0)
- // Get the workers instance before running goroutines
- workers := s.workers()
- for i := 0; i < n; i++ {
- errg.Go(func() error {
- release, err := workers.Acquire(ctx)
- if err == nil {
- time.Sleep(delay)
- release()
- atomic.AddInt64(&aquired, 1)
- }
- return err
- })
- }
- err := errg.Wait()
- return int(aquired), err
- }
- func (s *WorkersTestSuite) TestQueueDisabled() {
- s.config().RequestsQueueSize = 0
- s.config().WorkersNumber = 2
- // Try to acquire workers that exceed allowed workers number
- aquired, err := s.acquire(s.T().Context(), 4, 10*time.Millisecond)
- s.Require().Equal(4, aquired, "All workers should be eventually acquired")
- s.Require().NoError(err, "All workers should be acquired without error")
- }
- func (s *WorkersTestSuite) TestQueueEnabled() {
- s.config().RequestsQueueSize = 1
- s.config().WorkersNumber = 2
- // Try to acquire workers that fit allowed workers number + queue size
- aquired, err := s.acquire(s.T().Context(), 3, 10*time.Millisecond)
- s.Require().Equal(3, aquired, "All workers should be eventually acquired")
- s.Require().NoError(err, "All workers should be acquired without error")
- // Try to acquire workers that exceed allowed workers number + queue size
- aquired, err = s.acquire(s.T().Context(), 6, 10*time.Millisecond)
- s.Require().Equal(3, aquired, "Only 4 workers should be acquired")
- s.Require().ErrorAs(err, new(TooManyRequestsError))
- }
- func (s *WorkersTestSuite) TestContextTimeout() {
- ctx, cancel := context.WithTimeout(s.T().Context(), 5*time.Millisecond)
- defer cancel()
- aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
- s.Require().Equal(1, aquired, "Only 1 worker should be acquired")
- s.Require().ErrorIs(err, context.DeadlineExceeded, "Context deadline exceeded error expected")
- }
- func (s *WorkersTestSuite) TestContextCanceled() {
- ctx, cancel := context.WithCancel(s.T().Context())
- cancel()
- aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
- s.Require().Equal(0, aquired, "No worker should be acquired")
- s.Require().ErrorIs(err, context.Canceled, "Context canceled error expected")
- }
- func (s *WorkersTestSuite) TestSemaphoresInvalidConfig() {
- _, err := New(&Config{RequestsQueueSize: 0, WorkersNumber: 0})
- s.Require().Error(err)
- _, err = New(&Config{RequestsQueueSize: -1, WorkersNumber: 1})
- s.Require().Error(err)
- }
- func TestWorkers(t *testing.T) {
- suite.Run(t, new(WorkersTestSuite))
- }
|