浏览代码

Refine usage of `closed` and `finished` flags in asyncbuffer

DarthSim 2 月之前
父节点
当前提交
f0455fe2fd
共有 1 个文件被更改,包括 25 次插入42 次删除
  1. 25 42
      asyncbuffer/buffer.go

+ 25 - 42
asyncbuffer/buffer.go

@@ -51,9 +51,9 @@ type AsyncBuffer struct {
 	chunks []*byteChunk // References to the chunks read from the upstream reader
 
 	err      atomic.Value // Error that occurred during reading
-	finished atomic.Bool  // Indicates that the reader has finished reading
+	finished atomic.Bool  // Indicates that the buffer has finished reading
 	len      atomic.Int64 // Total length of the data read
-	closed   atomic.Bool  // Indicates that the reader was closed
+	closed   atomic.Bool  // Indicates that the buffer was closed
 
 	mu             sync.RWMutex  // Mutex on chunks slice
 	newChunkSignal chan struct{} // Tick-tock channel that indicates that a new chunk is ready
@@ -92,6 +92,13 @@ func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
 	ab.mu.Lock()
 	defer ab.mu.Unlock()
 
+	if ab.closed.Load() {
+		// If the buffer is closed, then the chunk is not needed anymore,
+		// so we return it to the pool
+		chunkPool.Put(chunk)
+		return
+	}
+
 	// Store the chunk, increase chunk size, increase length of the data read
 	ab.chunks = append(ab.chunks, chunk)
 	ab.len.Add(int64(len(chunk.data)))
@@ -102,24 +109,18 @@ func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
 	close(currSignal)
 }
 
-// finish marks the reader as finished
-func (ab *AsyncBuffer) finish() {
-	// Indicate that the reader has finished reading
-	ab.finished.Store(true)
-
-	// This indicates that Close() was called before all the chunks were read, we do not need to close the channel
-	// since it was closed already.
-	if !ab.closed.Load() {
-		close(ab.newChunkSignal)
-	}
-}
-
 // readChunks reads data from the upstream reader in background and stores them in the pool
 func (ab *AsyncBuffer) readChunks() {
-	defer ab.finish()
+	defer func() {
+		// Indicate that the buffer has finished reading
+		ab.finished.Store(true)
+
+		// Since the buffer is finished reading, we should close the channel to unlock any waiting readers
+		close(ab.newChunkSignal)
+	}()
 
-	// Stop reading if the reader is finished
-	for !ab.finished.Load() {
+	// Stop reading if the buffer is closed
+	for !ab.closed.Load() {
 		// Get a chunk from the pool
 		// If the pool is empty, it will create a new byteChunk with ChunkSize
 		chunk, ok := chunkPool.Get().(*byteChunk)
@@ -177,8 +178,8 @@ func (ab *AsyncBuffer) closedError() error {
 // It may return io.EOF if the reader is finished reading and the offset is beyond the end of the stream.
 func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
 	// We can not read data from the closed reader, none
-	if ab.closed.Load() {
-		return false, ab.closedError()
+	if err := ab.closedError(); err != nil {
+		return false, err
 	}
 
 	// In case the offset falls within the already read chunks, we can return immediately,
@@ -222,27 +223,15 @@ func (ab *AsyncBuffer) WaitFor(off int64) error {
 func (ab *AsyncBuffer) Wait() (int64, error) {
 	for {
 		// We can not read data from the closed reader even if there were no errors
-		if ab.closed.Load() {
-			return 0, ab.closedError()
+		if err := ab.closedError(); err != nil {
+			return 0, err
 		}
 
 		// In case the reader is finished reading, we can return immediately
 		if ab.finished.Load() {
-			size := ab.len.Load()
-
 			// If there was an error during reading, we need to return it no matter what position
 			// had the error happened
-			err := ab.err.Load()
-			if err != nil {
-				err, ok := err.(error)
-				if !ok {
-					return size, errors.New("asyncbuffer.AsyncBuffer.Wait: failed to get error")
-				}
-
-				return size, err
-			}
-
-			return size, nil
+			return ab.len.Load(), ab.Error()
 		}
 
 		// Lock until the next chunk is ready
@@ -320,8 +309,8 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
 	defer ab.mu.RUnlock()
 
 	// If the reader is closed, we return an error
-	if ab.closed.Load() {
-		return 0, ab.closedError()
+	if err := ab.closedError(); err != nil {
+		return 0, err
 	}
 
 	// Read data from the first chunk
@@ -372,12 +361,6 @@ func (ab *AsyncBuffer) Close() error {
 
 	ab.closed.Store(true)
 
-	// If the reader is still running, we need to signal that it should stop and close the channel
-	if !ab.finished.Load() {
-		ab.finished.Store(true)
-		close(ab.newChunkSignal)
-	}
-
 	// Return all chunks to the pool
 	for _, chunk := range ab.chunks {
 		chunkPool.Put(chunk)