buffer_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package asyncbuffer
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "errors"
  6. "io"
  7. "os"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. )
const (
	// Fractions of the package-level chunkSize used to build offsets that
	// land inside a chunk. NOTE(review): "quater" is a typo for "quarter";
	// kept as-is because renaming would touch every test below.
	halfChunkSize   = chunkSize / 2
	quaterChunkSize = chunkSize / 4
)
// nopSeekCloser wraps an io.ReadSeeker with a no-op io.Closer so it can be
// used where an io.ReadSeekCloser is required.
type nopSeekCloser struct {
	io.ReadSeeker
}

// Close implements the io.Closer interface. It does nothing and always
// returns nil.
func (nopSeekCloser) Close() error {
	return nil
}
  27. // erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
  28. type erraticReader struct {
  29. reader io.ReadSeekCloser
  30. failAt int64 // if set, will return an error after reading this many bytes
  31. }
  32. // Read reads data from the testReader, simulating a slow read and a potential failure
  33. func (r *erraticReader) Read(p []byte) (n int, err error) {
  34. cur, _ := r.reader.Seek(0, io.SeekCurrent)
  35. if r.failAt > 0 && r.failAt < cur+int64(len(p)) {
  36. return 0, errors.New("simulated read failure")
  37. }
  38. return r.reader.Read(p)
  39. }
  40. // Close forwards closing to the underlying reader
  41. func (r *erraticReader) Close() error {
  42. return r.reader.Close()
  43. }
  44. // blockingReader is a test reader which flushes data in chunks
  45. type blockingReader struct {
  46. reader io.ReadCloser
  47. mu sync.Mutex // locked reader does not return anything
  48. unlocking atomic.Bool // if true, will proceed without locking each chunk
  49. }
  50. // newBlockingReader creates a new partialReader in locked state
  51. func newBlockingReader(reader io.ReadCloser) *blockingReader {
  52. r := &blockingReader{
  53. reader: reader,
  54. }
  55. r.mu.Lock()
  56. return r
  57. }
  58. // flushNextChunk unlocks the reader, allowing it to return the next chunk of data
  59. func (r *blockingReader) flushNextChunk() {
  60. r.mu.Unlock()
  61. }
  62. // flush unlocks the reader, allowing it to return all data as usual
  63. func (r *blockingReader) flush() {
  64. r.unlocking.Store(true) // allow reading data without blocking
  65. r.mu.Unlock() // and continue
  66. }
  67. // Read reads data from the testReader, simulating a slow read and a potential failure
  68. func (r *blockingReader) Read(p []byte) (n int, err error) {
  69. if !r.unlocking.Load() {
  70. r.mu.Lock()
  71. }
  72. n, err = r.reader.Read(p)
  73. return n, err
  74. }
  75. func (r *blockingReader) Close() error { // Close forwards closing to the underlying reader
  76. return r.reader.Close()
  77. }
  78. // generateSourceData generates a byte slice with 4.5 chunks of data
  79. func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
  80. // We use small chunks for tests, let's check the ChunkSize just in case
  81. assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
  82. // Create a byte slice with 4 chunks of ChunkSize
  83. source := make([]byte, size)
  84. // Fill the source with random data
  85. _, err := rand.Read(source)
  86. require.NoError(t, err)
  87. return source, nopSeekCloser{bytes.NewReader(source)}
  88. }
  89. // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
  90. func TestAsyncBufferReadAt(t *testing.T) {
  91. // Let's use source buffer which is 4.5 chunks long
  92. source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
  93. asyncBuffer := FromReader(bytesReader)
  94. defer asyncBuffer.Close()
  95. asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
  96. // Let's read all the data
  97. target := make([]byte, len(source))
  98. n, err := asyncBuffer.readAt(target, 0)
  99. require.NoError(t, err)
  100. assert.Equal(t, len(source), n)
  101. assert.Equal(t, target, source)
  102. // Let's read all the data + a bit more
  103. target = make([]byte, len(source)+1)
  104. n, err = asyncBuffer.readAt(target, 0)
  105. require.ErrorIs(t, err, io.EOF) // We read all the data, and reached end
  106. assert.Equal(t, len(source), n)
  107. assert.Equal(t, target[:n], source)
  108. // Let's read > 1 chunk, but with offset from the beginning and the end
  109. target = make([]byte, len(source)-halfChunkSize)
  110. n, err = asyncBuffer.readAt(target, quaterChunkSize)
  111. require.NoError(t, err)
  112. assert.Equal(t, len(target), n)
  113. assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])
  114. // Let's read some data from the middle of the stream < chunk size
  115. target = make([]byte, chunkSize/4)
  116. n, err = asyncBuffer.readAt(target, chunkSize+chunkSize/4)
  117. require.NoError(t, err)
  118. assert.Equal(t, quaterChunkSize, n)
  119. assert.Equal(t, target, source[chunkSize+quaterChunkSize:chunkSize+quaterChunkSize*2])
  120. // Let's read some data from the latest half chunk
  121. target = make([]byte, quaterChunkSize)
  122. n, err = asyncBuffer.readAt(target, chunkSize*4+quaterChunkSize)
  123. require.NoError(t, err)
  124. assert.Equal(t, quaterChunkSize, n)
  125. assert.Equal(t, target, source[chunkSize*4+quaterChunkSize:chunkSize*4+halfChunkSize])
  126. // Let's try to read more data then available in the stream
  127. target = make([]byte, chunkSize*2)
  128. n, err = asyncBuffer.readAt(target, chunkSize*4)
  129. require.Error(t, err)
  130. assert.Equal(t, err, io.EOF)
  131. assert.Equal(t, chunkSize/2, n)
  132. assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // We read only last half chunk
  133. // Let's try to read data beyond the end of the stream
  134. target = make([]byte, chunkSize*2)
  135. n, err = asyncBuffer.readAt(target, chunkSize*5)
  136. require.Error(t, err)
  137. assert.Equal(t, err, io.EOF)
  138. assert.Equal(t, 0, n)
  139. }
  140. // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
  141. func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
  142. source, bytesReader := generateSourceData(t, 20)
  143. asyncBuffer := FromReader(bytesReader)
  144. defer asyncBuffer.Close()
  145. // First, let's read all the data
  146. target := make([]byte, len(source))
  147. n, err := asyncBuffer.readAt(target, 0)
  148. require.NoError(t, err)
  149. assert.Equal(t, len(source), n)
  150. assert.Equal(t, target, source)
  151. // Let's read some data
  152. target = make([]byte, 2)
  153. n, err = asyncBuffer.readAt(target, 1)
  154. require.NoError(t, err)
  155. assert.Equal(t, len(target), n)
  156. assert.Equal(t, target, source[1:3])
  157. // Let's read some data beyond the end of the stream
  158. target = make([]byte, 2)
  159. n, err = asyncBuffer.readAt(target, 50)
  160. require.Error(t, err)
  161. assert.Equal(t, err, io.EOF)
  162. assert.Equal(t, 0, n)
  163. }
  164. func TestAsyncBufferReader(t *testing.T) {
  165. source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
  166. // Create an AsyncBuffer with the byte slice
  167. asyncBuffer := FromReader(bytesReader)
  168. defer asyncBuffer.Close()
  169. // Let's wait for all chunks to be read
  170. size, err := asyncBuffer.Wait()
  171. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  172. assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
  173. reader := asyncBuffer.Reader()
  174. // Ensure the total length of the data is ChunkSize*4
  175. require.NoError(t, err)
  176. // Read the first two chunks
  177. twoChunks := make([]byte, chunkSize*2)
  178. n, err := reader.Read(twoChunks)
  179. require.NoError(t, err)
  180. assert.Equal(t, chunkSize*2, n)
  181. assert.Equal(t, source[:chunkSize*2], twoChunks)
  182. // Seek to the last chunk + 10 bytes
  183. pos, err := reader.Seek(chunkSize*3+5, io.SeekStart)
  184. require.NoError(t, err)
  185. assert.Equal(t, int64(chunkSize*3+5), pos)
  186. // Read the next 10 bytes
  187. smallSlice := make([]byte, 10)
  188. n, err = reader.Read(smallSlice)
  189. require.NoError(t, err)
  190. assert.Equal(t, 10, n)
  191. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  192. // Seek -10 bytes from the current position
  193. pos, err = reader.Seek(-10, io.SeekCurrent)
  194. require.NoError(t, err)
  195. assert.Equal(t, int64(chunkSize*3+5), pos)
  196. // Read data again
  197. n, err = reader.Read(smallSlice)
  198. require.NoError(t, err)
  199. assert.Equal(t, 10, n)
  200. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  201. // Seek -10 bytes from end of the stream
  202. pos, err = reader.Seek(-10, io.SeekEnd)
  203. require.NoError(t, err)
  204. assert.Equal(t, size-10, pos)
  205. // Read last 10 bytes
  206. n, err = reader.Read(smallSlice)
  207. require.NoError(t, err)
  208. assert.Equal(t, 10, n)
  209. assert.Equal(t, source[size-10:], smallSlice)
  210. // Seek beyond the end of the stream and try to read
  211. pos, err = reader.Seek(1024, io.SeekCurrent)
  212. require.NoError(t, err)
  213. assert.Equal(t, size+1024, pos)
  214. _, err = reader.Read(smallSlice)
  215. require.ErrorIs(t, err, io.EOF)
  216. }
// TestAsyncBufferClose tests closing the AsyncBuffer
func TestAsyncBufferClose(t *testing.T) {
	_, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
	// Create an AsyncBuffer with the byte slice
	asyncBuffer := FromReader(bytesReader)
	// Two independent readers over the same buffer; both must be
	// invalidated by the Close below
	reader1 := asyncBuffer.Reader()
	reader2 := asyncBuffer.Reader()
	asyncBuffer.Close()
	b := make([]byte, 10)
	// NOTE(review): the string argument to require.Error is a failure
	// message, not an expected error text; if the intent is to assert the
	// message, require.EqualError should be used instead — verify.
	_, err := reader1.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
	// After closing the closed reader, it should not panic
	asyncBuffer.Close()
	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
}
// TestAsyncBufferReadAtErrAtSomePoint tests reading from AsyncBuffer using readAt method
// which would fail somewhere
func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
	// Use source buffer which is 4.5 chunks long
	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
	// The reader fails once reading crosses an offset inside the 4th chunk
	slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
	asyncBuffer := FromReader(slowReader)
	defer asyncBuffer.Close()
	// Wait should surface the background read error.
	// NOTE(review): the string argument to require.Error is a failure
	// message, not an expected error text — require.EqualError would
	// actually match the text; verify intent.
	_, err := asyncBuffer.Wait()
	require.Error(t, err, "simulated read failure")
	// Reads that lie entirely before the failure point must still succeed
	target := make([]byte, halfChunkSize)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[:halfChunkSize])
	// And again
	target = make([]byte, halfChunkSize)
	n, err = asyncBuffer.readAt(target, halfChunkSize)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[halfChunkSize:halfChunkSize*2])
	// A read overlapping the failure point must return the error
	target = make([]byte, halfChunkSize)
	_, err = asyncBuffer.readAt(target, chunkSize*3)
	require.Error(t, err, "simulated read failure")
}
  263. // TestAsyncBufferReadAsync tests reading from AsyncBuffer using readAt method
  264. // with full = false
  265. func TestAsyncBufferReadAsync(t *testing.T) {
  266. // Let's use source buffer which is 4.5 chunks long
  267. source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
  268. blockingReader := newBlockingReader(bytesReader)
  269. asyncBuffer := FromReader(blockingReader)
  270. defer asyncBuffer.Close()
  271. // flush the first chunk to allow reading
  272. blockingReader.flushNextChunk()
  273. // Let's try to read first two chunks, however,
  274. // we know that only the first chunk is available
  275. target := make([]byte, chunkSize*2)
  276. n, err := asyncBuffer.readAt(target, 0)
  277. require.NoError(t, err)
  278. assert.Equal(t, chunkSize, n)
  279. assert.Equal(t, target[:chunkSize], source[:chunkSize])
  280. blockingReader.flushNextChunk() // unlock reader to allow read second chunk
  281. asyncBuffer.WaitFor(chunkSize + 1) // wait for the second chunk to be available
  282. target = make([]byte, chunkSize*2)
  283. n, err = asyncBuffer.readAt(target, 0)
  284. require.NoError(t, err)
  285. assert.Equal(t, chunkSize*2, n)
  286. assert.Equal(t, target, source[:chunkSize*2])
  287. blockingReader.flush() // Flush the rest of the data
  288. asyncBuffer.Wait()
  289. // Try to read near end of the stream, EOF
  290. target = make([]byte, chunkSize)
  291. n, err = asyncBuffer.readAt(target, chunkSize*3-1)
  292. require.ErrorIs(t, err, io.EOF)
  293. assert.Equal(t, 1, n)
  294. assert.Equal(t, target[0], source[chunkSize*3-1])
  295. // Try to read beyond the end of the stream == eof
  296. target = make([]byte, chunkSize)
  297. n, err = asyncBuffer.readAt(target, chunkSize*3)
  298. require.ErrorIs(t, io.EOF, err)
  299. assert.Equal(t, 0, n)
  300. }
// TestAsyncBufferReadAllCompability tests that io.ReadAll works as expected
// on the reader returned by AsyncBuffer.Reader.
// NOTE(review): "Compability" is a typo for "Compatibility"; the function
// name is kept as-is so test filters and CI references don't break.
func TestAsyncBufferReadAllCompability(t *testing.T) {
	// Use a real file as the data source
	source, err := os.ReadFile("../testdata/test1.jpg")
	require.NoError(t, err)
	asyncBuffer := FromReader(nopSeekCloser{bytes.NewReader(source)})
	defer asyncBuffer.Close()
	// ReadAll must drain the entire stream without error
	b, err := io.ReadAll(asyncBuffer.Reader())
	require.NoError(t, err)
	require.Len(t, b, len(source))
}
// TestAsyncBufferThreshold verifies that background reading pauses once
// pauseThreshold bytes are buffered and resumes only when data beyond the
// threshold is requested.
func TestAsyncBufferThreshold(t *testing.T) {
	// Source is twice the pause threshold so the reader must pause midway
	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()
	target := make([]byte, chunkSize)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, chunkSize, n)
	// Ensure that buffer hits the pause threshold
	require.Eventually(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold
	}, 300*time.Millisecond, 10*time.Millisecond)
	// Ensure that the paused buffer never reaches the end of the stream
	require.Never(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)
	// Reading right up to the threshold must not unpause the reader
	target = make([]byte, pauseThreshold)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	require.Equal(t, pauseThreshold, n)
	// Ensure that buffer still never reaches the end of the stream
	require.Never(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)
	// Reading past the threshold unpauses the background reader
	target = make([]byte, pauseThreshold+1)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	require.Equal(t, pauseThreshold, n)
	// It usually returns only pauseThreshold bytes because this exact operation unpauses the reader,
	// but the initial offset is before the threshold, data beyond the threshold may not be available.
	// NOTE(review): the require.Equal above already pins n == pauseThreshold,
	// which makes this GreaterOrEqual redundant; if short reads are actually
	// expected here, the Equal should be relaxed instead — verify intent.
	assert.GreaterOrEqual(t, pauseThreshold, n)
	// Ensure that buffer eventually reads to the end of the stream
	require.Eventually(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2
	}, 300*time.Millisecond, 10*time.Millisecond)
}
  349. func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
  350. _, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
  351. asyncBuffer := FromReader(bytesReader)
  352. defer asyncBuffer.Close()
  353. target := make([]byte, chunkSize)
  354. n, err := asyncBuffer.readAt(target, pauseThreshold+1)
  355. require.NoError(t, err)
  356. assert.GreaterOrEqual(t, chunkSize, n)
  357. // Ensure that buffer hits the end of the stream
  358. require.Eventually(t, func() bool {
  359. return asyncBuffer.len.Load() >= pauseThreshold*2
  360. }, 300*time.Millisecond, 10*time.Millisecond)
  361. }