Browse Source

Rename semaphores to workers and simplify it

DarthSim 3 weeks ago
parent
commit
b8861a1c84

+ 4 - 4
config.go

@@ -6,8 +6,8 @@ import (
 	"github.com/imgproxy/imgproxy/v3/fetcher"
 	processinghandler "github.com/imgproxy/imgproxy/v3/handlers/processing"
 	streamhandler "github.com/imgproxy/imgproxy/v3/handlers/stream"
-	"github.com/imgproxy/imgproxy/v3/semaphores"
 	"github.com/imgproxy/imgproxy/v3/server"
+	"github.com/imgproxy/imgproxy/v3/workers"
 )
 
 // HandlerConfigs holds the configurations for imgproxy handlers
@@ -18,7 +18,7 @@ type HandlerConfigs struct {
 
 // Config represents an instance configuration
 type Config struct {
-	Semaphores     semaphores.Config
+	Workers        workers.Config
 	FallbackImage  auximageprovider.StaticConfig
 	WatermarkImage auximageprovider.StaticConfig
 	Fetcher        fetcher.Config
@@ -29,7 +29,7 @@ type Config struct {
 // NewDefaultConfig creates a new default configuration
 func NewDefaultConfig() Config {
 	return Config{
-		Semaphores:     semaphores.NewDefaultConfig(),
+		Workers:        workers.NewDefaultConfig(),
 		FallbackImage:  auximageprovider.NewDefaultStaticConfig(),
 		WatermarkImage: auximageprovider.NewDefaultStaticConfig(),
 		Fetcher:        fetcher.NewDefaultConfig(),
@@ -59,7 +59,7 @@ func LoadConfigFromEnv(c *Config) (*Config, error) {
 		return nil, err
 	}
 
-	if _, err = semaphores.LoadConfigFromEnv(&c.Semaphores); err != nil {
+	if _, err = workers.LoadConfigFromEnv(&c.Workers); err != nil {
 		return nil, err
 	}
 

+ 2 - 2
handlers/processing/handler.go

@@ -15,13 +15,13 @@ import (
 	"github.com/imgproxy/imgproxy/v3/monitoring/stats"
 	"github.com/imgproxy/imgproxy/v3/options"
 	"github.com/imgproxy/imgproxy/v3/security"
-	"github.com/imgproxy/imgproxy/v3/semaphores"
 	"github.com/imgproxy/imgproxy/v3/server"
+	"github.com/imgproxy/imgproxy/v3/workers"
 )
 
 // HandlerContext provides access to shared handler dependencies
 type HandlerContext interface {
-	Semaphores() *semaphores.Semaphores
+	Workers() *workers.Workers
 	FallbackImage() auximageprovider.Provider
 	WatermarkImage() auximageprovider.Provider
 	ImageDataFactory() *imagedata.Factory

+ 3 - 10
handlers/processing/request.go

@@ -40,19 +40,12 @@ func (r *request) execute(ctx context.Context) error {
 		return handlers.NewCantSaveError(r.po.Format)
 	}
 
-	// Acquire queue semaphore (if enabled)
-	releaseQueueSem, err := r.Semaphores().AcquireQueue()
+	// Acquire worker
+	releaseWorker, err := r.acquireWorker(ctx)
 	if err != nil {
 		return err
 	}
-	defer releaseQueueSem()
-
-	// Acquire processing semaphore
-	releaseProcessingSem, err := r.acquireProcessingSem(ctx)
-	if err != nil {
-		return err
-	}
-	defer releaseProcessingSem()
+	defer releaseWorker()
 
 	// Deal with processing image counter
 	stats.IncImagesInProgress()
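
At the call site, the former two-step acquisition (queue, then processing) collapses into a single acquire/release pair. Below is a minimal sketch of the resulting handler pattern, using only the Workers().Acquire accessor and the error semantics visible in this diff; the surrounding handler body is illustrative:

    releaseWorker, err := r.Workers().Acquire(ctx)
    if err != nil {
    	// Either the queue is full (TooManyRequestsError, presumably
    	// mapped to HTTP 429) or ctx was canceled or timed out while
    	// waiting for a free worker.
    	return err
    }
    defer releaseWorker()
    // ... fetch and process the image while holding the worker ...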

+ 3 - 3
handlers/processing/request_methods.go

@@ -36,11 +36,11 @@ func (r *request) makeImageRequestHeaders() http.Header {
 	return h
 }
 
-// acquireProcessingSem acquires the processing semaphore
-func (r *request) acquireProcessingSem(ctx context.Context) (context.CancelFunc, error) {
+// acquireWorker acquires the processing worker
+func (r *request) acquireWorker(ctx context.Context) (context.CancelFunc, error) {
 	defer monitoring.StartQueueSegment(ctx)()
 
-	fn, err := r.Semaphores().AcquireProcessing(ctx)
+	fn, err := r.Workers().Acquire(ctx)
 	if err != nil {
 		// We don't actually need to check timeout here,
 		// but it's an easy way to check if this is an actual timeout

+ 6 - 6
imgproxy.go

@@ -14,8 +14,8 @@ import (
 	"github.com/imgproxy/imgproxy/v3/imagedata"
 	"github.com/imgproxy/imgproxy/v3/memory"
 	"github.com/imgproxy/imgproxy/v3/monitoring/prometheus"
-	"github.com/imgproxy/imgproxy/v3/semaphores"
 	"github.com/imgproxy/imgproxy/v3/server"
+	"github.com/imgproxy/imgproxy/v3/workers"
 )
 
 const (
@@ -33,7 +33,7 @@ type ImgproxyHandlers struct {
 
 // Imgproxy holds all the components needed for imgproxy to function
 type Imgproxy struct {
-	semaphores       *semaphores.Semaphores
+	workers          *workers.Workers
 	fallbackImage    auximageprovider.Provider
 	watermarkImage   auximageprovider.Provider
 	fetcher          *fetcher.Fetcher
@@ -61,13 +61,13 @@ func New(ctx context.Context, config *Config) (*Imgproxy, error) {
 		return nil, err
 	}
 
-	semaphores, err := semaphores.New(&config.Semaphores)
+	workers, err := workers.New(&config.Workers)
 	if err != nil {
 		return nil, err
 	}
 
 	imgproxy := &Imgproxy{
-		semaphores:       semaphores,
+		workers:          workers,
 		fallbackImage:    fallbackImage,
 		watermarkImage:   watermarkImage,
 		fetcher:          fetcher,
@@ -172,8 +172,8 @@ func (i *Imgproxy) startMemoryTicker(ctx context.Context) {
 	}
 }
 
-func (i *Imgproxy) Semaphores() *semaphores.Semaphores {
-	return i.semaphores
+func (i *Imgproxy) Workers() *workers.Workers {
+	return i.workers
 }
 
 func (i *Imgproxy) FallbackImage() auximageprovider.Provider {

+ 0 - 65
semaphores/semaphores.go

@@ -1,65 +0,0 @@
-package semaphores
-
-import (
-	"context"
-
-	"github.com/imgproxy/imgproxy/v3/monitoring"
-	"golang.org/x/sync/semaphore"
-)
-
-// Semaphores is a container for the queue and processing semaphores
-type Semaphores struct {
-	// queueSize semaphore: limits the queueSize size
-	queueSize *semaphore.Weighted
-
-	// processing semaphore: limits the number of concurrent image processings
-	processing *semaphore.Weighted
-}
-
-// New creates new semaphores instance
-func New(config *Config) (*Semaphores, error) {
-	if err := config.Validate(); err != nil {
-		return nil, err
-	}
-
-	var queue *semaphore.Weighted
-
-	if config.RequestsQueueSize > 0 {
-		queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers))
-	}
-
-	processing := semaphore.NewWeighted(int64(config.Workers))
-
-	return &Semaphores{
-		queueSize:  queue,
-		processing: processing,
-	}, nil
-}
-
-// AcquireQueue acquires the queue semaphore and returns release function and error.
-// if queue semaphore is not configured, it returns a noop anonymous function to make
-// semaphore usage transparent.
-func (s *Semaphores) AcquireQueue() (context.CancelFunc, error) {
-	if s.queueSize == nil {
-		return func() {}, nil // return no-op cancel function if semaphore is disabled
-	}
-
-	acquired := s.queueSize.TryAcquire(1)
-	if !acquired {
-		return nil, newTooManyRequestsError()
-	}
-
-	return func() { s.queueSize.Release(1) }, nil
-}
-
-// AcquireProcessing acquires the processing semaphore
-func (s *Semaphores) AcquireProcessing(ctx context.Context) (context.CancelFunc, error) {
-	defer monitoring.StartQueueSegment(ctx)()
-
-	err := s.processing.Acquire(ctx, 1)
-	if err != nil {
-		return nil, err
-	}
-
-	return func() { s.processing.Release(1) }, nil
-}

+ 0 - 50
semaphores/semaphores_test.go

@@ -1,50 +0,0 @@
-package semaphores
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestSemaphoresQueueDisabled(t *testing.T) {
-	s, err := New(&Config{RequestsQueueSize: 0, Workers: 1})
-	require.NoError(t, err)
-
-	// Queue acquire should always work when disabled
-	release, err := s.AcquireQueue()
-	require.NoError(t, err)
-	release() // Should not panic
-
-	procRelease, err := s.AcquireProcessing(t.Context())
-	require.NoError(t, err)
-	procRelease()
-}
-
-func TestSemaphoresQueueEnabled(t *testing.T) {
-	s, err := New(&Config{RequestsQueueSize: 1, Workers: 1})
-	require.NoError(t, err)
-
-	// Should be able to acquire up to queue size + workers
-	release1, err := s.AcquireQueue()
-	require.NoError(t, err)
-
-	release2, err := s.AcquireQueue()
-	require.NoError(t, err)
-
-	// Third should fail (exceeds capacity)
-	_, err = s.AcquireQueue()
-	require.Error(t, err)
-
-	// Release and try again
-	release1()
-	release3, err := s.AcquireQueue()
-	require.NoError(t, err)
-
-	release2()
-	release3()
-}
-
-func TestSemaphoresInvalidConfig(t *testing.T) {
-	_, err := New(&Config{RequestsQueueSize: 0, Workers: 0})
-	require.Error(t, err)
-}

+ 8 - 8
semaphores/config.go → workers/config.go

@@ -1,4 +1,4 @@
-package semaphores
+package workers
 
 import (
 	"fmt"
@@ -8,17 +8,17 @@ import (
 	"github.com/imgproxy/imgproxy/v3/ensure"
 )
 
-// Config represents handler config
+// Config represents [Workers] config
 type Config struct {
-	RequestsQueueSize int // Request queue size
-	Workers           int // Number of workers
+	RequestsQueueSize int // Maximum request queue size
+	WorkersNumber     int // Number of allowed workers
 }
 
 // NewDefaultConfig creates a new configuration with defaults
 func NewDefaultConfig() Config {
 	return Config{
 		RequestsQueueSize: 0,
-		Workers:           runtime.GOMAXPROCS(0) * 2,
+		WorkersNumber:     runtime.GOMAXPROCS(0) * 2,
 	}
 }
 
@@ -27,7 +27,7 @@ func LoadConfigFromEnv(c *Config) (*Config, error) {
 	c = ensure.Ensure(c, NewDefaultConfig)
 
 	c.RequestsQueueSize = config.RequestsQueueSize
-	c.Workers = config.Workers
+	c.WorkersNumber = config.Workers
 
 	return c, nil
 }
@@ -38,8 +38,8 @@ func (c *Config) Validate() error {
 		return fmt.Errorf("requests queue size should be greater than or equal 0, now - %d", c.RequestsQueueSize)
 	}
 
-	if c.Workers <= 0 {
-		return fmt.Errorf("workers number should be greater than 0, now - %d", c.Workers)
+	if c.WorkersNumber <= 0 {
+		return fmt.Errorf("workers number should be greater than 0, now - %d", c.WorkersNumber)
 	}
 
 	return nil
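
For reference, a minimal sketch of constructing and validating the renamed config, using only the names defined in this file (a RequestsQueueSize of 0 leaves the queue unlimited, and the worker count defaults to GOMAXPROCS(0) * 2):

    package main

    import (
    	"fmt"

    	"github.com/imgproxy/imgproxy/v3/workers"
    )

    func main() {
    	// Start from the package defaults: unlimited queue, 2 workers per CPU.
    	cfg := workers.NewDefaultConfig()

    	// Cap the queue at 10 waiting requests on top of the running workers.
    	cfg.RequestsQueueSize = 10

    	if err := cfg.Validate(); err != nil {
    		panic(err)
    	}

    	fmt.Println(cfg.WorkersNumber, cfg.RequestsQueueSize)
    }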

+ 1 - 1
semaphores/errors.go → workers/errors.go

@@ -1,4 +1,4 @@
-package semaphores
+package workers
 
 import (
 	"net/http"

+ 84 - 0
workers/workers.go

@@ -0,0 +1,84 @@
+package workers
+
+import (
+	"context"
+
+	"github.com/imgproxy/imgproxy/v3/monitoring"
+	"golang.org/x/sync/semaphore"
+)
+
+// Workers controls how many concurrent image processings are allowed.
+// Requests exceeding this limit will be queued.
+//
+// It can also optionally limit the number of requests in the queue.
+type Workers struct {
+	// queue semaphore: limits the queue size
+	queue *semaphore.Weighted
+
+	// workers semaphore: limits the number of concurrent image processings
+	workers *semaphore.Weighted
+}
+
+// New creates a new Workers instance
+func New(config *Config) (*Workers, error) {
+	if err := config.Validate(); err != nil {
+		return nil, err
+	}
+
+	var queue *semaphore.Weighted
+
+	if config.RequestsQueueSize > 0 {
+		queue = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.WorkersNumber))
+	}
+
+	workers := semaphore.NewWeighted(int64(config.WorkersNumber))
+
+	return &Workers{
+		queue:   queue,
+		workers: workers,
+	}, nil
+}
+
+// Acquire acquires a worker.
+// It returns a release function for the worker and an error, if any.
+func (w *Workers) Acquire(ctx context.Context) (context.CancelFunc, error) {
+	defer monitoring.StartQueueSegment(ctx)()
+
+	// First, try to acquire the queue semaphore if configured.
+	// If the queue is full, return an error immediately.
+	releaseQueue, err := w.acquireQueue()
+	if err != nil {
+		return nil, err
+	}
+
+	// Next, acquire the workers semaphore.
+	err = w.workers.Acquire(ctx, 1)
+	if err != nil {
+		releaseQueue()
+		return nil, err
+	}
+
+	release := func() {
+		w.workers.Release(1)
+		releaseQueue()
+	}
+
+	return release, nil
+}
+
+// acquireQueue acquires the queue semaphore and returns a release function.
+// If the queue semaphore is not configured, it returns a no-op function to make
+// semaphore usage transparent to the caller.
+func (w *Workers) acquireQueue() (context.CancelFunc, error) {
+	if w.queue == nil {
+		return func() {}, nil // no-op release when the queue limit is disabled
+	}
+
+	acquired := w.queue.TryAcquire(1)
+	if !acquired {
+		return nil, newTooManyRequestsError()
+	}
+
+	return func() { w.queue.Release(1) }, nil
+}

+ 110 - 0
workers/workers_test.go

@@ -0,0 +1,110 @@
+package workers
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/imgproxy/imgproxy/v3/testutil"
+	"github.com/stretchr/testify/suite"
+	"golang.org/x/sync/errgroup"
+)
+
+type WorkersTestSuite struct {
+	testutil.LazySuite
+
+	config  testutil.LazyObj[*Config]
+	workers testutil.LazyObj[*Workers]
+}
+
+func (s *WorkersTestSuite) SetupSuite() {
+	s.config, _ = testutil.NewLazySuiteObj(s, func() (*Config, error) {
+		return &Config{RequestsQueueSize: 0, WorkersNumber: 1}, nil
+	})
+
+	s.workers, _ = testutil.NewLazySuiteObj(s, func() (*Workers, error) {
+		return New(s.config())
+	})
+}
+
+func (s *WorkersTestSuite) acquire(ctx context.Context, n int, delay time.Duration) (int, error) {
+	errg := new(errgroup.Group)
+	acquired := int64(0)
+
+	// Get the workers instance before running goroutines
+	workers := s.workers()
+
+	for i := 0; i < n; i++ {
+		errg.Go(func() error {
+			release, err := workers.Acquire(ctx)
+
+			if err == nil {
+				time.Sleep(delay)
+
+				release()
+				atomic.AddInt64(&acquired, 1)
+			}
+
+			return err
+		})
+	}
+
+	err := errg.Wait()
+	return int(acquired), err
+}
+
+func (s *WorkersTestSuite) TestQueueDisabled() {
+	s.config().RequestsQueueSize = 0
+	s.config().WorkersNumber = 2
+
+	// Try to acquire workers that exceed allowed workers number
+	aquired, err := s.acquire(s.T().Context(), 4, 10*time.Millisecond)
+	s.Require().Equal(4, acquired, "All workers should eventually be acquired")
+	s.Require().NoError(err, "All workers should be acquired without error")
+}
+
+func (s *WorkersTestSuite) TestQueueEnabled() {
+	s.config().RequestsQueueSize = 1
+	s.config().WorkersNumber = 2
+
+	// Try to acquire workers that fit allowed workers number + queue size
+	acquired, err := s.acquire(s.T().Context(), 3, 10*time.Millisecond)
+	s.Require().Equal(3, acquired, "All workers should eventually be acquired")
+	s.Require().NoError(err, "All workers should be acquired without error")
+
+	// Try to acquire workers that exceed allowed workers number + queue size
+	acquired, err = s.acquire(s.T().Context(), 6, 10*time.Millisecond)
+	s.Require().Equal(3, acquired, "Only 3 workers should be acquired")
+	s.Require().ErrorAs(err, new(TooManyRequestsError))
+}
+
+func (s *WorkersTestSuite) TestContextTimeout() {
+	ctx, cancel := context.WithTimeout(s.T().Context(), 5*time.Millisecond)
+	defer cancel()
+
+	acquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
+	s.Require().Equal(1, acquired, "Only 1 worker should be acquired")
+	s.Require().ErrorIs(err, context.DeadlineExceeded, "Context deadline exceeded error expected")
+}
+
+func (s *WorkersTestSuite) TestContextCanceled() {
+	ctx, cancel := context.WithCancel(s.T().Context())
+	cancel()
+
+	acquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
+	s.Require().Equal(0, acquired, "No worker should be acquired")
+	s.Require().ErrorIs(err, context.Canceled, "Context canceled error expected")
+}
+
+func (s *WorkersTestSuite) TestInvalidConfig() {
+	_, err := New(&Config{RequestsQueueSize: 0, WorkersNumber: 0})
+	s.Require().Error(err)
+
+	_, err = New(&Config{RequestsQueueSize: -1, WorkersNumber: 1})
+	s.Require().Error(err)
+}
+
+func TestWorkers(t *testing.T) {
+	suite.Run(t, new(WorkersTestSuite))
+}
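
The suite can be run on its own with the standard Go tooling, e.g. go test ./workers/ -run TestWorkers -v from the repository root.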