// buffer_test.go — tests for the asyncbuffer package.
  1. package asyncbuffer
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "errors"
  6. "io"
  7. "os"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. )
// Fractions of the package-level chunkSize used throughout the tests to
// build offsets and buffer lengths.
// NOTE(review): "quater" is a typo of "quarter"; kept as-is because the
// name is referenced across the file.
const (
	halfChunkSize   = chunkSize / 2
	quaterChunkSize = chunkSize / 4
)
  19. // nopSeekCloser is a wrapper around io.ReadSeeker that implements no-op io.Closer
  20. type nopSeekCloser struct {
  21. io.ReadSeeker
  22. }
  23. // Close implements io.Closer interface, but does nothing
  24. func (nopSeekCloser) Close() error {
  25. return nil
  26. }
  27. // erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
  28. type erraticReader struct {
  29. reader io.ReadSeekCloser
  30. failAt int64 // if set, will return an error after reading this many bytes
  31. }
  32. // Read reads data from the testReader, simulating a slow read and a potential failure
  33. func (r *erraticReader) Read(p []byte) (n int, err error) {
  34. cur, _ := r.reader.Seek(0, io.SeekCurrent)
  35. if r.failAt > 0 && r.failAt < cur+int64(len(p)) {
  36. return 0, errors.New("simulated read failure")
  37. }
  38. return r.reader.Read(p)
  39. }
  40. // Close forwards closing to the underlying reader
  41. func (r *erraticReader) Close() error {
  42. return r.reader.Close()
  43. }
// blockingReader is a test reader that gates the underlying stream: each
// Read blocks on a mutex until the test explicitly releases it, letting
// tests control exactly how much data the consumer can observe.
//
// The mutex is deliberately locked in one goroutine and unlocked from
// another (flushNextChunk/flush release the Read that is waiting); this
// hand-off is what produces the one-chunk-at-a-time behavior.
type blockingReader struct {
	reader    io.ReadCloser
	mu        sync.Mutex  // held while the reader must not return data
	unlocking atomic.Bool // once true, Read proceeds without blocking per chunk
}

// newBlockingReader creates a new blockingReader in the locked state:
// no data is returned until flushNextChunk or flush is called.
func newBlockingReader(reader io.ReadCloser) *blockingReader {
	r := &blockingReader{
		reader: reader,
	}
	r.mu.Lock()
	return r
}

// flushNextChunk unlocks the reader, allowing exactly one pending Read
// to proceed; that Read re-acquires the mutex before returning data.
func (r *blockingReader) flushNextChunk() {
	r.mu.Unlock()
}

// flush permanently releases the reader: all subsequent Reads proceed
// without blocking.
func (r *blockingReader) flush() {
	r.unlocking.Store(true) // allow reading data without blocking
	r.mu.Unlock()           // and release the Read currently waiting
}

// Read blocks until the test releases the mutex (unless flush has been
// called), then forwards to the underlying reader.
func (r *blockingReader) Read(p []byte) (n int, err error) {
	if !r.unlocking.Load() {
		r.mu.Lock()
	}
	n, err = r.reader.Read(p)
	return n, err
}

// Close forwards closing to the underlying reader.
func (r *blockingReader) Close() error {
	return r.reader.Close()
}
  78. // generateSourceData generates a byte slice with 4.5 chunks of data
  79. func generateSourceData(t *testing.T, size int) ([]byte, io.ReadSeekCloser) {
  80. // We use small chunks for tests, let's check the ChunkSize just in case
  81. assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
  82. // Create a byte slice with 4 chunks of ChunkSize
  83. source := make([]byte, size)
  84. // Fill the source with random data
  85. _, err := rand.Read(source)
  86. require.NoError(t, err)
  87. return source, nopSeekCloser{bytes.NewReader(source)}
  88. }
// TestAsyncBufferReadAt tests reading from AsyncBuffer through the readAt
// method, which underlies all other read paths.
func TestAsyncBufferReadAt(t *testing.T) {
	// Use a source buffer which is 4.5 chunks long.
	source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
	asyncBuffer := New(bytesReader, -1)
	defer asyncBuffer.Close()
	asyncBuffer.Wait() // wait for all chunks since we are going to read everything

	// Read all the data at once.
	target := make([]byte, len(source))
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, len(source), n)
	assert.Equal(t, target, source)

	// Read all the data plus one extra byte: n is capped at the stream size.
	target = make([]byte, len(source)+1)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err) // we read all the data and reached the end
	assert.Equal(t, len(source), n)
	assert.Equal(t, target[:n], source)

	// Read more than one chunk, offset from both the beginning and the end.
	target = make([]byte, len(source)-halfChunkSize)
	n, err = asyncBuffer.readAt(target, quaterChunkSize)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])

	// Read a span smaller than a chunk from the middle of the stream.
	target = make([]byte, chunkSize/4)
	n, err = asyncBuffer.readAt(target, chunkSize+chunkSize/4)
	require.NoError(t, err)
	assert.Equal(t, quaterChunkSize, n)
	assert.Equal(t, target, source[chunkSize+quaterChunkSize:chunkSize+quaterChunkSize*2])

	// Read some data from the last (half-sized) chunk.
	target = make([]byte, quaterChunkSize)
	n, err = asyncBuffer.readAt(target, chunkSize*4+quaterChunkSize)
	require.NoError(t, err)
	assert.Equal(t, quaterChunkSize, n)
	assert.Equal(t, target, source[chunkSize*4+quaterChunkSize:chunkSize*4+halfChunkSize])

	// Ask for more data than remains: only the trailing half chunk comes back.
	target = make([]byte, chunkSize*2)
	n, err = asyncBuffer.readAt(target, chunkSize*4)
	require.NoError(t, err)
	assert.Equal(t, chunkSize/2, n)
	assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // only the last half chunk was read

	// Reading entirely past the end of the stream yields io.EOF and n == 0.
	target = make([]byte, chunkSize*2)
	n, err = asyncBuffer.readAt(target, chunkSize*5)
	require.Error(t, err)
	assert.Equal(t, err, io.EOF)
	assert.Equal(t, 0, n)
}
  139. // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
  140. func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
  141. source, bytesReader := generateSourceData(t, 20)
  142. asyncBuffer := New(bytesReader, -1)
  143. defer asyncBuffer.Close()
  144. // First, let's read all the data
  145. target := make([]byte, len(source))
  146. n, err := asyncBuffer.readAt(target, 0)
  147. require.NoError(t, err)
  148. assert.Equal(t, len(source), n)
  149. assert.Equal(t, target, source)
  150. // Let's read some data
  151. target = make([]byte, 2)
  152. n, err = asyncBuffer.readAt(target, 1)
  153. require.NoError(t, err)
  154. assert.Equal(t, len(target), n)
  155. assert.Equal(t, target, source[1:3])
  156. // Let's read some data beyond the end of the stream
  157. target = make([]byte, 2)
  158. n, err = asyncBuffer.readAt(target, 50)
  159. require.Error(t, err)
  160. assert.Equal(t, err, io.EOF)
  161. assert.Equal(t, 0, n)
  162. }
  163. func TestAsyncBufferReader(t *testing.T) {
  164. source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  165. // Create an AsyncBuffer with the byte slice
  166. asyncBuffer := New(bytesReader, -1)
  167. defer asyncBuffer.Close()
  168. // Let's wait for all chunks to be read
  169. size, err := asyncBuffer.Wait()
  170. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  171. assert.Equal(t, chunkSize*4+halfChunkSize, size)
  172. reader := asyncBuffer.Reader()
  173. // Ensure the total length of the data is ChunkSize*4
  174. require.NoError(t, err)
  175. // Read the first two chunks
  176. twoChunks := make([]byte, chunkSize*2)
  177. n, err := reader.Read(twoChunks)
  178. require.NoError(t, err)
  179. assert.Equal(t, chunkSize*2, n)
  180. assert.Equal(t, source[:chunkSize*2], twoChunks)
  181. // Seek to the last chunk + 10 bytes
  182. pos, err := reader.Seek(chunkSize*3+5, io.SeekStart)
  183. require.NoError(t, err)
  184. assert.Equal(t, int64(chunkSize*3+5), pos)
  185. // Read the next 10 bytes
  186. smallSlice := make([]byte, 10)
  187. n, err = reader.Read(smallSlice)
  188. require.NoError(t, err)
  189. assert.Equal(t, 10, n)
  190. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  191. // Seek -10 bytes from the current position
  192. pos, err = reader.Seek(-10, io.SeekCurrent)
  193. require.NoError(t, err)
  194. assert.Equal(t, int64(chunkSize*3+5), pos)
  195. // Read data again
  196. n, err = reader.Read(smallSlice)
  197. require.NoError(t, err)
  198. assert.Equal(t, 10, n)
  199. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  200. // Seek -10 bytes from end of the stream
  201. pos, err = reader.Seek(-10, io.SeekEnd)
  202. require.NoError(t, err)
  203. assert.Equal(t, size-10, int(pos))
  204. // Read last 10 bytes
  205. n, err = reader.Read(smallSlice)
  206. require.NoError(t, err)
  207. assert.Equal(t, 10, n)
  208. assert.Equal(t, source[size-10:], smallSlice)
  209. // Seek beyond the end of the stream and try to read
  210. pos, err = reader.Seek(1024, io.SeekCurrent)
  211. require.NoError(t, err)
  212. assert.Equal(t, size+1024, int(pos))
  213. _, err = reader.Read(smallSlice)
  214. require.ErrorIs(t, err, io.EOF)
  215. }
  216. // TestAsyncBufferClose tests closing the AsyncBuffer
  217. func TestAsyncBufferClose(t *testing.T) {
  218. _, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  219. // Create an AsyncBuffer with the byte slice
  220. asyncBuffer := New(bytesReader, -1)
  221. reader1 := asyncBuffer.Reader()
  222. reader2 := asyncBuffer.Reader()
  223. asyncBuffer.Close()
  224. b := make([]byte, 10)
  225. _, err := reader1.Read(b)
  226. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  227. _, err = reader2.Read(b)
  228. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  229. // After closing the closed reader, it should not panic
  230. asyncBuffer.Close()
  231. _, err = reader2.Read(b)
  232. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  233. }
  234. // TestAsyncBufferReadAtErrAtSomePoint tests reading from AsyncBuffer using readAt method
  235. // which would fail somewhere
  236. func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
  237. // Let's use source buffer which is 4.5 chunks long
  238. source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  239. slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
  240. asyncBuffer := New(slowReader, -1)
  241. defer asyncBuffer.Close()
  242. // Let's wait for all chunks to be read
  243. _, err := asyncBuffer.Wait()
  244. require.Error(t, err, "simulated read failure")
  245. // Let's read something, but before error occurs
  246. target := make([]byte, halfChunkSize)
  247. n, err := asyncBuffer.readAt(target, 0)
  248. require.NoError(t, err)
  249. assert.Equal(t, len(target), n)
  250. assert.Equal(t, target, source[:halfChunkSize])
  251. // And again
  252. target = make([]byte, halfChunkSize)
  253. n, err = asyncBuffer.readAt(target, halfChunkSize)
  254. require.NoError(t, err)
  255. assert.Equal(t, len(target), n)
  256. assert.Equal(t, target, source[halfChunkSize:halfChunkSize*2])
  257. // Let's read something, but when error occurs
  258. target = make([]byte, halfChunkSize)
  259. _, err = asyncBuffer.readAt(target, chunkSize*3)
  260. require.Error(t, err, "simulated read failure")
  261. }
  262. // TestAsyncBufferReadAsync tests reading from AsyncBuffer using readAt method
  263. // with full = false
  264. func TestAsyncBufferReadAsync(t *testing.T) {
  265. // Let's use source buffer which is 4.5 chunks long
  266. source, bytesReader := generateSourceData(t, chunkSize*3)
  267. blockingReader := newBlockingReader(bytesReader)
  268. asyncBuffer := New(blockingReader, -1)
  269. defer asyncBuffer.Close()
  270. // flush the first chunk to allow reading
  271. blockingReader.flushNextChunk()
  272. // Let's try to read first two chunks, however,
  273. // we know that only the first chunk is available
  274. target := make([]byte, chunkSize*2)
  275. n, err := asyncBuffer.readAt(target, 0)
  276. require.NoError(t, err)
  277. assert.Equal(t, chunkSize, n)
  278. assert.Equal(t, target[:chunkSize], source[:chunkSize])
  279. blockingReader.flushNextChunk() // unlock reader to allow read second chunk
  280. asyncBuffer.WaitFor(chunkSize + 1) // wait for the second chunk to be available
  281. target = make([]byte, chunkSize*2)
  282. n, err = asyncBuffer.readAt(target, 0)
  283. require.NoError(t, err)
  284. assert.Equal(t, chunkSize*2, n)
  285. assert.Equal(t, target, source[:chunkSize*2])
  286. blockingReader.flush() // Flush the rest of the data
  287. asyncBuffer.Wait()
  288. // Try to read near end of the stream, EOF
  289. target = make([]byte, chunkSize)
  290. n, err = asyncBuffer.readAt(target, chunkSize*3-1)
  291. require.NoError(t, err)
  292. assert.Equal(t, 1, n)
  293. assert.Equal(t, target[0], source[chunkSize*3-1])
  294. // Try to read beyond the end of the stream == eof
  295. target = make([]byte, chunkSize)
  296. n, err = asyncBuffer.readAt(target, chunkSize*3)
  297. require.ErrorIs(t, io.EOF, err)
  298. assert.Equal(t, 0, n)
  299. }
  300. // TestAsyncBufferWithDataLenAndExactReaderSize tests that AsyncBuffer doesn't
  301. // return an error when the expected data length is set and matches the reader size
  302. func TestAsyncBufferWithDataLenAndExactReaderSize(t *testing.T) {
  303. source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  304. asyncBuffer := New(bytesReader, len(source))
  305. defer asyncBuffer.Close()
  306. // Let's wait for all chunks to be read
  307. size, err := asyncBuffer.Wait()
  308. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  309. assert.Equal(t, len(source), size)
  310. }
  311. // TestAsyncBufferWithDataLenAndShortReaderSize tests that AsyncBuffer returns
  312. // io.ErrUnexpectedEOF when the expected data length is set and the reader size
  313. // is shorter than the expected data length
  314. func TestAsyncBufferWithDataLenAndShortReaderSize(t *testing.T) {
  315. source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  316. asyncBuffer := New(bytesReader, len(source)+100) // 100 bytes more than the source
  317. defer asyncBuffer.Close()
  318. // Let's wait for all chunks to be read
  319. size, err := asyncBuffer.Wait()
  320. require.Equal(t, len(source), size)
  321. require.ErrorIs(t, err, io.ErrUnexpectedEOF,
  322. "AsyncBuffer should return io.ErrUnexpectedEOF when data length is longer than reader size")
  323. }
  324. // TestAsyncBufferWithDataLenAndLongerReaderSize tests that AsyncBuffer doesn't
  325. // read more data than specified by the expected data length and doesn't return an error
  326. // when the reader size is longer than the expected data length
  327. func TestAsyncBufferWithDataLenAndLongerReaderSize(t *testing.T) {
  328. source, bytesReader := generateSourceData(t, chunkSize*4+halfChunkSize)
  329. asyncBuffer := New(bytesReader, len(source)-100) // 100 bytes less than the source
  330. defer asyncBuffer.Close()
  331. // Let's wait for all chunks to be read
  332. size, err := asyncBuffer.Wait()
  333. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  334. assert.Equal(t, len(source)-100, size,
  335. "AsyncBuffer should read only the specified amount of data when data length is set")
  336. }
  337. // TestAsyncBufferReadAllCompability tests that ReadAll methods works as expected
  338. func TestAsyncBufferReadAllCompability(t *testing.T) {
  339. source, err := os.ReadFile("../testdata/test1.jpg")
  340. require.NoError(t, err)
  341. asyncBuffer := New(nopSeekCloser{bytes.NewReader(source)}, -1)
  342. defer asyncBuffer.Close()
  343. b, err := io.ReadAll(asyncBuffer.Reader())
  344. require.NoError(t, err)
  345. require.Len(t, b, len(source))
  346. }
// TestAsyncBufferThreshold verifies the backpressure behavior: the
// background reader pauses once pauseThreshold bytes are buffered and
// resumes only when a consumer reads at or past the threshold.
func TestAsyncBufferThreshold(t *testing.T) {
	_, bytesReader := generateSourceData(t, pauseThreshold*2)
	asyncBuffer := New(bytesReader, -1)
	defer asyncBuffer.Close()
	target := make([]byte, chunkSize)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, chunkSize, n)
	// The background reader buffers up to the pause threshold...
	require.Eventually(t, func() bool {
		return asyncBuffer.bytesRead.Load() >= pauseThreshold
	}, 300*time.Millisecond, 10*time.Millisecond)
	// ...but stays paused there and never reaches the end of the stream
	require.Never(t, func() bool {
		return asyncBuffer.bytesRead.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)
	// Read exactly up to the pause threshold; this alone must not resume
	// the background reader
	target = make([]byte, pauseThreshold)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	require.Equal(t, pauseThreshold, n)
	// Still paused: the end of the stream is never reached
	require.Never(t, func() bool {
		return asyncBuffer.bytesRead.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)
	// Now read one byte beyond the pause threshold to unpause the reader
	target = make([]byte, pauseThreshold+1)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	// It usually returns only pauseThreshold bytes because this exact operation unpauses the reader,
	// but the initial offset is before the threshold, data beyond the threshold may not be available.
	assert.GreaterOrEqual(t, pauseThreshold, n)
	// The background reader now consumes the stream to its end
	require.Eventually(t, func() bool {
		return asyncBuffer.bytesRead.Load() >= pauseThreshold*2
	}, 300*time.Millisecond, 10*time.Millisecond)
}
  384. func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
  385. _, bytesReader := generateSourceData(t, pauseThreshold*2)
  386. asyncBuffer := New(bytesReader, -1)
  387. defer asyncBuffer.Close()
  388. target := make([]byte, chunkSize)
  389. n, err := asyncBuffer.readAt(target, pauseThreshold+1)
  390. require.NoError(t, err)
  391. assert.GreaterOrEqual(t, chunkSize, n)
  392. // Ensure that buffer hits the end of the stream
  393. require.Eventually(t, func() bool {
  394. return asyncBuffer.bytesRead.Load() >= pauseThreshold*2
  395. }, 300*time.Millisecond, 10*time.Millisecond)
  396. }