ソースを参照

Additional close() condition

Viktor Sokolov 2 ヶ月 前
コミット
b92171c511
2 ファイル変更15 行追加5 行削除
  1. 14 4
      asyncbuffer/buffer.go
  2. 1 1
      asyncbuffer/buffer_test.go

+ 14 - 4
asyncbuffer/buffer.go

@@ -15,6 +15,7 @@ package asyncbuffer
 
 import (
 	"errors"
+	"fmt"
 	"io"
 	"sync"
 	"sync/atomic"
@@ -100,6 +101,11 @@ func (ab *AsyncBuffer) getNewChunkSignal() chan struct{} {
 func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
 	ab.mu.Lock()
 	defer ab.mu.Unlock()
+	if ab.closed.Load() {
+		// If the reader is closed, we return the chunk to the pool
+		chunkPool.Put(chunk)
+		return
+	}
 
 	// Store the chunk, increase chunk size, increase length of the data read
 	ab.chunks = append(ab.chunks, chunk)
@@ -113,6 +119,9 @@ func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
 
 // finish marks the reader as finished
 func (ab *AsyncBuffer) finish() {
+	ab.mu.Lock()
+	defer ab.mu.Unlock()
+
 	// Indicate that the reader has finished reading
 	ab.finished.Store(true)
 
@@ -133,11 +142,11 @@ func (ab *AsyncBuffer) readChunks() {
 		// wait for pause to be released.
 		if ab.len.Load() >= pauseThreshold {
 			ab.paused.Wait()
-		}
 
-		// If the reader has been closed while waiting, we can stop reading
-		if ab.finished.Load() {
-			return // No more data to read
+			// If the reader has been closed while waiting, we can stop reading
+			if ab.finished.Load() {
+				return // No more data to read
+			}
 		}
 
 		// Get a chunk from the pool
@@ -235,6 +244,7 @@ func (ab *AsyncBuffer) WaitFor(off int64) error {
 	// In case we are trying to read data which would potentially hit the pause threshold,
 	// we need to unpause the reader ASAP.
 	if off >= pauseThreshold {
+		fmt.Println(off, pauseThreshold, "UNLOCKING")
 		ab.paused.Release()
 	}
 

+ 1 - 1
asyncbuffer/buffer_test.go

@@ -416,7 +416,7 @@ func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
 	target := make([]byte, chunkSize)
 	n, err := asyncBuffer.readAt(target, pauseThreshold+1)
 	require.NoError(t, err)
-	assert.Equal(t, chunkSize, n)
+	assert.GreaterOrEqual(t, chunkSize, n)
 
 	// Ensure that buffer hits the end of the stream
 	require.Eventually(t, func() bool {