buffer_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. package asyncbuffer
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "errors"
  6. "io"
  7. "os"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. )
// Fractions of chunkSize used throughout the tests.
// NOTE(review): "quaterChunkSize" is a typo for "quarterChunkSize"; renaming
// would touch every test in this file, so it is left as-is here.
const (
	halfChunkSize   = chunkSize / 2
	quaterChunkSize = chunkSize / 4
)
// erraticReader is a test reader that wraps a bytes.Reader and starts
// failing once a read would cross the failAt byte offset.
type erraticReader struct {
	reader bytes.Reader
	failAt int64 // if > 0, Read fails once current position + len(p) would exceed this offset
}
  24. // Read reads data from the testReader, simulating a slow read and a potential failure
  25. func (r *erraticReader) Read(p []byte) (n int, err error) {
  26. cur, _ := r.reader.Seek(0, io.SeekCurrent)
  27. if r.failAt > 0 && r.failAt < cur+int64(len(p)) {
  28. return 0, errors.New("simulated read failure")
  29. }
  30. return r.reader.Read(p)
  31. }
// blockingReader is a test reader which flushes data in chunks: each Read
// blocks on mu until the test explicitly unlocks it.
type blockingReader struct {
	reader    bytes.Reader
	mu        sync.Mutex  // held while the reader must not return data
	unlocking atomic.Bool // once true, Read proceeds without locking per chunk
}
  38. // newBlockingReader creates a new partialReader in locked state
  39. func newBlockingReader(reader bytes.Reader) *blockingReader {
  40. r := &blockingReader{
  41. reader: reader,
  42. }
  43. r.mu.Lock()
  44. return r
  45. }
// flushNextChunk releases the mutex once, letting exactly one blocked Read
// return its chunk; the following Read will block again.
func (r *blockingReader) flushNextChunk() {
	r.mu.Unlock()
}
// flush permanently unblocks the reader: all remaining data can be read
// without further per-chunk unlocking.
func (r *blockingReader) flush() {
	r.unlocking.Store(true) // must be set before the final Unlock so a woken Read skips re-locking
	r.mu.Unlock()           // release the Read (if any) currently blocked on mu
}
  55. // Read reads data from the testReader, simulating a slow read and a potential failure
  56. func (r *blockingReader) Read(p []byte) (n int, err error) {
  57. if !r.unlocking.Load() {
  58. r.mu.Lock()
  59. }
  60. n, err = r.reader.Read(p)
  61. return n, err
  62. }
  63. // generateSourceData generates a byte slice with 4.5 chunks of data
  64. func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
  65. // We use small chunks for tests, let's check the ChunkSize just in case
  66. assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
  67. // Create a byte slice with 4 chunks of ChunkSize
  68. source := make([]byte, size)
  69. // Fill the source with random data
  70. _, err := rand.Read(source)
  71. require.NoError(t, err)
  72. return source, bytes.NewReader(source)
  73. }
  74. // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
  75. func TestAsyncBufferReadAt(t *testing.T) {
  76. // Let's use source buffer which is 4.5 chunks long
  77. source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
  78. asyncBuffer := FromReader(bytesReader)
  79. defer asyncBuffer.Close()
  80. asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
  81. // Let's read all the data
  82. target := make([]byte, len(source))
  83. n, err := asyncBuffer.readAt(target, 0)
  84. require.NoError(t, err)
  85. assert.Equal(t, len(source), n)
  86. assert.Equal(t, target, source)
  87. // Let's read all the data + a bit more
  88. target = make([]byte, len(source)+1)
  89. n, err = asyncBuffer.readAt(target, 0)
  90. require.ErrorIs(t, err, io.EOF) // We read all the data, and reached end
  91. assert.Equal(t, len(source), n)
  92. assert.Equal(t, target[:n], source)
  93. // Let's read > 1 chunk, but with offset from the beginning and the end
  94. target = make([]byte, len(source)-halfChunkSize)
  95. n, err = asyncBuffer.readAt(target, quaterChunkSize)
  96. require.NoError(t, err)
  97. assert.Equal(t, len(target), n)
  98. assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])
  99. // Let's read some data from the middle of the stream < chunk size
  100. target = make([]byte, chunkSize/4)
  101. n, err = asyncBuffer.readAt(target, chunkSize+chunkSize/4)
  102. require.NoError(t, err)
  103. assert.Equal(t, quaterChunkSize, n)
  104. assert.Equal(t, target, source[chunkSize+quaterChunkSize:chunkSize+quaterChunkSize*2])
  105. // Let's read some data from the latest half chunk
  106. target = make([]byte, quaterChunkSize)
  107. n, err = asyncBuffer.readAt(target, chunkSize*4+quaterChunkSize)
  108. require.NoError(t, err)
  109. assert.Equal(t, quaterChunkSize, n)
  110. assert.Equal(t, target, source[chunkSize*4+quaterChunkSize:chunkSize*4+halfChunkSize])
  111. // Let's try to read more data then available in the stream
  112. target = make([]byte, chunkSize*2)
  113. n, err = asyncBuffer.readAt(target, chunkSize*4)
  114. require.Error(t, err)
  115. assert.Equal(t, err, io.EOF)
  116. assert.Equal(t, chunkSize/2, n)
  117. assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // We read only last half chunk
  118. // Let's try to read data beyond the end of the stream
  119. target = make([]byte, chunkSize*2)
  120. n, err = asyncBuffer.readAt(target, chunkSize*5)
  121. require.Error(t, err)
  122. assert.Equal(t, err, io.EOF)
  123. assert.Equal(t, 0, n)
  124. }
  125. // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
  126. func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
  127. source, bytesReader := generateSourceData(t, 20)
  128. asyncBuffer := FromReader(bytesReader)
  129. defer asyncBuffer.Close()
  130. // First, let's read all the data
  131. target := make([]byte, len(source))
  132. n, err := asyncBuffer.readAt(target, 0)
  133. require.NoError(t, err)
  134. assert.Equal(t, len(source), n)
  135. assert.Equal(t, target, source)
  136. // Let's read some data
  137. target = make([]byte, 2)
  138. n, err = asyncBuffer.readAt(target, 1)
  139. require.NoError(t, err)
  140. assert.Equal(t, len(target), n)
  141. assert.Equal(t, target, source[1:3])
  142. // Let's read some data beyond the end of the stream
  143. target = make([]byte, 2)
  144. n, err = asyncBuffer.readAt(target, 50)
  145. require.Error(t, err)
  146. assert.Equal(t, err, io.EOF)
  147. assert.Equal(t, 0, n)
  148. }
  149. func TestAsyncBufferReader(t *testing.T) {
  150. source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
  151. // Create an AsyncBuffer with the byte slice
  152. asyncBuffer := FromReader(bytesReader)
  153. defer asyncBuffer.Close()
  154. // Let's wait for all chunks to be read
  155. size, err := asyncBuffer.Wait()
  156. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  157. assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
  158. reader := asyncBuffer.Reader()
  159. // Ensure the total length of the data is ChunkSize*4
  160. require.NoError(t, err)
  161. // Read the first two chunks
  162. twoChunks := make([]byte, chunkSize*2)
  163. n, err := reader.Read(twoChunks)
  164. require.NoError(t, err)
  165. assert.Equal(t, chunkSize*2, n)
  166. assert.Equal(t, source[:chunkSize*2], twoChunks)
  167. // Seek to the last chunk + 10 bytes
  168. pos, err := reader.Seek(chunkSize*3+5, io.SeekStart)
  169. require.NoError(t, err)
  170. assert.Equal(t, int64(chunkSize*3+5), pos)
  171. // Read the next 10 bytes
  172. smallSlice := make([]byte, 10)
  173. n, err = reader.Read(smallSlice)
  174. require.NoError(t, err)
  175. assert.Equal(t, 10, n)
  176. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  177. // Seek -10 bytes from the current position
  178. pos, err = reader.Seek(-10, io.SeekCurrent)
  179. require.NoError(t, err)
  180. assert.Equal(t, int64(chunkSize*3+5), pos)
  181. // Read data again
  182. n, err = reader.Read(smallSlice)
  183. require.NoError(t, err)
  184. assert.Equal(t, 10, n)
  185. assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
  186. // Seek -10 bytes from end of the stream
  187. pos, err = reader.Seek(-10, io.SeekEnd)
  188. require.NoError(t, err)
  189. assert.Equal(t, size-10, pos)
  190. // Read last 10 bytes
  191. n, err = reader.Read(smallSlice)
  192. require.NoError(t, err)
  193. assert.Equal(t, 10, n)
  194. assert.Equal(t, source[size-10:], smallSlice)
  195. // Seek beyond the end of the stream and try to read
  196. pos, err = reader.Seek(1024, io.SeekCurrent)
  197. require.NoError(t, err)
  198. assert.Equal(t, size+1024, pos)
  199. _, err = reader.Read(smallSlice)
  200. require.ErrorIs(t, err, io.EOF)
  201. }
// TestAsyncBufferClose tests that readers obtained from an AsyncBuffer fail
// after the buffer is closed, and that closing twice does not panic.
//
// NOTE(review): the string passed as the third argument to require.Error is
// msgAndArgs (a failure message), NOT an expected error text. If the intent
// is to assert the error message, require.ErrorContains should be used —
// confirm against the actual error produced by the implementation.
func TestAsyncBufferClose(t *testing.T) {
	_, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)

	// Create an AsyncBuffer with the byte slice
	asyncBuffer := FromReader(bytesReader)

	reader1 := asyncBuffer.Reader()
	reader2 := asyncBuffer.Reader()

	asyncBuffer.Close()

	// Reads on both readers must fail once the buffer is closed
	b := make([]byte, 10)
	_, err := reader1.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")

	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")

	// After closing the closed buffer, it should not panic
	asyncBuffer.Close()

	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
}
  220. // TestAsyncBufferReadAtErrAtSomePoint tests reading from AsyncBuffer using readAt method
  221. // which would fail somewhere
  222. func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
  223. // Let's use source buffer which is 4.5 chunks long
  224. source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
  225. slowReader := &erraticReader{reader: *bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
  226. asyncBuffer := FromReader(slowReader)
  227. defer asyncBuffer.Close()
  228. // Let's wait for all chunks to be read
  229. _, err := asyncBuffer.Wait()
  230. require.Error(t, err, "simulated read failure")
  231. // Let's read something, but before error occurs
  232. target := make([]byte, halfChunkSize)
  233. n, err := asyncBuffer.readAt(target, 0)
  234. require.NoError(t, err)
  235. assert.Equal(t, len(target), n)
  236. assert.Equal(t, target, source[:halfChunkSize])
  237. // And again
  238. target = make([]byte, halfChunkSize)
  239. n, err = asyncBuffer.readAt(target, halfChunkSize)
  240. require.NoError(t, err)
  241. assert.Equal(t, len(target), n)
  242. assert.Equal(t, target, source[halfChunkSize:halfChunkSize*2])
  243. // Let's read something, but when error occurs
  244. target = make([]byte, halfChunkSize)
  245. _, err = asyncBuffer.readAt(target, chunkSize*3)
  246. require.Error(t, err, "simulated read failure")
  247. }
  248. // TestAsyncBufferReadAsync tests reading from AsyncBuffer using readAt method
  249. // with full = false
  250. func TestAsyncBufferReadAsync(t *testing.T) {
  251. // Let's use source buffer which is 4.5 chunks long
  252. source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
  253. blockingReader := newBlockingReader(*bytesReader)
  254. asyncBuffer := FromReader(blockingReader)
  255. defer asyncBuffer.Close()
  256. // flush the first chunk to allow reading
  257. blockingReader.flushNextChunk()
  258. // Let's try to read first two chunks, however,
  259. // we know that only the first chunk is available
  260. target := make([]byte, chunkSize*2)
  261. n, err := asyncBuffer.readAt(target, 0)
  262. require.NoError(t, err)
  263. assert.Equal(t, chunkSize, n)
  264. assert.Equal(t, target[:chunkSize], source[:chunkSize])
  265. blockingReader.flushNextChunk() // unlock reader to allow read second chunk
  266. asyncBuffer.WaitFor(chunkSize + 1) // wait for the second chunk to be available
  267. target = make([]byte, chunkSize*2)
  268. n, err = asyncBuffer.readAt(target, 0)
  269. require.NoError(t, err)
  270. assert.Equal(t, chunkSize*2, n)
  271. assert.Equal(t, target, source[:chunkSize*2])
  272. blockingReader.flush() // Flush the rest of the data
  273. asyncBuffer.Wait()
  274. // Try to read near end of the stream, EOF
  275. target = make([]byte, chunkSize)
  276. n, err = asyncBuffer.readAt(target, chunkSize*3-1)
  277. require.ErrorIs(t, err, io.EOF)
  278. assert.Equal(t, 1, n)
  279. assert.Equal(t, target[0], source[chunkSize*3-1])
  280. // Try to read beyond the end of the stream == eof
  281. target = make([]byte, chunkSize)
  282. n, err = asyncBuffer.readAt(target, chunkSize*3)
  283. require.ErrorIs(t, io.EOF, err)
  284. assert.Equal(t, 0, n)
  285. }
  286. // TestAsyncBufferReadAllCompability tests that ReadAll methods works as expected
  287. func TestAsyncBufferReadAllCompability(t *testing.T) {
  288. source, err := os.ReadFile("../testdata/test1.jpg")
  289. require.NoError(t, err)
  290. asyncBuffer := FromReader(bytes.NewReader(source))
  291. defer asyncBuffer.Close()
  292. b, err := io.ReadAll(asyncBuffer.Reader())
  293. require.NoError(t, err)
  294. require.Len(t, b, len(source))
  295. }
// TestAsyncBufferThreshold verifies that background reading pauses once
// pauseThreshold bytes are buffered, stays paused while reads remain below
// the threshold, and resumes when a read touches data beyond it.
func TestAsyncBufferThreshold(t *testing.T) {
	// Source is twice the pause threshold so the reader must pause midway
	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)

	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()

	// A small read near the start should succeed
	target := make([]byte, chunkSize)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, chunkSize, n)

	// Ensure that the buffer hits the pause threshold
	require.Eventually(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold
	}, 300*time.Millisecond, 10*time.Millisecond)

	// Ensure that the buffer never reaches the end of the stream while paused
	require.Never(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)

	// Reading exactly up to the pause threshold must not unpause the reader
	target = make([]byte, pauseThreshold)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	require.Equal(t, pauseThreshold, n)

	// Still paused: the end of the stream is never reached
	require.Never(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2-1
	}, 300*time.Millisecond, 10*time.Millisecond)

	// Reading one byte past the threshold unpauses the reader
	target = make([]byte, pauseThreshold+1)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	require.Equal(t, pauseThreshold, n)
	// It usually returns only pauseThreshold bytes because this exact operation unpauses the reader,
	// but the initial offset is before the threshold, data beyond the threshold may not be available.
	// NOTE(review): the require.Equal above already pins n == pauseThreshold,
	// making this GreaterOrEqual redundant (and its expected/actual order
	// looks swapped) — confirm which assertion is actually intended.
	assert.GreaterOrEqual(t, pauseThreshold, n)

	// Now the reader should run to the end of the stream
	require.Eventually(t, func() bool {
		return asyncBuffer.len.Load() >= pauseThreshold*2
	}, 300*time.Millisecond, 10*time.Millisecond)
}
  334. func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
  335. _, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
  336. asyncBuffer := FromReader(bytesReader)
  337. defer asyncBuffer.Close()
  338. target := make([]byte, chunkSize)
  339. n, err := asyncBuffer.readAt(target, pauseThreshold+1)
  340. require.NoError(t, err)
  341. assert.Equal(t, chunkSize, n)
  342. // Ensure that buffer hits the end of the stream
  343. require.Eventually(t, func() bool {
  344. return asyncBuffer.len.Load() >= pauseThreshold*2
  345. }, 300*time.Millisecond, 10*time.Millisecond)
  346. }