Browse Source

IMGPROXY_REQUESTS_QUEUE_SIZE config

DarthSim 2 years ago
parent
commit
9081fd5766
3 changed files with 30 additions and 9 deletions
  1. 15 8
      config/config.go
  2. 2 1
      docs/configuration.md
  3. 13 0
      processing_handler.go

+ 15 - 8
config/config.go

@@ -16,14 +16,15 @@ import (
 )
 
 var (
-	Network          string
-	Bind             string
-	ReadTimeout      int
-	WriteTimeout     int
-	KeepAliveTimeout int
-	DownloadTimeout  int
-	Concurrency      int
-	MaxClients       int
+	Network           string
+	Bind              string
+	ReadTimeout       int
+	WriteTimeout      int
+	KeepAliveTimeout  int
+	DownloadTimeout   int
+	Concurrency       int
+	RequestsQueueSize int
+	MaxClients        int
 
 	TTL                     int
 	CacheControlPassthrough bool
@@ -186,6 +187,7 @@ func Reset() {
 	KeepAliveTimeout = 10
 	DownloadTimeout = 5
 	Concurrency = runtime.NumCPU() * 2
+	RequestsQueueSize = 0
 	MaxClients = 2048
 
 	TTL = 31536000
@@ -341,6 +343,7 @@ func Configure() error {
 	configurators.Int(&KeepAliveTimeout, "IMGPROXY_KEEP_ALIVE_TIMEOUT")
 	configurators.Int(&DownloadTimeout, "IMGPROXY_DOWNLOAD_TIMEOUT")
 	configurators.Int(&Concurrency, "IMGPROXY_CONCURRENCY")
+	configurators.Int(&RequestsQueueSize, "IMGPROXY_REQUESTS_QUEUE_SIZE")
 	configurators.Int(&MaxClients, "IMGPROXY_MAX_CLIENTS")
 
 	configurators.Int(&TTL, "IMGPROXY_TTL")
@@ -534,6 +537,10 @@ func Configure() error {
 		return fmt.Errorf("Concurrency should be greater than 0, now - %d\n", Concurrency)
 	}
 
+	if RequestsQueueSize < 0 {
+		return fmt.Errorf("Requests queue size should be greater than or equal 0, now - %d\n", RequestsQueueSize)
+	}
+
 	if MaxClients < 0 {
 		return fmt.Errorf("Concurrency should be greater than or equal 0, now - %d\n", MaxClients)
 	}

+ 2 - 1
docs/configuration.md

@@ -32,7 +32,8 @@ echo $(xxd -g 2 -l 64 -p /dev/random | tr -d '\n')
 * `IMGPROXY_WRITE_TIMEOUT`: the maximum duration (in seconds) for writing the response. Default: `10`
 * `IMGPROXY_KEEP_ALIVE_TIMEOUT`: the maximum duration (in seconds) to wait for the next request before closing the connection. When set to `0`, keep-alive is disabled. Default: `10`
 * `IMGPROXY_DOWNLOAD_TIMEOUT`: the maximum duration (in seconds) for downloading the source image. Default: `5`
-* `IMGPROXY_CONCURRENCY`: the maximum number of image requests to be processed simultaneously. Default: the number of CPU cores multiplied by two
+* `IMGPROXY_CONCURRENCY`: the maximum number of image requests to be processed simultaneously. Requests that exceed this limit are put in the queue. Default: the number of CPU cores multiplied by two
+* `IMGPROXY_REQUESTS_QUEUE_SIZE`: the maximum number of image requests that can be put in the queue. Requests that exceed this limit are rejected with `429` HTTP status. When set to `0`, the requests queue is unlimited. Default: `0`
 * `IMGPROXY_MAX_CLIENTS`: the maximum number of simultaneous active connections. When set to `0`, connections number limitation is disabled. Default: `2048`
 * `IMGPROXY_TTL`: a duration (in seconds) sent via the `Expires` and `Cache-Control: max-age` HTTP headers. Default: `31536000` (1 year)
 * `IMGPROXY_CACHE_CONTROL_PASSTHROUGH`: when `true` and the source image response contains the `Expires` or `Cache-Control` headers, reuse those headers. Default: false

+ 13 - 0
processing_handler.go

@@ -29,12 +29,17 @@ import (
 )
 
 var (
+	queueSem      *semaphore.Semaphore
 	processingSem *semaphore.Semaphore
 
 	headerVaryValue string
 )
 
 func initProcessingHandler() {
+	if config.RequestsQueueSize > 0 {
+		queueSem = semaphore.New(config.RequestsQueueSize + config.Concurrency)
+	}
+
 	processingSem = semaphore.New(config.Concurrency)
 
 	vary := make([]string, 0)
@@ -176,6 +181,14 @@ func checkErr(ctx context.Context, errType string, err error) {
 func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 
+	if queueSem != nil {
+		token, aquired := queueSem.TryAquire()
+		if !aquired {
+			panic(ierrors.New(429, "Too many requests", "Too many requests"))
+		}
+		defer token.Release()
+	}
+
 	path := r.RequestURI
 	if queryStart := strings.IndexByte(path, '?'); queryStart >= 0 {
 		path = path[:queryStart]