瀏覽代碼

DownloadSync/DownloadAsync

Viktor Sokolov 2 月之前
父節點
當前提交
cabe54632f

+ 1 - 1
.devcontainer/oss/devcontainer.json

@@ -16,7 +16,7 @@
     },
     },
     "mounts": [
     "mounts": [
         {
         {
-            "source": "${localWorkspaceFolder}/.devcontainer/images",
+            "source": "${localWorkspaceFolder}/testdata/test-images",
             "target": "/images",
             "target": "/images",
             "type": "bind"
             "type": "bind"
         }
         }

+ 5 - 4
asyncbuffer/buffer.go

@@ -70,8 +70,9 @@ type AsyncBuffer struct {
 	chunkCond *Cond  // Ticker that signals when a new chunk is ready
 	chunkCond *Cond  // Ticker that signals when a new chunk is ready
 }
 }
 
 
-// FromReadCloser creates a new AsyncBuffer that reads from the given io.Reader in background
-func FromReader(r io.ReadCloser) *AsyncBuffer {
+// New creates a new AsyncBuffer that reads from the given io.ReadCloser in background
+// and closes it when finished.
+func New(r io.ReadCloser) *AsyncBuffer {
 	ab := &AsyncBuffer{
 	ab := &AsyncBuffer{
 		r:         r,
 		r:         r,
 		paused:    NewLatch(),
 		paused:    NewLatch(),
@@ -234,7 +235,7 @@ func (ab *AsyncBuffer) WaitFor(off int64) error {
 
 
 // Wait waits for the reader to finish reading all data and returns
 // Wait waits for the reader to finish reading all data and returns
 // the total length of the data read.
 // the total length of the data read.
-func (ab *AsyncBuffer) Wait() (int64, error) {
+func (ab *AsyncBuffer) Wait() (int, error) {
 	// Wait ends till the end of the stream: unpause the reader
 	// Wait ends till the end of the stream: unpause the reader
 	ab.paused.Release()
 	ab.paused.Release()
 
 
@@ -246,7 +247,7 @@ func (ab *AsyncBuffer) Wait() (int64, error) {
 
 
 		// In case the reader is finished reading, we can return immediately
 		// In case the reader is finished reading, we can return immediately
 		if ab.finished.Load() {
 		if ab.finished.Load() {
-			return ab.len.Load(), ab.Error()
+			return int(ab.len.Load()), ab.Error()
 		}
 		}
 
 
 		// Lock until the next chunk is ready
 		// Lock until the next chunk is ready

+ 20 - 20
asyncbuffer/buffer_test.go

@@ -92,7 +92,7 @@ func (r *blockingReader) Close() error { // Close forwards closing to the underl
 }
 }
 
 
 // generateSourceData generates a byte slice with 4.5 chunks of data
 // generateSourceData generates a byte slice with 4.5 chunks of data
-func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
+func generateSourceData(t *testing.T, size int) ([]byte, io.ReadSeekCloser) {
 	// We use small chunks for tests, let's check the ChunkSize just in case
 	// We use small chunks for tests, let's check the ChunkSize just in case
 	assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
 	assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
 
 
@@ -108,8 +108,8 @@ func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
 // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
 // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
 func TestAsyncBufferReadAt(t *testing.T) {
 func TestAsyncBufferReadAt(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
-	asyncBuffer := FromReader(bytesReader)
+	source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
+	asyncBuffer := New(bytesReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
 	asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
@@ -169,7 +169,7 @@ func TestAsyncBufferReadAt(t *testing.T) {
 // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
 // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
 func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
 func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
 	source, bytesReader := generateSourceData(t, 20)
 	source, bytesReader := generateSourceData(t, 20)
-	asyncBuffer := FromReader(bytesReader)
+	asyncBuffer := New(bytesReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	// First, let's read all the data
 	// First, let's read all the data
@@ -196,16 +196,16 @@ func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
 }
 }
 
 
 func TestAsyncBufferReader(t *testing.T) {
 func TestAsyncBufferReader(t *testing.T) {
-	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
+	source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
 
 
 	// Create an AsyncBuffer with the byte slice
 	// Create an AsyncBuffer with the byte slice
-	asyncBuffer := FromReader(bytesReader)
+	asyncBuffer := New(bytesReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	// Let's wait for all chunks to be read
 	// Let's wait for all chunks to be read
 	size, err := asyncBuffer.Wait()
 	size, err := asyncBuffer.Wait()
 	require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
 	require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
-	assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
+	assert.Equal(t, chunkSize*4+halfChunkSize, size)
 
 
 	reader := asyncBuffer.Reader()
 	reader := asyncBuffer.Reader()
 
 
@@ -245,7 +245,7 @@ func TestAsyncBufferReader(t *testing.T) {
 	// Seek -10 bytes from end of the stream
 	// Seek -10 bytes from end of the stream
 	pos, err = reader.Seek(-10, io.SeekEnd)
 	pos, err = reader.Seek(-10, io.SeekEnd)
 	require.NoError(t, err)
 	require.NoError(t, err)
-	assert.Equal(t, size-10, pos)
+	assert.Equal(t, size-10, int(pos))
 
 
 	// Read last 10 bytes
 	// Read last 10 bytes
 	n, err = reader.Read(smallSlice)
 	n, err = reader.Read(smallSlice)
@@ -256,7 +256,7 @@ func TestAsyncBufferReader(t *testing.T) {
 	// Seek beyond the end of the stream and try to read
 	// Seek beyond the end of the stream and try to read
 	pos, err = reader.Seek(1024, io.SeekCurrent)
 	pos, err = reader.Seek(1024, io.SeekCurrent)
 	require.NoError(t, err)
 	require.NoError(t, err)
-	assert.Equal(t, size+1024, pos)
+	assert.Equal(t, size+1024, int(pos))
 
 
 	_, err = reader.Read(smallSlice)
 	_, err = reader.Read(smallSlice)
 	require.ErrorIs(t, err, io.EOF)
 	require.ErrorIs(t, err, io.EOF)
@@ -264,10 +264,10 @@ func TestAsyncBufferReader(t *testing.T) {
 
 
 // TestAsyncBufferClose tests closing the AsyncBuffer
 // TestAsyncBufferClose tests closing the AsyncBuffer
 func TestAsyncBufferClose(t *testing.T) {
 func TestAsyncBufferClose(t *testing.T) {
-	_, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
+	_, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
 
 
 	// Create an AsyncBuffer with the byte slice
 	// Create an AsyncBuffer with the byte slice
-	asyncBuffer := FromReader(bytesReader)
+	asyncBuffer := New(bytesReader)
 
 
 	reader1 := asyncBuffer.Reader()
 	reader1 := asyncBuffer.Reader()
 	reader2 := asyncBuffer.Reader()
 	reader2 := asyncBuffer.Reader()
@@ -292,9 +292,9 @@ func TestAsyncBufferClose(t *testing.T) {
 // which would fail somewhere
 // which would fail somewhere
 func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
+	source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
 	slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
 	slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
-	asyncBuffer := FromReader(slowReader)
+	asyncBuffer := New(slowReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	// Let's wait for all chunks to be read
 	// Let's wait for all chunks to be read
@@ -325,9 +325,9 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 // with full = false
 // with full = false
 func TestAsyncBufferReadAsync(t *testing.T) {
 func TestAsyncBufferReadAsync(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
+	source, bytesReader := generateSourceData(t, chunkSize*3)
 	blockingReader := newBlockingReader(bytesReader)
 	blockingReader := newBlockingReader(bytesReader)
-	asyncBuffer := FromReader(blockingReader)
+	asyncBuffer := New(blockingReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	// flush the first chunk to allow reading
 	// flush the first chunk to allow reading
@@ -371,7 +371,7 @@ func TestAsyncBufferReadAsync(t *testing.T) {
 func TestAsyncBufferReadAllCompability(t *testing.T) {
 func TestAsyncBufferReadAllCompability(t *testing.T) {
 	source, err := os.ReadFile("../testdata/test1.jpg")
 	source, err := os.ReadFile("../testdata/test1.jpg")
 	require.NoError(t, err)
 	require.NoError(t, err)
-	asyncBuffer := FromReader(nopSeekCloser{bytes.NewReader(source)})
+	asyncBuffer := New(nopSeekCloser{bytes.NewReader(source)})
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	b, err := io.ReadAll(asyncBuffer.Reader())
 	b, err := io.ReadAll(asyncBuffer.Reader())
@@ -380,8 +380,8 @@ func TestAsyncBufferReadAllCompability(t *testing.T) {
 }
 }
 
 
 func TestAsyncBufferThreshold(t *testing.T) {
 func TestAsyncBufferThreshold(t *testing.T) {
-	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
-	asyncBuffer := FromReader(bytesReader)
+	_, bytesReader := generateSourceData(t, pauseThreshold*2)
+	asyncBuffer := New(bytesReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	target := make([]byte, chunkSize)
 	target := make([]byte, chunkSize)
@@ -426,8 +426,8 @@ func TestAsyncBufferThreshold(t *testing.T) {
 }
 }
 
 
 func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
 func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
-	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
-	asyncBuffer := FromReader(bytesReader)
+	_, bytesReader := generateSourceData(t, pauseThreshold*2)
+	asyncBuffer := New(bytesReader)
 	defer asyncBuffer.Close()
 	defer asyncBuffer.Close()
 
 
 	target := make([]byte, chunkSize)
 	target := make([]byte, chunkSize)

+ 1 - 1
asyncbuffer/reader.go

@@ -37,7 +37,7 @@ func (r *Reader) Seek(offset int64, whence int) (int64, error) {
 			return 0, err
 			return 0, err
 		}
 		}
 
 
-		r.pos = size + offset
+		r.pos = int64(size) + offset
 
 
 	default:
 	default:
 		return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: invalid whence")
 		return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: invalid whence")

+ 3 - 46
imagedata/download.go

@@ -1,20 +1,18 @@
 package imagedata
 package imagedata
 
 
 import (
 import (
-	"context"
 	"net/http"
 	"net/http"
 
 
-	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/ierrors"
 	"github.com/imgproxy/imgproxy/v3/ierrors"
 	"github.com/imgproxy/imgproxy/v3/imagefetcher"
 	"github.com/imgproxy/imgproxy/v3/imagefetcher"
-	"github.com/imgproxy/imgproxy/v3/security"
 	"github.com/imgproxy/imgproxy/v3/transport"
 	"github.com/imgproxy/imgproxy/v3/transport"
 )
 )
 
 
 var (
 var (
 	Fetcher *imagefetcher.Fetcher
 	Fetcher *imagefetcher.Fetcher
 
 
-	// For tests
+	// For tests. This needs to move to fetcher once we will have a way to isolate
+	// the fetcher in tests.
 	redirectAllRequestsTo string
 	redirectAllRequestsTo string
 )
 )
 
 
@@ -29,7 +27,7 @@ func initDownloading() error {
 		return err
 		return err
 	}
 	}
 
 
-	Fetcher, err = imagefetcher.NewFetcher(ts, config.MaxRedirects)
+	Fetcher, err = imagefetcher.NewFetcher(ts, imagefetcher.NewConfigFromEnv())
 	if err != nil {
 	if err != nil {
 		return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't create image fetcher"))
 		return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't create image fetcher"))
 	}
 	}
@@ -37,47 +35,6 @@ func initDownloading() error {
 	return nil
 	return nil
 }
 }
 
 
-func download(ctx context.Context, imageURL string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
-	h := make(http.Header)
-
-	// We use this for testing
-	if len(redirectAllRequestsTo) > 0 {
-		imageURL = redirectAllRequestsTo
-	}
-
-	req, err := Fetcher.BuildRequest(ctx, imageURL, opts.Header, opts.CookieJar)
-	if err != nil {
-		return nil, h, err
-	}
-	defer req.Cancel()
-
-	res, err := req.FetchImage()
-	if res != nil {
-		h = res.Header.Clone()
-	}
-	if err != nil {
-		if res != nil {
-			res.Body.Close()
-		}
-		return nil, h, err
-	}
-
-	res, err = security.LimitResponseSize(res, secopts)
-	if res != nil {
-		defer res.Body.Close()
-	}
-	if err != nil {
-		return nil, h, err
-	}
-
-	imgdata, err := readAndCheckImage(res.Body, int(res.ContentLength), secopts)
-	if err != nil {
-		return nil, h, ierrors.Wrap(err, 0)
-	}
-
-	return imgdata, h, nil
-}
-
 func RedirectAllRequestsTo(u string) {
 func RedirectAllRequestsTo(u string) {
 	redirectAllRequestsTo = u
 	redirectAllRequestsTo = u
 }
 }

+ 147 - 3
imagedata/factory.go

@@ -2,10 +2,16 @@ package imagedata
 
 
 import (
 import (
 	"bytes"
 	"bytes"
+	"context"
 	"encoding/base64"
 	"encoding/base64"
+	"fmt"
 	"io"
 	"io"
+	"net/http"
 	"os"
 	"os"
 
 
+	"github.com/imgproxy/imgproxy/v3/asyncbuffer"
+	"github.com/imgproxy/imgproxy/v3/ierrors"
+	"github.com/imgproxy/imgproxy/v3/imagefetcher"
 	"github.com/imgproxy/imgproxy/v3/imagemeta"
 	"github.com/imgproxy/imgproxy/v3/imagemeta"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
 	"github.com/imgproxy/imgproxy/v3/security"
 	"github.com/imgproxy/imgproxy/v3/security"
@@ -35,13 +41,13 @@ func NewFromBytes(b []byte) (ImageData, error) {
 
 
 // NewFromPath creates a new ImageData from an os.File
 // NewFromPath creates a new ImageData from an os.File
 func NewFromPath(path string, secopts security.Options) (ImageData, error) {
 func NewFromPath(path string, secopts security.Options) (ImageData, error) {
-	f, err := os.Open(path)
+	fl, err := os.Open(path)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	defer f.Close()
+	defer fl.Close()
 
 
-	fr, err := security.LimitFileSize(f, secopts)
+	fr, err := security.LimitFileSize(fl, secopts)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -91,3 +97,141 @@ func NewFromBase64(encoded string, secopts security.Options) (ImageData, error)
 
 
 	return NewFromBytes(b)
 	return NewFromBytes(b)
 }
 }
+
+// sendRequest is a common logic between sync and async download.
+func sendRequest(ctx context.Context, url string, opts DownloadOptions, secopts security.Options) (*imagefetcher.Request, *http.Response, http.Header, error) {
+	h := make(http.Header)
+
+	// NOTE: This will be removed in the future when our test context gets better isolation
+	if len(redirectAllRequestsTo) > 0 {
+		url = redirectAllRequestsTo
+	}
+
+	req, err := Fetcher.BuildRequest(ctx, url, opts.Header, opts.CookieJar)
+	if err != nil {
+		return req, nil, h, err
+	}
+
+	res, err := req.FetchImage()
+	if res != nil {
+		h = res.Header.Clone()
+	}
+	if err != nil {
+		if res != nil {
+			res.Body.Close()
+		}
+		req.Cancel()
+
+		return req, nil, h, err
+	}
+
+	res, err = security.LimitResponseSize(res, secopts)
+	if err != nil {
+		if res != nil {
+			res.Body.Close()
+		}
+		req.Cancel()
+
+		return req, nil, h, err
+	}
+
+	return req, res, h, nil
+}
+
+// DownloadSync downloads the image synchronously and returns the ImageData and HTTP headers.
+func downloadSync(ctx context.Context, imageURL string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
+	req, res, h, err := sendRequest(ctx, imageURL, opts, secopts)
+	if res != nil {
+		defer res.Body.Close()
+	}
+
+	if req != nil {
+		defer req.Cancel()
+	}
+
+	if err != nil {
+		return nil, h, err
+	}
+
+	b, err := io.ReadAll(res.Body)
+	if err != nil {
+		return nil, h, err
+	}
+
+	meta, err := imagemeta.DecodeMeta(bytes.NewReader(b))
+	if err != nil {
+		return nil, h, err
+	}
+
+	err = security.CheckMeta(meta, secopts)
+	if err != nil {
+		return nil, h, err
+	}
+
+	d := NewFromBytesWithFormat(meta.Format(), b)
+	return d, h, err
+}
+
+// downloadAsync downloads the image asynchronously and returns the ImageData
+// backed by AsyncBuffer and HTTP headers.
+func downloadAsync(ctx context.Context, imageURL string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
+	// We pass this responsibility to AsyncBuffer
+	//nolint:bodyclose
+	req, res, h, err := sendRequest(ctx, imageURL, opts, secopts)
+	if err != nil {
+		return nil, h, err
+	}
+
+	b := asyncbuffer.New(res.Body)
+
+	meta, err := imagemeta.DecodeMeta(b.Reader())
+	if err != nil {
+		b.Close()
+		req.Cancel()
+		return nil, h, err
+	}
+
+	err = security.CheckMeta(meta, secopts)
+	if err != nil {
+		b.Close()
+		req.Cancel()
+		return nil, h, err
+	}
+
+	d := &imageDataAsyncBuffer{
+		b:      b,
+		format: meta.Format(),
+		cancel: nil,
+	}
+	d.AddCancel(req.Cancel) // request will be closed when the image data is consumed
+
+	return d, h, err
+}
+
+// DownloadSyncWithDesc downloads the image synchronously and returns the ImageData, but
+// wraps errors with desc.
+func DownloadSync(ctx context.Context, imageURL, desc string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
+	imgdata, h, err := downloadSync(ctx, imageURL, opts, secopts)
+	if err != nil {
+		return nil, h, ierrors.Wrap(
+			err, 0,
+			ierrors.WithPrefix(fmt.Sprintf("can't download %s", desc)),
+		)
+	}
+
+	return imgdata, h, nil
+}
+
+// DownloadSyncWithDesc downloads the image synchronously and returns the ImageData, but
+// wraps errors with desc.
+func DownloadAsync(ctx context.Context, imageURL, desc string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
+	imgdata, h, err := downloadAsync(ctx, imageURL, opts, secopts)
+	if err != nil {
+		return nil, h, ierrors.Wrap(
+			err, 0,
+			ierrors.WithPrefix(fmt.Sprintf("can't download %s", desc)),
+		)
+	}
+
+	return imgdata, h, nil
+}

+ 45 - 18
imagedata/image_data.go

@@ -3,11 +3,11 @@ package imagedata
 import (
 import (
 	"bytes"
 	"bytes"
 	"context"
 	"context"
-	"fmt"
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"sync"
 	"sync"
 
 
+	"github.com/imgproxy/imgproxy/v3/asyncbuffer"
 	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/ierrors"
 	"github.com/imgproxy/imgproxy/v3/ierrors"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
@@ -36,6 +36,14 @@ type imageDataBytes struct {
 	cancelOnce sync.Once
 	cancelOnce sync.Once
 }
 }
 
 
+// imageDataAsyncBuffer is a struct that implements the ImageData interface backed by an AsyncBuffer
+type imageDataAsyncBuffer struct {
+	b          *asyncbuffer.AsyncBuffer
+	format     imagetype.Type
+	cancel     []context.CancelFunc
+	cancelOnce sync.Once
+}
+
 func (d *imageDataBytes) Close() error {
 func (d *imageDataBytes) Close() error {
 	d.cancelOnce.Do(func() {
 	d.cancelOnce.Do(func() {
 		for _, cancel := range d.cancel {
 		for _, cancel := range d.cancel {
@@ -57,18 +65,49 @@ func (d *imageDataBytes) Reader() io.ReadSeeker {
 }
 }
 
 
 // Size returns the size of the image data in bytes.
 // Size returns the size of the image data in bytes.
-// NOTE: asyncbuffer implementation will .Wait() for the data to be fully read
 func (d *imageDataBytes) Size() (int, error) {
 func (d *imageDataBytes) Size() (int, error) {
 	return len(d.data), nil
 	return len(d.data), nil
 }
 }
 
 
+// AddCancel attaches a cancel function to the image data
 func (d *imageDataBytes) AddCancel(cancel context.CancelFunc) {
 func (d *imageDataBytes) AddCancel(cancel context.CancelFunc) {
 	d.cancel = append(d.cancel, cancel)
 	d.cancel = append(d.cancel, cancel)
 }
 }
 
 
-func Init() error {
-	initRead()
+// Reader returns a ReadSeeker for the image data
+func (d *imageDataAsyncBuffer) Reader() io.ReadSeeker {
+	return d.b.Reader()
+}
+
+// Close closes the response body (hence, response) and the async buffer itself
+func (d *imageDataAsyncBuffer) Close() error {
+	d.cancelOnce.Do(func() {
+		for _, cancel := range d.cancel {
+			cancel()
+		}
+		d.b.Close()
+	})
+
+	return nil
+}
+
+// Format returns the image format from the metadata
+func (d *imageDataAsyncBuffer) Format() imagetype.Type {
+	return d.format
+}
 
 
+// Size returns the size of the image data in bytes.
+// It waits for the async buffer to finish reading.
+func (d *imageDataAsyncBuffer) Size() (int, error) {
+	return d.b.Wait()
+}
+
+// AddCancel attaches a cancel function to the image data
+func (d *imageDataAsyncBuffer) AddCancel(cancel context.CancelFunc) {
+	d.cancel = append(d.cancel, cancel)
+}
+
+func Init() error {
 	if err := initDownloading(); err != nil {
 	if err := initDownloading(); err != nil {
 		return err
 		return err
 	}
 	}
@@ -105,7 +144,7 @@ func loadWatermark() error {
 		}
 		}
 
 
 	case len(config.WatermarkURL) > 0:
 	case len(config.WatermarkURL) > 0:
-		Watermark, _, err = Download(context.Background(), config.WatermarkURL, "watermark", DownloadOptions{Header: nil, CookieJar: nil}, security.DefaultOptions())
+		Watermark, _, err = DownloadSync(context.Background(), config.WatermarkURL, "watermark", DownloadOptions{Header: nil, CookieJar: nil}, security.DefaultOptions())
 		if err != nil {
 		if err != nil {
 			return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't download from URL"))
 			return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't download from URL"))
 		}
 		}
@@ -132,7 +171,7 @@ func loadFallbackImage() (err error) {
 		}
 		}
 
 
 	case len(config.FallbackImageURL) > 0:
 	case len(config.FallbackImageURL) > 0:
-		FallbackImage, FallbackImageHeaders, err = Download(context.Background(), config.FallbackImageURL, "fallback image", DownloadOptions{Header: nil, CookieJar: nil}, security.DefaultOptions())
+		FallbackImage, FallbackImageHeaders, err = DownloadSync(context.Background(), config.FallbackImageURL, "fallback image", DownloadOptions{Header: nil, CookieJar: nil}, security.DefaultOptions())
 		if err != nil {
 		if err != nil {
 			return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't download from URL"))
 			return ierrors.Wrap(err, 0, ierrors.WithPrefix("can't download from URL"))
 		}
 		}
@@ -143,15 +182,3 @@ func loadFallbackImage() (err error) {
 
 
 	return err
 	return err
 }
 }
-
-func Download(ctx context.Context, imageURL, desc string, opts DownloadOptions, secopts security.Options) (ImageData, http.Header, error) {
-	imgdata, h, err := download(ctx, imageURL, opts, secopts)
-	if err != nil {
-		return nil, h, ierrors.Wrap(
-			err, 0,
-			ierrors.WithPrefix(fmt.Sprintf("Can't download %s", desc)),
-		)
-	}
-
-	return imgdata, h, nil
-}

+ 11 - 11
imagedata/image_data_test.go

@@ -91,7 +91,7 @@ func (s *ImageDataTestSuite) SetupTest() {
 }
 }
 
 
 func (s *ImageDataTestSuite) TestDownloadStatusOK() {
 func (s *ImageDataTestSuite) TestDownloadStatusOK() {
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().NoError(err)
 	s.Require().NoError(err)
 	s.Require().NotNil(imgdata)
 	s.Require().NotNil(imgdata)
@@ -158,7 +158,7 @@ func (s *ImageDataTestSuite) TestDownloadStatusPartialContent() {
 		s.Run(tc.name, func() {
 		s.Run(tc.name, func() {
 			s.header.Set("Content-Range", tc.contentRange)
 			s.header.Set("Content-Range", tc.contentRange)
 
 
-			imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+			imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 			if tc.expectErr {
 			if tc.expectErr {
 				s.Require().Error(err)
 				s.Require().Error(err)
@@ -178,7 +178,7 @@ func (s *ImageDataTestSuite) TestDownloadStatusNotFound() {
 	s.data = []byte("Not Found")
 	s.data = []byte("Not Found")
 	s.header.Set("Content-Type", "text/plain")
 	s.header.Set("Content-Type", "text/plain")
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
@@ -190,7 +190,7 @@ func (s *ImageDataTestSuite) TestDownloadStatusForbidden() {
 	s.data = []byte("Forbidden")
 	s.data = []byte("Forbidden")
 	s.header.Set("Content-Type", "text/plain")
 	s.header.Set("Content-Type", "text/plain")
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
@@ -202,7 +202,7 @@ func (s *ImageDataTestSuite) TestDownloadStatusInternalServerError() {
 	s.data = []byte("Internal Server Error")
 	s.data = []byte("Internal Server Error")
 	s.header.Set("Content-Type", "text/plain")
 	s.header.Set("Content-Type", "text/plain")
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(500, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(500, ierrors.Wrap(err, 0).StatusCode())
@@ -216,7 +216,7 @@ func (s *ImageDataTestSuite) TestDownloadUnreachable() {
 
 
 	serverURL := fmt.Sprintf("http://%s", l.Addr().String())
 	serverURL := fmt.Sprintf("http://%s", l.Addr().String())
 
 
-	imgdata, _, err := Download(context.Background(), serverURL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), serverURL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(500, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(500, ierrors.Wrap(err, 0).StatusCode())
@@ -226,7 +226,7 @@ func (s *ImageDataTestSuite) TestDownloadUnreachable() {
 func (s *ImageDataTestSuite) TestDownloadInvalidImage() {
 func (s *ImageDataTestSuite) TestDownloadInvalidImage() {
 	s.data = []byte("invalid")
 	s.data = []byte("invalid")
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
@@ -236,7 +236,7 @@ func (s *ImageDataTestSuite) TestDownloadInvalidImage() {
 func (s *ImageDataTestSuite) TestDownloadSourceAddressNotAllowed() {
 func (s *ImageDataTestSuite) TestDownloadSourceAddressNotAllowed() {
 	config.AllowLoopbackSourceAddresses = false
 	config.AllowLoopbackSourceAddresses = false
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(404, ierrors.Wrap(err, 0).StatusCode())
@@ -246,7 +246,7 @@ func (s *ImageDataTestSuite) TestDownloadSourceAddressNotAllowed() {
 func (s *ImageDataTestSuite) TestDownloadImageTooLarge() {
 func (s *ImageDataTestSuite) TestDownloadImageTooLarge() {
 	config.MaxSrcResolution = 1
 	config.MaxSrcResolution = 1
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
@@ -256,7 +256,7 @@ func (s *ImageDataTestSuite) TestDownloadImageTooLarge() {
 func (s *ImageDataTestSuite) TestDownloadImageFileTooLarge() {
 func (s *ImageDataTestSuite) TestDownloadImageFileTooLarge() {
 	config.MaxSrcFileSize = 1
 	config.MaxSrcFileSize = 1
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().Error(err)
 	s.Require().Error(err)
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
 	s.Require().Equal(422, ierrors.Wrap(err, 0).StatusCode())
@@ -275,7 +275,7 @@ func (s *ImageDataTestSuite) TestDownloadGzip() {
 	s.data = buf.Bytes()
 	s.data = buf.Bytes()
 	s.header.Set("Content-Encoding", "gzip")
 	s.header.Set("Content-Encoding", "gzip")
 
 
-	imgdata, _, err := Download(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
+	imgdata, _, err := DownloadSync(context.Background(), s.server.URL, "Test image", DownloadOptions{}, security.DefaultOptions())
 
 
 	s.Require().NoError(err)
 	s.Require().NoError(err)
 	s.Require().NotNil(imgdata)
 	s.Require().NotNil(imgdata)

+ 0 - 62
imagedata/read.go

@@ -1,62 +0,0 @@
-package imagedata
-
-import (
-	"bytes"
-	"context"
-	"io"
-
-	"github.com/imgproxy/imgproxy/v3/bufpool"
-	"github.com/imgproxy/imgproxy/v3/bufreader"
-	"github.com/imgproxy/imgproxy/v3/config"
-	"github.com/imgproxy/imgproxy/v3/imagefetcher"
-	"github.com/imgproxy/imgproxy/v3/imagemeta"
-	"github.com/imgproxy/imgproxy/v3/security"
-)
-
-var downloadBufPool *bufpool.Pool
-
-func initRead() {
-	downloadBufPool = bufpool.New("download", config.Workers, config.DownloadBufferSize)
-}
-
-func readAndCheckImage(r io.Reader, contentLength int, secopts security.Options) (ImageData, error) {
-	buf := downloadBufPool.Get(contentLength, false)
-	cancel := func() { downloadBufPool.Put(buf) }
-
-	br := bufreader.New(r, buf)
-
-	meta, err := imagemeta.DecodeMeta(br)
-	if err != nil {
-		buf.Reset()
-		cancel()
-
-		return nil, imagefetcher.WrapError(err)
-	}
-
-	if err = security.CheckDimensions(meta.Width(), meta.Height(), 1, secopts); err != nil {
-		buf.Reset()
-		cancel()
-
-		return nil, imagefetcher.WrapError(err)
-	}
-
-	downloadBufPool.GrowBuffer(buf, contentLength)
-
-	if err = br.Flush(); err != nil {
-		buf.Reset()
-		cancel()
-
-		return nil, imagefetcher.WrapError(err)
-	}
-
-	i := NewFromBytesWithFormat(meta.Format(), buf.Bytes())
-	i.AddCancel(cancel)
-	return i, nil
-}
-
-func BorrowBuffer() (*bytes.Buffer, context.CancelFunc) {
-	buf := downloadBufPool.Get(0, false)
-	cancel := func() { downloadBufPool.Put(buf) }
-
-	return buf, cancel
-}

+ 16 - 0
imagefetcher/config.go

@@ -0,0 +1,16 @@
+package imagefetcher
+
+import "github.com/imgproxy/imgproxy/v3/config"
+
+// Config holds the configuration for the image fetcher.
+type Config struct {
+	// MaxRedirects is the maximum number of redirects to follow when fetching an image.
+	MaxRedirects int
+}
+
+// NewConfigFromEnv creates a new Config instance from environment variables or defaults.
+func NewConfigFromEnv() *Config {
+	return &Config{
+		MaxRedirects: config.MaxRedirects,
+	}
+}

+ 5 - 5
imagefetcher/fetcher.go

@@ -20,19 +20,19 @@ const (
 
 
 // Fetcher is a struct that holds the HTTP client and transport for fetching images
 // Fetcher is a struct that holds the HTTP client and transport for fetching images
 type Fetcher struct {
 type Fetcher struct {
-	transport    *transport.Transport // Transport used for making HTTP requests
-	maxRedirects int                  // Maximum number of redirects allowed
+	transport *transport.Transport // Transport used for making HTTP requests
+	config    *Config              // Configuration for the image fetcher
 }
 }
 
 
 // NewFetcher creates a new ImageFetcher with the provided transport
 // NewFetcher creates a new ImageFetcher with the provided transport
-func NewFetcher(transport *transport.Transport, maxRedirects int) (*Fetcher, error) {
-	return &Fetcher{transport, maxRedirects}, nil
+func NewFetcher(transport *transport.Transport, config *Config) (*Fetcher, error) {
+	return &Fetcher{transport, config}, nil
 }
 }
 
 
 // checkRedirect is a method that checks if the number of redirects exceeds the maximum allowed
 // checkRedirect is a method that checks if the number of redirects exceeds the maximum allowed
 func (f *Fetcher) checkRedirect(req *http.Request, via []*http.Request) error {
 func (f *Fetcher) checkRedirect(req *http.Request, via []*http.Request) error {
 	redirects := len(via)
 	redirects := len(via)
-	if redirects >= f.maxRedirects {
+	if redirects >= f.config.MaxRedirects {
 		return newImageTooManyRedirectsError(redirects)
 		return newImageTooManyRedirectsError(redirects)
 	}
 	}
 	return nil
 	return nil

+ 1 - 1
processing_handler.go

@@ -375,7 +375,7 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
 			checkErr(ctx, "download", err)
 			checkErr(ctx, "download", err)
 		}
 		}
 
 
-		return imagedata.Download(ctx, imageURL, "source image", downloadOpts, po.SecurityOptions)
+		return imagedata.DownloadAsync(ctx, imageURL, "source image", downloadOpts, po.SecurityOptions)
 	}()
 	}()
 
 
 	var nmErr imagefetcher.NotModifiedError
 	var nmErr imagefetcher.NotModifiedError

+ 14 - 1
svg/svg.go

@@ -1,21 +1,34 @@
 package svg
 package svg
 
 
 import (
 import (
+	"bytes"
+	"context"
 	"io"
 	"io"
 	"strings"
 	"strings"
 
 
 	"github.com/tdewolff/parse/v2"
 	"github.com/tdewolff/parse/v2"
 	"github.com/tdewolff/parse/v2/xml"
 	"github.com/tdewolff/parse/v2/xml"
 
 
+	"github.com/imgproxy/imgproxy/v3/bufpool"
+	"github.com/imgproxy/imgproxy/v3/config"
 	"github.com/imgproxy/imgproxy/v3/imagedata"
 	"github.com/imgproxy/imgproxy/v3/imagedata"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
 	"github.com/imgproxy/imgproxy/v3/imagetype"
 )
 )
 
 
+var downloadBufPool *bufpool.Pool = bufpool.New("download", config.Workers, config.DownloadBufferSize)
+
+func BorrowBuffer() (*bytes.Buffer, context.CancelFunc) {
+	buf := downloadBufPool.Get(0, false)
+	cancel := func() { downloadBufPool.Put(buf) }
+
+	return buf, cancel
+}
+
 func Sanitize(data imagedata.ImageData) (imagedata.ImageData, error) {
 func Sanitize(data imagedata.ImageData) (imagedata.ImageData, error) {
 	r := data.Reader()
 	r := data.Reader()
 	l := xml.NewLexer(parse.NewInput(r))
 	l := xml.NewLexer(parse.NewInput(r))
 
 
-	buf, cancel := imagedata.BorrowBuffer()
+	buf, cancel := BorrowBuffer()
 
 
 	ignoreTag := 0
 	ignoreTag := 0
 
 

+ 11 - 11
vips/source.go

@@ -14,6 +14,12 @@ import (
 	"unsafe"
 	"unsafe"
 )
 )
 
 
+// newVipsSource creates a new VipsAsyncSource from an io.ReadSeeker.
+func newVipsImgproxySource(r io.ReadSeeker) *C.VipsImgproxySource {
+	handler := cgo.NewHandle(r)
+	return C.vips_new_imgproxy_source(C.uintptr_t(handler))
+}
+
 //export closeImgproxyReader
 //export closeImgproxyReader
 func closeImgproxyReader(handle C.uintptr_t) {
 func closeImgproxyReader(handle C.uintptr_t) {
 	h := cgo.Handle(handle)
 	h := cgo.Handle(handle)
@@ -25,13 +31,13 @@ func closeImgproxyReader(handle C.uintptr_t) {
 //export imgproxyReaderSeek
 //export imgproxyReaderSeek
 func imgproxyReaderSeek(handle C.uintptr_t, offset C.int64_t, whence int) C.int64_t {
 func imgproxyReaderSeek(handle C.uintptr_t, offset C.int64_t, whence int) C.int64_t {
 	h := cgo.Handle(handle)
 	h := cgo.Handle(handle)
-	reader, ok := h.Value().(io.ReadSeeker)
+	r, ok := h.Value().(io.ReadSeeker)
 	if !ok {
 	if !ok {
-		vipsError("imgproxyReaderSeek", "failed to cast handle to io.ReadSeeker")
+		vipsError("imgproxyReaderSeek", "failed to cast handle to *source")
 		return -1
 		return -1
 	}
 	}
 
 
-	pos, err := reader.Seek(int64(offset), whence)
+	pos, err := r.Seek(int64(offset), whence)
 	if err != nil {
 	if err != nil {
 		vipsError("imgproxyReaderSeek", "failed to seek: %v", err)
 		vipsError("imgproxyReaderSeek", "failed to seek: %v", err)
 		return -1
 		return -1
@@ -45,14 +51,14 @@ func imgproxyReaderSeek(handle C.uintptr_t, offset C.int64_t, whence int) C.int6
 //export imgproxyReaderRead
 //export imgproxyReaderRead
 func imgproxyReaderRead(handle C.uintptr_t, pointer unsafe.Pointer, size C.int64_t) C.int64_t {
 func imgproxyReaderRead(handle C.uintptr_t, pointer unsafe.Pointer, size C.int64_t) C.int64_t {
 	h := cgo.Handle(handle)
 	h := cgo.Handle(handle)
-	reader, ok := h.Value().(io.ReadSeeker)
+	r, ok := h.Value().(io.ReadSeeker)
 	if !ok {
 	if !ok {
 		vipsError("imgproxyReaderRead", "invalid reader handle")
 		vipsError("imgproxyReaderRead", "invalid reader handle")
 		return -1
 		return -1
 	}
 	}
 
 
 	buf := unsafe.Slice((*byte)(pointer), size)
 	buf := unsafe.Slice((*byte)(pointer), size)
-	n, err := reader.Read(buf)
+	n, err := r.Read(buf)
 	if err == io.EOF {
 	if err == io.EOF {
 		return 0
 		return 0
 	} else if err != nil {
 	} else if err != nil {
@@ -62,9 +68,3 @@ func imgproxyReaderRead(handle C.uintptr_t, pointer unsafe.Pointer, size C.int64
 
 
 	return C.int64_t(n)
 	return C.int64_t(n)
 }
 }
-
-// newVipsSource creates a new VipsAsyncSource from an io.ReadSeeker.
-func newVipsImgproxySource(r io.ReadSeeker) *C.VipsImgproxySource {
-	handler := cgo.NewHandle(r)
-	return C.vips_new_imgproxy_source(C.uintptr_t(handler))
-}

+ 2 - 4
vips/vips.go

@@ -358,8 +358,7 @@ func (img *Image) Load(imgdata imagedata.ImageData, shrink int, scale float64, p
 
 
 	err := C.int(0)
 	err := C.int(0)
 
 
-	reader := imgdata.Reader()
-	source := newVipsImgproxySource(reader)
+	source := newVipsImgproxySource(imgdata.Reader())
 	defer C.unref_imgproxy_source(source)
 	defer C.unref_imgproxy_source(source)
 
 
 	switch imgdata.Format() {
 	switch imgdata.Format() {
@@ -410,8 +409,7 @@ func (img *Image) LoadThumbnail(imgdata imagedata.ImageData) error {
 
 
 	var tmp *C.VipsImage
 	var tmp *C.VipsImage
 
 
-	reader := imgdata.Reader()
-	source := newVipsImgproxySource(reader)
+	source := newVipsImgproxySource(imgdata.Reader())
 	defer C.unref_imgproxy_source(source)
 	defer C.unref_imgproxy_source(source)
 
 
 	if err := C.vips_heifload_source_go(source, &tmp, C.int(1)); err != 0 {
 	if err := C.vips_heifload_source_go(source, &tmp, C.int(1)); err != 0 {