|
@@ -66,16 +66,16 @@ type AsyncBuffer struct {
|
|
|
finished atomic.Bool // Indicates that the buffer has finished reading
|
|
|
closed atomic.Bool // Indicates that the buffer was closed
|
|
|
|
|
|
- paused *Latch // Paused buffer does not read data beyond threshold
|
|
|
- ticker *Cond // Ticker that signals when a new chunk is ready
|
|
|
+ paused *Latch // Paused buffer does not read data beyond threshold
|
|
|
+ chunkCond *Cond // Ticker that signals when a new chunk is ready
|
|
|
}
|
|
|
|
|
|
// FromReadCloser creates a new AsyncBuffer that reads from the given io.Reader in background
|
|
|
func FromReader(r io.ReadCloser) *AsyncBuffer {
|
|
|
ab := &AsyncBuffer{
|
|
|
- r: r,
|
|
|
- paused: NewLatch(),
|
|
|
- ticker: NewCond(),
|
|
|
+ r: r,
|
|
|
+ paused: NewLatch(),
|
|
|
+ chunkCond: NewCond(),
|
|
|
}
|
|
|
|
|
|
go ab.readChunks()
|
|
@@ -98,7 +98,7 @@ func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
|
|
|
ab.chunks = append(ab.chunks, chunk)
|
|
|
ab.len.Add(int64(len(chunk.data)))
|
|
|
|
|
|
- ab.ticker.Tick()
|
|
|
+ ab.chunkCond.Tick()
|
|
|
}
|
|
|
|
|
|
// readChunks reads data from the upstream reader in background and stores them in the pool
|
|
@@ -106,7 +106,7 @@ func (ab *AsyncBuffer) readChunks() {
|
|
|
defer func() {
|
|
|
// Indicate that the reader has finished reading
|
|
|
ab.finished.Store(true)
|
|
|
- ab.ticker.Close()
|
|
|
+ ab.chunkCond.Close()
|
|
|
|
|
|
// Close the upstream reader
|
|
|
if err := ab.r.Close(); err != nil {
|
|
@@ -227,7 +227,7 @@ func (ab *AsyncBuffer) WaitFor(off int64) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- ab.ticker.Wait()
|
|
|
+ ab.chunkCond.Wait()
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -249,7 +249,7 @@ func (ab *AsyncBuffer) Wait() (int64, error) {
|
|
|
}
|
|
|
|
|
|
// Lock until the next chunk is ready
|
|
|
- ab.ticker.Wait()
|
|
|
+ ab.chunkCond.Wait()
|
|
|
}
|
|
|
}
|
|
|
|