|
@@ -92,7 +92,7 @@ func (r *blockingReader) Close() error { // Close forwards closing to the underl
|
|
|
}
|
|
|
|
|
|
// generateSourceData generates a byte slice with 4.5 chunks of data
|
|
|
-func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
|
|
|
+func generateSourceData(t *testing.T, size int) ([]byte, io.ReadSeekCloser) {
|
|
|
// We use small chunks for tests, let's check the ChunkSize just in case
|
|
|
assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
|
|
|
|
|
@@ -108,8 +108,8 @@ func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
|
|
|
// TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
|
|
|
func TestAsyncBufferReadAt(t *testing.T) {
|
|
|
// Let's use source buffer which is 4.5 chunks long
|
|
|
- source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
|
|
@@ -169,7 +169,7 @@ func TestAsyncBufferReadAt(t *testing.T) {
|
|
|
// TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
|
|
|
func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
|
|
|
source, bytesReader := generateSourceData(t, 20)
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
// First, let's read all the data
|
|
@@ -196,16 +196,16 @@ func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestAsyncBufferReader(t *testing.T) {
|
|
|
- source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
|
|
|
+ source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
|
|
|
|
|
|
// Create an AsyncBuffer with the byte slice
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
// Let's wait for all chunks to be read
|
|
|
size, err := asyncBuffer.Wait()
|
|
|
require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
|
|
|
- assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
|
|
|
+ assert.Equal(t, chunkSize*4+halfChunkSize, size)
|
|
|
|
|
|
reader := asyncBuffer.Reader()
|
|
|
|
|
@@ -245,7 +245,7 @@ func TestAsyncBufferReader(t *testing.T) {
|
|
|
// Seek -10 bytes from end of the stream
|
|
|
pos, err = reader.Seek(-10, io.SeekEnd)
|
|
|
require.NoError(t, err)
|
|
|
- assert.Equal(t, size-10, pos)
|
|
|
+ assert.Equal(t, size-10, int(pos))
|
|
|
|
|
|
// Read last 10 bytes
|
|
|
n, err = reader.Read(smallSlice)
|
|
@@ -256,7 +256,7 @@ func TestAsyncBufferReader(t *testing.T) {
|
|
|
// Seek beyond the end of the stream and try to read
|
|
|
pos, err = reader.Seek(1024, io.SeekCurrent)
|
|
|
require.NoError(t, err)
|
|
|
- assert.Equal(t, size+1024, pos)
|
|
|
+ assert.Equal(t, size+1024, int(pos))
|
|
|
|
|
|
_, err = reader.Read(smallSlice)
|
|
|
require.ErrorIs(t, err, io.EOF)
|
|
@@ -264,10 +264,10 @@ func TestAsyncBufferReader(t *testing.T) {
|
|
|
|
|
|
// TestAsyncBufferClose tests closing the AsyncBuffer
|
|
|
func TestAsyncBufferClose(t *testing.T) {
|
|
|
- _, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
|
|
|
+ _, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
|
|
|
|
|
|
// Create an AsyncBuffer with the byte slice
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
|
|
|
reader1 := asyncBuffer.Reader()
|
|
|
reader2 := asyncBuffer.Reader()
|
|
@@ -292,9 +292,9 @@ func TestAsyncBufferClose(t *testing.T) {
|
|
|
// which would fail somewhere
|
|
|
func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
|
|
|
// Let's use source buffer which is 4.5 chunks long
|
|
|
- source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
|
|
|
+ source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
|
|
|
slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
|
|
|
- asyncBuffer := FromReader(slowReader)
|
|
|
+ asyncBuffer := New(slowReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
// Let's wait for all chunks to be read
|
|
@@ -325,9 +325,9 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
|
|
|
// with full = false
|
|
|
func TestAsyncBufferReadAsync(t *testing.T) {
|
|
|
// Let's use source buffer which is 4.5 chunks long
|
|
|
- source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
|
|
|
+ source, bytesReader := generateSourceData(t, chunkSize*3)
|
|
|
blockingReader := newBlockingReader(bytesReader)
|
|
|
- asyncBuffer := FromReader(blockingReader)
|
|
|
+ asyncBuffer := New(blockingReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
// flush the first chunk to allow reading
|
|
@@ -371,7 +371,7 @@ func TestAsyncBufferReadAsync(t *testing.T) {
|
|
|
func TestAsyncBufferReadAllCompability(t *testing.T) {
|
|
|
source, err := os.ReadFile("../testdata/test1.jpg")
|
|
|
require.NoError(t, err)
|
|
|
- asyncBuffer := FromReader(nopSeekCloser{bytes.NewReader(source)})
|
|
|
+ asyncBuffer := New(nopSeekCloser{bytes.NewReader(source)})
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
b, err := io.ReadAll(asyncBuffer.Reader())
|
|
@@ -380,8 +380,8 @@ func TestAsyncBufferReadAllCompability(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestAsyncBufferThreshold(t *testing.T) {
|
|
|
- _, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ _, bytesReader := generateSourceData(t, pauseThreshold*2)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
target := make([]byte, chunkSize)
|
|
@@ -426,8 +426,8 @@ func TestAsyncBufferThreshold(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
|
|
|
- _, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
|
|
|
- asyncBuffer := FromReader(bytesReader)
|
|
|
+ _, bytesReader := generateSourceData(t, pauseThreshold*2)
|
|
|
+ asyncBuffer := New(bytesReader)
|
|
|
defer asyncBuffer.Close()
|
|
|
|
|
|
target := make([]byte, chunkSize)
|