瀏覽代碼

Proper semaphore

DarthSim 3 年之前
父節點
當前提交
526724105e
共有 2 個文件被更改,包括 53 次插入6 次删除
  1. 6 6
      processing_handler.go
  2. 47 0
      semaphore/semaphore.go

+ 6 - 6
processing_handler.go

@@ -23,18 +23,19 @@ import (
 	"github.com/imgproxy/imgproxy/v3/processing"
 	"github.com/imgproxy/imgproxy/v3/router"
 	"github.com/imgproxy/imgproxy/v3/security"
+	"github.com/imgproxy/imgproxy/v3/semaphore"
 	"github.com/imgproxy/imgproxy/v3/svg"
 	"github.com/imgproxy/imgproxy/v3/vips"
 )
 
 var (
-	processingSem chan struct{}
+	processingSem *semaphore.Semaphore
 
 	headerVaryValue string
 )
 
 func initProcessingHandler() {
-	processingSem = make(chan struct{}, config.Concurrency)
+	processingSem = semaphore.New(config.Concurrency)
 
 	vary := make([]string, 0)
 
@@ -235,15 +236,14 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
 	}
 
 	// The heavy part start here, so we need to restrict concurrency
-	select {
-	case processingSem <- struct{}{}:
-	case <-ctx.Done():
+	processingSemToken, aquired := processingSem.Aquire(ctx)
+	if !aquired {
 		// We don't actually need to check timeout here,
 		// but it's an easy way to check if this is an actual timeout
 		// or the request was cancelled
 		checkErr(ctx, "queue", router.CheckTimeout(ctx))
 	}
-	defer func() { <-processingSem }()
+	defer processingSemToken.Release()
 
 	statusCode := http.StatusOK
 

+ 47 - 0
semaphore/semaphore.go

@@ -0,0 +1,47 @@
+package semaphore
+
+import (
+	"context"
+	"sync"
+)
+
+type Semaphore struct {
+	sem chan struct{}
+}
+
+func New(n int) *Semaphore {
+	return &Semaphore{
+		sem: make(chan struct{}, n),
+	}
+}
+
+func (s *Semaphore) Aquire(ctx context.Context) (*Token, bool) {
+	select {
+	case s.sem <- struct{}{}:
+		return &Token{release: s.release}, true
+	case <-ctx.Done():
+		return &Token{release: func() {}}, false
+	}
+}
+
+func (s *Semaphore) TryAquire() (*Token, bool) {
+	select {
+	case s.sem <- struct{}{}:
+		return &Token{release: s.release}, true
+	default:
+		return &Token{release: func() {}}, false
+	}
+}
+
+func (s *Semaphore) release() {
+	<-s.sem
+}
+
+type Token struct {
+	release     func()
+	releaseOnce sync.Once
+}
+
+func (t *Token) Release() {
+	t.releaseOnce.Do(t.release)
+}