workers_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package workers
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/imgproxy/imgproxy/v3/testutil"
  8. "github.com/stretchr/testify/suite"
  9. "golang.org/x/sync/errgroup"
  10. )
  11. type WorkersTestSuite struct {
  12. testutil.LazySuite
  13. config testutil.LazyObj[*Config]
  14. workers testutil.LazyObj[*Workers]
  15. }
  16. func (s *WorkersTestSuite) SetupSuite() {
  17. s.config, _ = testutil.NewLazySuiteObj(s, func() (*Config, error) {
  18. return &Config{RequestsQueueSize: 0, WorkersNumber: 1}, nil
  19. })
  20. s.workers, _ = testutil.NewLazySuiteObj(s, func() (*Workers, error) {
  21. return New(s.config())
  22. })
  23. }
  24. func (s *WorkersTestSuite) acquire(ctx context.Context, n int, delay time.Duration) (int, error) {
  25. errg := new(errgroup.Group)
  26. aquired := int64(0)
  27. // Get the workers instance before running goroutines
  28. workers := s.workers()
  29. for i := 0; i < n; i++ {
  30. errg.Go(func() error {
  31. release, err := workers.Acquire(ctx)
  32. if err == nil {
  33. time.Sleep(delay)
  34. release()
  35. atomic.AddInt64(&aquired, 1)
  36. }
  37. return err
  38. })
  39. }
  40. err := errg.Wait()
  41. return int(aquired), err
  42. }
  43. func (s *WorkersTestSuite) TestQueueDisabled() {
  44. s.config().RequestsQueueSize = 0
  45. s.config().WorkersNumber = 2
  46. // Try to acquire workers that exceed allowed workers number
  47. aquired, err := s.acquire(s.T().Context(), 4, 10*time.Millisecond)
  48. s.Require().Equal(4, aquired, "All workers should be eventually acquired")
  49. s.Require().NoError(err, "All workers should be acquired without error")
  50. }
  51. func (s *WorkersTestSuite) TestQueueEnabled() {
  52. s.config().RequestsQueueSize = 1
  53. s.config().WorkersNumber = 2
  54. // Try to acquire workers that fit allowed workers number + queue size
  55. aquired, err := s.acquire(s.T().Context(), 3, 10*time.Millisecond)
  56. s.Require().Equal(3, aquired, "All workers should be eventually acquired")
  57. s.Require().NoError(err, "All workers should be acquired without error")
  58. // Try to acquire workers that exceed allowed workers number + queue size
  59. aquired, err = s.acquire(s.T().Context(), 6, 10*time.Millisecond)
  60. s.Require().Equal(3, aquired, "Only 4 workers should be acquired")
  61. s.Require().ErrorAs(err, new(TooManyRequestsError))
  62. }
  63. func (s *WorkersTestSuite) TestContextTimeout() {
  64. ctx, cancel := context.WithTimeout(s.T().Context(), 5*time.Millisecond)
  65. defer cancel()
  66. aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
  67. s.Require().Equal(1, aquired, "Only 1 worker should be acquired")
  68. s.Require().ErrorIs(err, context.DeadlineExceeded, "Context deadline exceeded error expected")
  69. }
  70. func (s *WorkersTestSuite) TestContextCanceled() {
  71. ctx, cancel := context.WithCancel(s.T().Context())
  72. cancel()
  73. aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
  74. s.Require().Equal(0, aquired, "No worker should be acquired")
  75. s.Require().ErrorIs(err, context.Canceled, "Context canceled error expected")
  76. }
  77. func (s *WorkersTestSuite) TestSemaphoresInvalidConfig() {
  78. _, err := New(&Config{RequestsQueueSize: 0, WorkersNumber: 0})
  79. s.Require().Error(err)
  80. _, err = New(&Config{RequestsQueueSize: -1, WorkersNumber: 1})
  81. s.Require().Error(err)
  82. }
  83. func TestWorkers(t *testing.T) {
  84. suite.Run(t, new(WorkersTestSuite))
  85. }