|
@@ -89,6 +89,17 @@ func New(r io.ReadCloser, finishFn ...context.CancelFunc) *AsyncBuffer {
|
|
return ab
|
|
return ab
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// callFinishFn calls the finish functions registered with the AsyncBuffer.
|
|
|
|
+func (ab *AsyncBuffer) callFinishFn() {
|
|
|
|
+ ab.finishOnce.Do(func() {
|
|
|
|
+ for _, fn := range ab.finishFn {
|
|
|
|
+ if fn != nil {
|
|
|
|
+ fn()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
// addChunk adds a new chunk to the AsyncBuffer, increments len and signals that a chunk is ready
|
|
// addChunk adds a new chunk to the AsyncBuffer, increments len and signals that a chunk is ready
|
|
func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
|
|
func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
|
|
ab.mu.Lock()
|
|
ab.mu.Lock()
|
|
@@ -119,11 +130,7 @@ func (ab *AsyncBuffer) readChunks() {
|
|
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
|
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
|
}
|
|
}
|
|
|
|
|
|
- ab.finishOnce.Do(func() {
|
|
|
|
- for _, fn := range ab.finishFn {
|
|
|
|
- fn()
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
|
|
+ ab.callFinishFn()
|
|
}()
|
|
}()
|
|
|
|
|
|
// Stop reading if the reader is closed
|
|
// Stop reading if the reader is closed
|
|
@@ -403,11 +410,7 @@ func (ab *AsyncBuffer) Close() error {
|
|
ab.paused.Release()
|
|
ab.paused.Release()
|
|
|
|
|
|
// Finish downloading
|
|
// Finish downloading
|
|
- ab.finishOnce.Do(func() {
|
|
|
|
- for _, fn := range ab.finishFn {
|
|
|
|
- fn()
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
|
|
+ ab.callFinishFn()
|
|
|
|
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|