Prechádzať zdrojové kódy

io.ReadCloser in buffer

Viktor Sokolov pred 2 mesiacmi
rodič
commit
ebdc1c1782
2 zmenené súbory s 36 pridaniami a 10 odobraniami
  1. 9 2
      asyncbuffer/buffer.go
  2. 27 8
      asyncbuffer/buffer_test.go

+ 9 - 2
asyncbuffer/buffer.go

@@ -54,7 +54,7 @@ var chunkPool = sync.Pool{
 // AsyncBuffer is a wrapper around io.Reader that reads data in chunks
 // in background and allows reading from it synchronously.
 type AsyncBuffer struct {
-	r io.Reader // Upstream reader
+	r io.ReadCloser // Upstream reader
 
 	chunks []*byteChunk // References to the chunks read from the upstream reader
 
@@ -76,7 +76,7 @@ type Reader struct {
 }
 
 // FromReader creates a new AsyncBuffer that reads from the given io.ReadCloser in background
-func FromReader(r io.Reader) *AsyncBuffer {
+func FromReader(r io.ReadCloser) *AsyncBuffer {
 	ab := &AsyncBuffer{
 		r:              r,
 		newChunkSignal: make(chan struct{}),
@@ -130,6 +130,13 @@ func (ab *AsyncBuffer) finish() {
 	if !ab.closed.Load() {
 		close(ab.newChunkSignal)
 	}
+
+	err := ab.r.Close() // Close the upstream reader
+	if err != nil {
+		// If there was an error while closing the upstream reader, store it
+		ab.err.Store(err)
+		return
+	}
 }
 
 // readChunks reads data from the upstream reader in background and stores them in the pool

+ 27 - 8
asyncbuffer/buffer_test.go

@@ -20,9 +20,19 @@ const (
 	quaterChunkSize = chunkSize / 4
 )
 
+// nopSeekCloser is a wrapper around io.ReadSeeker that implements no-op io.Closer
+type nopSeekCloser struct {
+	io.ReadSeeker
+}
+
+// Close implements io.Closer interface, but does nothing
+func (nopSeekCloser) Close() error {
+	return nil
+}
+
 // erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
 type erraticReader struct {
-	reader bytes.Reader
+	reader io.ReadSeekCloser
 	failAt int64 // if set, will return an error after reading this many bytes
 }
 
@@ -35,15 +45,20 @@ func (r *erraticReader) Read(p []byte) (n int, err error) {
 	return r.reader.Read(p)
 }
 
+// Close forwards closing to the underlying reader
+func (r *erraticReader) Close() error {
+	return r.reader.Close()
+}
+
 // blockingReader is a test reader which flushes data in chunks
 type blockingReader struct {
-	reader    bytes.Reader
+	reader    io.ReadCloser
 	mu        sync.Mutex  // locked reader does not return anything
 	unlocking atomic.Bool // if true, will proceed without locking each chunk
 }
 
 // newBlockingReader creates a new blockingReader in locked state
-func newBlockingReader(reader bytes.Reader) *blockingReader {
+func newBlockingReader(reader io.ReadCloser) *blockingReader {
 	r := &blockingReader{
 		reader: reader,
 	}
@@ -72,8 +87,12 @@ func (r *blockingReader) Read(p []byte) (n int, err error) {
 	return n, err
 }
 
+func (r *blockingReader) Close() error { // Close forwards closing to the underlying reader
+	return r.reader.Close()
+}
+
 // generateSourceData generates a byte slice of the given size filled with random data
-func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
+func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
 	// We use small chunks for tests, let's check the ChunkSize just in case
 	assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
 
@@ -83,7 +102,7 @@ func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
 	// Fill the source with random data
 	_, err := rand.Read(source)
 	require.NoError(t, err)
-	return source, bytes.NewReader(source)
+	return source, nopSeekCloser{bytes.NewReader(source)}
 }
 
 // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
@@ -275,7 +294,7 @@ func TestAsyncBufferClose(t *testing.T) {
 func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
 	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
-	slowReader := &erraticReader{reader: *bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
+	slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
 	asyncBuffer := FromReader(slowReader)
 	defer asyncBuffer.Close()
 
@@ -308,7 +327,7 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 func TestAsyncBufferReadAsync(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
 	source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
-	blockingReader := newBlockingReader(*bytesReader)
+	blockingReader := newBlockingReader(bytesReader)
 	asyncBuffer := FromReader(blockingReader)
 	defer asyncBuffer.Close()
 
@@ -353,7 +372,7 @@ func TestAsyncBufferReadAsync(t *testing.T) {
 func TestAsyncBufferReadAllCompability(t *testing.T) {
 	source, err := os.ReadFile("../testdata/test1.jpg")
 	require.NoError(t, err)
-	asyncBuffer := FromReader(bytes.NewReader(source))
+	asyncBuffer := FromReader(nopSeekCloser{bytes.NewReader(source)})
 	defer asyncBuffer.Close()
 
 	b, err := io.ReadAll(asyncBuffer.Reader())