Procházet zdrojové kódy

Replace semaphore implementation with golang.org/x/sync/semaphore

DarthSim před 11 měsíci
rodič
revize
68d38949d8
3 změnil soubory, kde provedl 15 přidání a 61 odebrání
  1. 1 1
      go.mod
  2. 14 13
      processing_handler.go
  3. 0 47
      semaphore/semaphore.go

+ 1 - 1
go.mod

@@ -60,6 +60,7 @@ require (
 	go.uber.org/automaxprocs v1.5.3
 	golang.org/x/image v0.15.0
 	golang.org/x/net v0.24.0
+	golang.org/x/sync v0.7.0
 	golang.org/x/sys v0.19.0
 	google.golang.org/api v0.176.1
 	google.golang.org/grpc v1.63.2
@@ -167,7 +168,6 @@ require (
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.22.0 // indirect
 	golang.org/x/oauth2 v0.19.0 // indirect
-	golang.org/x/sync v0.7.0 // indirect
 	golang.org/x/term v0.19.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
 	golang.org/x/time v0.5.0 // indirect

+ 14 - 13
processing_handler.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/sync/semaphore"
 
 	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/cookies"
@@ -25,24 +26,23 @@ import (
 	"github.com/imgproxy/imgproxy/v3/processing"
 	"github.com/imgproxy/imgproxy/v3/router"
 	"github.com/imgproxy/imgproxy/v3/security"
-	"github.com/imgproxy/imgproxy/v3/semaphore"
 	"github.com/imgproxy/imgproxy/v3/svg"
 	"github.com/imgproxy/imgproxy/v3/vips"
 )
 
 var (
-	queueSem      *semaphore.Semaphore
-	processingSem *semaphore.Semaphore
+	queueSem      *semaphore.Weighted
+	processingSem *semaphore.Weighted
 
 	headerVaryValue string
 )
 
 func initProcessingHandler() {
 	if config.RequestsQueueSize > 0 {
-		queueSem = semaphore.New(config.RequestsQueueSize + config.Workers)
+		queueSem = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers))
 	}
 
-	processingSem = semaphore.New(config.Workers)
+	processingSem = semaphore.NewWeighted(int64(config.Workers))
 
 	vary := make([]string, 0)
 
@@ -205,11 +205,11 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 
 	if queueSem != nil {
-		token, acquired := queueSem.TryAcquire()
+		acquired := queueSem.TryAcquire(1)
 		if !acquired {
 			panic(ierrors.New(429, "Too many requests", "Too many requests"))
 		}
-		defer token.Release()
+		defer queueSem.Release(1)
 	}
 
 	path := r.RequestURI
@@ -282,21 +282,22 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	// The heavy part start here, so we need to restrict worker number
-	var processingSemToken *semaphore.Token
+	// The heavy part starts here, so we need to restrict worker number
 	func() {
 		defer metrics.StartQueueSegment(ctx)()
 
-		var acquired bool
-		processingSemToken, acquired = processingSem.Acquire(ctx)
-		if !acquired {
+		err = processingSem.Acquire(ctx, 1)
+		if err != nil {
 			// We don't actually need to check timeout here,
 			// but it's an easy way to check if this is an actual timeout
 			// or the request was canceled
 			checkErr(ctx, "queue", router.CheckTimeout(ctx))
+			// We should never reach this line as err could be only ctx.Err()
+			// and we've already checked for it. But beter safe than sorry
+			sendErrAndPanic(ctx, "queue", err)
 		}
 	}()
-	defer processingSemToken.Release()
+	defer processingSem.Release(1)
 
 	stats.IncImagesInProgress()
 	defer stats.DecImagesInProgress()

+ 0 - 47
semaphore/semaphore.go

@@ -1,47 +0,0 @@
-package semaphore
-
-import (
-	"context"
-	"sync"
-)
-
-type Semaphore struct {
-	sem chan struct{}
-}
-
-func New(n int) *Semaphore {
-	return &Semaphore{
-		sem: make(chan struct{}, n),
-	}
-}
-
-func (s *Semaphore) Acquire(ctx context.Context) (*Token, bool) {
-	select {
-	case s.sem <- struct{}{}:
-		return &Token{release: s.release}, true
-	case <-ctx.Done():
-		return &Token{release: func() {}}, false
-	}
-}
-
-func (s *Semaphore) TryAcquire() (*Token, bool) {
-	select {
-	case s.sem <- struct{}{}:
-		return &Token{release: s.release}, true
-	default:
-		return &Token{release: func() {}}, false
-	}
-}
-
-func (s *Semaphore) release() {
-	<-s.sem
-}
-
-type Token struct {
-	release     func()
-	releaseOnce sync.Once
-}
-
-func (t *Token) Release() {
-	t.releaseOnce.Do(t.release)
-}