buffer_test.go

package asyncbuffer

import (
	"bytes"
	"crypto/rand"
	"errors"
	"io"
	"os"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

const (
	halfChunkSize    = ChunkSize / 2
	quarterChunkSize = ChunkSize / 4
)

// erraticReader is a test reader that can fail after reading a certain number of bytes
type erraticReader struct {
	reader bytes.Reader
	failAt int64 // if set, will return an error after reading this many bytes
}

// Read reads data from the underlying reader, but returns an error as soon as
// the read would cross the failAt position
func (r *erraticReader) Read(p []byte) (n int, err error) {
	cur, _ := r.reader.Seek(0, io.SeekCurrent)

	if r.failAt > 0 && r.failAt < cur+int64(len(p)) {
		return 0, errors.New("simulated read failure")
	}

	return r.reader.Read(p)
}

// blockingReader is a test reader that releases data to its consumer one chunk at a time
type blockingReader struct {
	reader    bytes.Reader
	mu        sync.Mutex  // while locked, Read blocks and returns nothing
	unlocking atomic.Bool // if true, reads proceed without blocking on each chunk
}

// newBlockingReader creates a new blockingReader in the locked state
func newBlockingReader(reader bytes.Reader) *blockingReader {
	r := &blockingReader{
		reader: reader,
	}
	r.mu.Lock()

	return r
}

// flushNextChunk unlocks the reader, allowing it to return the next chunk of data
func (r *blockingReader) flushNextChunk() {
	r.mu.Unlock()
}

// flush unlocks the reader, allowing it to return all remaining data as usual
func (r *blockingReader) flush() {
	r.unlocking.Store(true) // allow reading data without blocking
	r.mu.Unlock()           // and continue
}

// Read blocks until the next chunk is flushed, then reads data from the
// underlying reader; once flush has been called, it no longer blocks
func (r *blockingReader) Read(p []byte) (n int, err error) {
	if !r.unlocking.Load() {
		r.mu.Lock()
	}

	return r.reader.Read(p)
}
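
// Compile-time assertions that both test readers implement io.Reader and can
// therefore be passed to FromReader.
var (
	_ io.Reader = (*erraticReader)(nil)
	_ io.Reader = (*blockingReader)(nil)
)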

// generateSourceData generates a random byte slice of the given size and a reader over it
func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
	// We use small chunks for tests; check the ChunkSize just in case
	assert.GreaterOrEqual(t, ChunkSize, 20, "ChunkSize used in tests must be at least 20 bytes")

	// Fill a byte slice of the requested size with random data
	source := make([]byte, size)
	_, err := rand.Read(source)
	require.NoError(t, err)

	return source, bytes.NewReader(source)
}

// TestAsyncBufferReadAt tests reading from AsyncBuffer using the readAt method,
// which is the base for all other read methods
func TestAsyncBufferReadAt(t *testing.T) {
	// Let's use a source buffer which is 4.5 chunks long
	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)

	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()

	asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data

	// Let's read all the data
	target := make([]byte, len(source))
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, len(source), n)
	assert.Equal(t, target, source)

	// Let's read all the data + a bit more
	target = make([]byte, len(source)+1)
	n, err = asyncBuffer.readAt(target, 0)
	require.ErrorIs(t, err, io.EOF) // We read all the data and reached the end
	assert.Equal(t, len(source), n)
	assert.Equal(t, target[:n], source)

	// Let's read > 1 chunk, but with an offset from the beginning and the end
	target = make([]byte, len(source)-halfChunkSize)
	n, err = asyncBuffer.readAt(target, quarterChunkSize)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[quarterChunkSize:len(source)-quarterChunkSize])

	// Let's read some data from the middle of the stream, less than a chunk long
	target = make([]byte, quarterChunkSize)
	n, err = asyncBuffer.readAt(target, ChunkSize+quarterChunkSize)
	require.NoError(t, err)
	assert.Equal(t, quarterChunkSize, n)
	assert.Equal(t, target, source[ChunkSize+quarterChunkSize:ChunkSize+quarterChunkSize*2])

	// Let's read some data from the trailing half chunk
	target = make([]byte, quarterChunkSize)
	n, err = asyncBuffer.readAt(target, ChunkSize*4+quarterChunkSize)
	require.NoError(t, err)
	assert.Equal(t, quarterChunkSize, n)
	assert.Equal(t, target, source[ChunkSize*4+quarterChunkSize:ChunkSize*4+halfChunkSize])

	// Let's try to read more data than is available in the stream
	target = make([]byte, ChunkSize*2)
	n, err = asyncBuffer.readAt(target, ChunkSize*4)
	require.ErrorIs(t, err, io.EOF)
	assert.Equal(t, halfChunkSize, n)
	assert.Equal(t, target[:halfChunkSize], source[ChunkSize*4:]) // We read only the last half chunk

	// Let's try to read data beyond the end of the stream
	target = make([]byte, ChunkSize*2)
	n, err = asyncBuffer.readAt(target, ChunkSize*5)
	require.ErrorIs(t, err, io.EOF)
	assert.Equal(t, 0, n)
}

// TestAsyncBufferReadAtSmallBuffer tests reading from a small AsyncBuffer using the readAt method
func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
	source, bytesReader := generateSourceData(t, 20)

	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()

	// First, let's read all the data
	target := make([]byte, len(source))
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, len(source), n)
	assert.Equal(t, target, source)

	// Let's read some data from the middle
	target = make([]byte, 2)
	n, err = asyncBuffer.readAt(target, 1)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[1:3])

	// Let's read some data beyond the end of the stream
	target = make([]byte, 2)
	n, err = asyncBuffer.readAt(target, 50)
	require.ErrorIs(t, err, io.EOF)
	assert.Equal(t, 0, n)
}

func TestAsyncBufferReader(t *testing.T) {
	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)

	// Create an AsyncBuffer with the byte slice
	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()

	// Let's wait for all chunks to be read and check the total length of the data
	size, err := asyncBuffer.Wait()
	require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
	assert.Equal(t, int64(ChunkSize*4+halfChunkSize), size)

	reader := asyncBuffer.Reader()

	// Read the first two chunks
	twoChunks := make([]byte, ChunkSize*2)
	n, err := reader.Read(twoChunks)
	require.NoError(t, err)
	assert.Equal(t, ChunkSize*2, n)
	assert.Equal(t, source[:ChunkSize*2], twoChunks)

	// Seek to the beginning of the fourth chunk + 5 bytes
	pos, err := reader.Seek(ChunkSize*3+5, io.SeekStart)
	require.NoError(t, err)
	assert.Equal(t, int64(ChunkSize*3+5), pos)

	// Read the next 10 bytes
	smallSlice := make([]byte, 10)
	n, err = reader.Read(smallSlice)
	require.NoError(t, err)
	assert.Equal(t, 10, n)
	assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)

	// Seek -10 bytes from the current position
	pos, err = reader.Seek(-10, io.SeekCurrent)
	require.NoError(t, err)
	assert.Equal(t, int64(ChunkSize*3+5), pos)

	// Read the same data again
	n, err = reader.Read(smallSlice)
	require.NoError(t, err)
	assert.Equal(t, 10, n)
	assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)

	// Seek -10 bytes from the end of the stream
	pos, err = reader.Seek(-10, io.SeekEnd)
	require.NoError(t, err)
	assert.Equal(t, size-10, pos)

	// Read the last 10 bytes
	n, err = reader.Read(smallSlice)
	require.NoError(t, err)
	assert.Equal(t, 10, n)
	assert.Equal(t, source[size-10:], smallSlice)

	// Seek beyond the end of the stream and try to read
	pos, err = reader.Seek(1024, io.SeekCurrent)
	require.NoError(t, err)
	assert.Equal(t, size+1024, pos)

	_, err = reader.Read(smallSlice)
	require.ErrorIs(t, err, io.EOF)
}

// TestAsyncBufferClose tests closing the AsyncBuffer
func TestAsyncBufferClose(t *testing.T) {
	_, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)

	// Create an AsyncBuffer with the byte slice
	asyncBuffer := FromReader(bytesReader)

	reader1 := asyncBuffer.Reader()
	reader2 := asyncBuffer.Reader()

	asyncBuffer.Close()

	// Reads on both readers should fail after the buffer is closed
	b := make([]byte, 10)

	_, err := reader1.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")

	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")

	// Closing an already closed buffer should not panic
	asyncBuffer.Close()

	_, err = reader2.Read(b)
	require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
}

// TestAsyncBufferReadAtErrAtSomePoint tests reading from AsyncBuffer using the
// readAt method when the underlying reader fails partway through
func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
	// Let's use a source buffer which is 4.5 chunks long
	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
	slowReader := &erraticReader{reader: *bytesReader, failAt: ChunkSize*3 + 5} // fails in the fourth chunk

	asyncBuffer := FromReader(slowReader)
	defer asyncBuffer.Close()

	// Let's wait for all chunks to be read; the simulated failure should surface here
	_, err := asyncBuffer.Wait()
	require.Error(t, err, "simulated read failure")

	// Let's read something located before the point where the error occurs
	target := make([]byte, halfChunkSize)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[:halfChunkSize])

	// And again
	target = make([]byte, halfChunkSize)
	n, err = asyncBuffer.readAt(target, halfChunkSize)
	require.NoError(t, err)
	assert.Equal(t, len(target), n)
	assert.Equal(t, target, source[halfChunkSize:halfChunkSize*2])

	// Now let's read across the point where the error occurs
	target = make([]byte, halfChunkSize)
	_, err = asyncBuffer.readAt(target, ChunkSize*3)
	require.Error(t, err, "simulated read failure")
}

// TestAsyncBufferReadAsync tests reading from AsyncBuffer using the readAt
// method with full = false, i.e. while data is still arriving
func TestAsyncBufferReadAsync(t *testing.T) {
	// Let's use a source buffer which is 3 chunks long
	source, bytesReader := generateSourceData(t, int64(ChunkSize)*3)
	blockingReader := newBlockingReader(*bytesReader)

	asyncBuffer := FromReader(blockingReader)
	defer asyncBuffer.Close()

	// Flush the first chunk to allow reading
	blockingReader.flushNextChunk()

	// Let's try to read the first two chunks; however,
	// we know that only the first chunk is available
	target := make([]byte, ChunkSize*2)
	n, err := asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, ChunkSize, n)
	assert.Equal(t, target[:ChunkSize], source[:ChunkSize])

	blockingReader.flushNextChunk()    // unlock the reader to allow reading the second chunk
	asyncBuffer.WaitFor(ChunkSize + 1) // wait for the second chunk to become available

	target = make([]byte, ChunkSize*2)
	n, err = asyncBuffer.readAt(target, 0)
	require.NoError(t, err)
	assert.Equal(t, ChunkSize*2, n)
	assert.Equal(t, target, source[:ChunkSize*2])

	blockingReader.flush() // Flush the rest of the data
	asyncBuffer.Wait()

	// Try to read near the end of the stream: EOF
	target = make([]byte, ChunkSize)
	n, err = asyncBuffer.readAt(target, ChunkSize*3-1)
	require.ErrorIs(t, err, io.EOF)
	assert.Equal(t, 1, n)
	assert.Equal(t, target[0], source[ChunkSize*3-1])

	// Try to read beyond the end of the stream: EOF as well
	target = make([]byte, ChunkSize)
	n, err = asyncBuffer.readAt(target, ChunkSize*3)
	require.ErrorIs(t, err, io.EOF)
	assert.Equal(t, 0, n)
}

// TestAsyncBufferReadAllCompatibility tests that io.ReadAll works as expected on the reader
func TestAsyncBufferReadAllCompatibility(t *testing.T) {
	source, err := os.ReadFile("../testdata/test1.jpg")
	require.NoError(t, err)

	asyncBuffer := FromReader(bytes.NewReader(source))
	defer asyncBuffer.Close()

	b, err := io.ReadAll(asyncBuffer.Reader())
	require.NoError(t, err)
	require.Len(t, b, len(source))
}
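
// TestAsyncBufferConcurrentReaders is a minimal sketch of reading the same
// AsyncBuffer from several readers at once. It assumes Reader() is safe to
// call and use concurrently, which the multiple readers in TestAsyncBufferClose
// suggest but this file does not state explicitly; it only uses APIs already
// exercised above.
func TestAsyncBufferConcurrentReaders(t *testing.T) {
	source, bytesReader := generateSourceData(t, int64(ChunkSize*2)+halfChunkSize)

	asyncBuffer := FromReader(bytesReader)
	defer asyncBuffer.Close()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Each reader has its own position, so every goroutine should
			// see the whole stream from the beginning.
			b, err := io.ReadAll(asyncBuffer.Reader())
			assert.NoError(t, err)
			assert.Equal(t, source, b)
		}()
	}
	wg.Wait()
}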