Browse Source

Fix io.EOF behaviour

Viktor Sokolov 2 months ago
parent
commit
9a417ad4fc
4 changed files with 13 additions and 17 deletions
  1. 10 5
      asyncbuffer/buffer.go
  2. 3 4
      asyncbuffer/buffer_test.go
  3. 0 1
      asyncbuffer/cond.go
  4. 0 7
      asyncbuffer/latch.go

+ 10 - 5
asyncbuffer/buffer.go

@@ -110,11 +110,11 @@ func (ab *AsyncBuffer) readChunks() {
 
 		// Close the upstream reader
 		if err := ab.r.Close(); err != nil {
-			logrus.WithField("asyncBuffer", ab).Warningf("error closing upstream reader: %v", err)
+			logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
 		}
 	}()
 
-	// Stop reading if the reader is finished
+	// Stop reading if the reader is closed
 	for !ab.closed.Load() {
 		// In case we are trying to read data beyond threshold and we are paused,
 		// wait for pause to be released.
@@ -136,7 +136,8 @@ func (ab *AsyncBuffer) readChunks() {
 		}
 
 		// Read data into the chunk's buffer
-		// There is no way to guarantee that this would
+		// There is no way to guarantee that ReadFull will abort on context cancellation,
+		// unfortunately, this is how golang works.
 		n, err := io.ReadFull(ab.r, chunk.buf)
 
 		// If it's not the EOF, we need to store the error
@@ -342,7 +343,11 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
 	for size > 0 {
 		// If data is not available at the given offset, we can return data read so far.
 		ok, err := ab.offsetAvailable(off)
-		if !ok || err != nil {
+		if !ok {
+			if err == io.EOF {
+				return n, nil
+			}
+
 			return n, err
 		}
 
@@ -355,7 +360,7 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
 		// If we read data shorter than ChunkSize or, in case that was the last chunk, less than
 		// the size of the tail, return kind of EOF
 		if int64(nX) < min(size, int64(chunkSize)) {
-			return n, io.EOF
+			return n, nil
 		}
 	}
 

+ 3 - 4
asyncbuffer/buffer_test.go

@@ -126,7 +126,7 @@ func TestAsyncBufferReadAt(t *testing.T) {
 	target = make([]byte, len(source)+1)
 
 	n, err = asyncBuffer.readAt(target, 0)
-	require.ErrorIs(t, err, io.EOF) // We read all the data, and reached end
+	require.NoError(t, err) // We read all the data, and reached end
 	assert.Equal(t, len(source), n)
 	assert.Equal(t, target[:n], source)
 
@@ -154,8 +154,7 @@ func TestAsyncBufferReadAt(t *testing.T) {
 	// Let's try to read more data then available in the stream
 	target = make([]byte, chunkSize*2)
 	n, err = asyncBuffer.readAt(target, chunkSize*4)
-	require.Error(t, err)
-	assert.Equal(t, err, io.EOF)
+	require.NoError(t, err)
 	assert.Equal(t, chunkSize/2, n)
 	assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // We read only last half chunk
 
@@ -357,7 +356,7 @@ func TestAsyncBufferReadAsync(t *testing.T) {
 	// Try to read near end of the stream, EOF
 	target = make([]byte, chunkSize)
 	n, err = asyncBuffer.readAt(target, chunkSize*3-1)
-	require.ErrorIs(t, err, io.EOF)
+	require.NoError(t, err)
 	assert.Equal(t, 1, n)
 	assert.Equal(t, target[0], source[chunkSize*3-1])
 

+ 0 - 1
asyncbuffer/cond.go

@@ -8,7 +8,6 @@ type condCh = chan struct{}
 
 // Cond signals that an event has occurred to a multiple waiters.
 type Cond struct {
-	_         noCopy
 	mu        sync.RWMutex
 	ch        condCh
 	closeOnce sync.Once

+ 0 - 7
asyncbuffer/latch.go

@@ -6,7 +6,6 @@ import (
 
 // Latch is once-releasing semaphore.
 type Latch struct {
-	_    noCopy
 	once sync.Once
 	done chan struct{}
 }
@@ -25,9 +24,3 @@ func (g *Latch) Release() {
 func (g *Latch) Wait() {
 	<-g.done
 }
-
-// checked by 'go vet
-type noCopy struct{}
-
-func (*noCopy) Lock()   {}
-func (*noCopy) Unlock() {}