|
|
@@ -97,6 +97,30 @@ func New(r io.ReadCloser, dataLen int, finishFn ...context.CancelFunc) *AsyncBuf
|
|
|
return ab
|
|
|
}
|
|
|
|
|
|
+// NewReadFull creates a new AsyncBuffer that reads from the given io.ReadCloser
|
|
|
+// in foreground, blocking until all data is read. It returns an error if reading
|
|
|
+// fails. When read fails, the reader is closed and resources are released immediately.
|
|
|
+func NewReadFull(r io.ReadCloser, dataLen int, finishFn ...context.CancelFunc) (*AsyncBuffer, error) {
|
|
|
+ ab := &AsyncBuffer{
|
|
|
+ r: r,
|
|
|
+ dataLen: dataLen,
|
|
|
+ paused: NewLatch(),
|
|
|
+ chunkCond: NewCond(),
|
|
|
+ finishFn: finishFn,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Read all data in foreground
|
|
|
+ ab.readChunks()
|
|
|
+
|
|
|
+ // If error occurred during reading, return it
|
|
|
+ if ab.Error() != nil {
|
|
|
+ ab.Close() // Reader should be closed and resources released
|
|
|
+ return nil, ab.Error()
|
|
|
+ }
|
|
|
+
|
|
|
+ return ab, nil
|
|
|
+}
|
|
|
+
|
|
|
// callFinishFn calls the finish functions registered with the AsyncBuffer.
|
|
|
func (ab *AsyncBuffer) callFinishFn() {
|
|
|
ab.finishOnce.Do(func() {
|
|
|
@@ -428,17 +452,14 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
|
|
|
return n, nil
|
|
|
}
|
|
|
|
|
|
-// Close closes the AsyncBuffer and releases all resources.
|
|
|
-// It returns an error if the reader was already closed or if there was
|
|
|
-// an error during reading data in background even if none of the subsequent
|
|
|
-// readers have reached the position where the error occurred.
|
|
|
+// Close closes the AsyncBuffer and releases all resources. It is idempotent.
|
|
|
func (ab *AsyncBuffer) Close() error {
|
|
|
ab.mu.Lock()
|
|
|
defer ab.mu.Unlock()
|
|
|
|
|
|
// If the reader is already closed, we return immediately error or nil
|
|
|
if ab.closed.Load() {
|
|
|
- return ab.Error()
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
ab.closed.Store(true)
|