// buffer_test.go — tests for the asyncbuffer package.
  1. package asyncbuffer
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "errors"
  6. "io"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/require"
  12. )
// Chunk fractions used throughout the tests.
// NOTE(review): "quater" is a typo for "quarter"; the name is kept as-is
// because it is referenced across this file.
const (
	halfChunkSize   = ChunkSize / 2
	quaterChunkSize = ChunkSize / 4
)
  17. // erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
  18. type erraticReader struct {
  19. reader bytes.Reader
  20. failAt int64 // if set, will return an error after reading this many bytes
  21. }
  22. // Read reads data from the testReader, simulating a slow read and a potential failure
  23. func (r *erraticReader) Read(p []byte) (n int, err error) {
  24. cur, _ := r.reader.Seek(0, io.SeekCurrent)
  25. if r.failAt > 0 && r.failAt < cur+int64(len(p)) {
  26. return 0, errors.New("simulated read failure")
  27. }
  28. return r.reader.Read(p)
  29. }
  30. // blockingReader is a test reader which flushes data in chunks
  31. type blockingReader struct {
  32. reader bytes.Reader
  33. mu sync.Mutex // locked reader does not return anything
  34. unlocking atomic.Bool // if true, will proceed without locking each chunk
  35. }
  36. // newBlockingReader creates a new partialReader in locked state
  37. func newBlockingReader(reader bytes.Reader) *blockingReader {
  38. r := &blockingReader{
  39. reader: reader,
  40. }
  41. r.mu.Lock()
  42. return r
  43. }
  44. // flushNextChunk unlocks the reader, allowing it to return the next chunk of data
  45. func (r *blockingReader) flushNextChunk() {
  46. r.mu.Unlock()
  47. }
  48. // flush unlocks the reader, allowing it to return all data as usual
  49. func (r *blockingReader) flush() {
  50. r.unlocking.Store(true) // allow reading data without blocking
  51. r.mu.Unlock() // and continue
  52. }
  53. // Read reads data from the testReader, simulating a slow read and a potential failure
  54. func (r *blockingReader) Read(p []byte) (n int, err error) {
  55. if !r.unlocking.Load() {
  56. r.mu.Lock()
  57. }
  58. n, err = r.reader.Read(p)
  59. return n, err
  60. }
  61. // generateSourceData generates a byte slice with 4.5 chunks of data
  62. func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
  63. // We use small chunks for tests, let's check the ChunkSize just in case
  64. assert.GreaterOrEqual(t, ChunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
  65. // Create a byte slice with 4 chunks of ChunkSize
  66. source := make([]byte, size)
  67. // Fill the source with random data
  68. _, err := rand.Read(source)
  69. require.NoError(t, err)
  70. return source, bytes.NewReader(source)
  71. }
  72. // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
  73. func TestAsyncBufferReadAt(t *testing.T) {
  74. // Let's use source buffer which is 4.5 chunks long
  75. source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
  76. asyncBuffer := FromReader(bytesReader)
  77. defer asyncBuffer.Close()
  78. asyncBuffer.Wait() // Wait for all chunks to be read since we're going to read all data
  79. // Let's read all the data
  80. target := make([]byte, len(source))
  81. n, err := asyncBuffer.readAt(target, 0)
  82. require.NoError(t, err)
  83. assert.Equal(t, len(source), n)
  84. assert.Equal(t, target, source)
  85. // Let's read all the data + a bit more
  86. target = make([]byte, len(source)+1)
  87. n, err = asyncBuffer.readAt(target, 0)
  88. require.ErrorIs(t, err, io.EOF) // We read all the data, and reached end
  89. assert.Equal(t, len(source), n)
  90. assert.Equal(t, target[:n], source)
  91. // Let's read > 1 chunk, but with offset from the beginning and the end
  92. target = make([]byte, len(source)-halfChunkSize)
  93. n, err = asyncBuffer.readAt(target, quaterChunkSize)
  94. require.NoError(t, err)
  95. assert.Equal(t, len(target), n)
  96. assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])
  97. // Let's read some data from the middle of the stream < chunk size
  98. target = make([]byte, ChunkSize/4)
  99. n, err = asyncBuffer.readAt(target, ChunkSize+ChunkSize/4)
  100. require.NoError(t, err)
  101. assert.Equal(t, quaterChunkSize, n)
  102. assert.Equal(t, target, source[ChunkSize+quaterChunkSize:ChunkSize+quaterChunkSize*2])
  103. // Let's read some data from the latest half chunk
  104. target = make([]byte, quaterChunkSize)
  105. n, err = asyncBuffer.readAt(target, ChunkSize*4+quaterChunkSize)
  106. require.NoError(t, err)
  107. assert.Equal(t, quaterChunkSize, n)
  108. assert.Equal(t, target, source[ChunkSize*4+quaterChunkSize:ChunkSize*4+halfChunkSize])
  109. // Let's try to read more data then available in the stream
  110. target = make([]byte, ChunkSize*2)
  111. n, err = asyncBuffer.readAt(target, ChunkSize*4)
  112. require.Error(t, err)
  113. assert.Equal(t, err, io.EOF)
  114. assert.Equal(t, ChunkSize/2, n)
  115. assert.Equal(t, target[:ChunkSize/2], source[ChunkSize*4:]) // We read only last half chunk
  116. // Let's try to read data beyond the end of the stream
  117. target = make([]byte, ChunkSize*2)
  118. n, err = asyncBuffer.readAt(target, ChunkSize*5)
  119. require.Error(t, err)
  120. assert.Equal(t, err, io.EOF)
  121. assert.Equal(t, 0, n)
  122. }
  123. // TestAsyncBufferRead tests reading from AsyncBuffer using ReadAt method
  124. func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
  125. source, bytesReader := generateSourceData(t, 20)
  126. asyncBuffer := FromReader(bytesReader)
  127. defer asyncBuffer.Close()
  128. // First, let's read all the data
  129. target := make([]byte, len(source))
  130. n, err := asyncBuffer.readAt(target, 0)
  131. require.NoError(t, err)
  132. assert.Equal(t, len(source), n)
  133. assert.Equal(t, target, source)
  134. // Let's read some data
  135. target = make([]byte, 2)
  136. n, err = asyncBuffer.readAt(target, 1)
  137. require.NoError(t, err)
  138. assert.Equal(t, len(target), n)
  139. assert.Equal(t, target, source[1:3])
  140. // Let's read some data beyond the end of the stream
  141. target = make([]byte, 2)
  142. n, err = asyncBuffer.readAt(target, 50)
  143. require.Error(t, err)
  144. assert.Equal(t, err, io.EOF)
  145. assert.Equal(t, 0, n)
  146. }
  147. func TestAsyncBufferReader(t *testing.T) {
  148. source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
  149. // Create an AsyncBuffer with the byte slice
  150. asyncBuffer := FromReader(bytesReader)
  151. defer asyncBuffer.Close()
  152. // Let's wait for all chunks to be read
  153. size, err := asyncBuffer.Wait()
  154. require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
  155. assert.Equal(t, int64(ChunkSize*4+halfChunkSize), size)
  156. reader := asyncBuffer.Reader()
  157. // Ensure the total length of the data is ChunkSize*4
  158. require.NoError(t, err)
  159. // Read the first two chunks
  160. twoChunks := make([]byte, ChunkSize*2)
  161. n, err := reader.Read(twoChunks)
  162. require.NoError(t, err)
  163. assert.Equal(t, ChunkSize*2, n)
  164. assert.Equal(t, source[:ChunkSize*2], twoChunks)
  165. // Seek to the last chunk + 10 bytes
  166. pos, err := reader.Seek(ChunkSize*3+5, io.SeekStart)
  167. require.NoError(t, err)
  168. assert.Equal(t, int64(ChunkSize*3+5), pos)
  169. // Read the next 10 bytes
  170. smallSlice := make([]byte, 10)
  171. n, err = reader.Read(smallSlice)
  172. require.NoError(t, err)
  173. assert.Equal(t, 10, n)
  174. assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
  175. // Seek -10 bytes from the current position
  176. pos, err = reader.Seek(-10, io.SeekCurrent)
  177. require.NoError(t, err)
  178. assert.Equal(t, int64(ChunkSize*3+5), pos)
  179. // Read data again
  180. n, err = reader.Read(smallSlice)
  181. require.NoError(t, err)
  182. assert.Equal(t, 10, n)
  183. assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
  184. // Seek -10 bytes from end of the stream
  185. pos, err = reader.Seek(-10, io.SeekEnd)
  186. require.NoError(t, err)
  187. assert.Equal(t, size-10, pos)
  188. // Read last 10 bytes
  189. n, err = reader.Read(smallSlice)
  190. require.NoError(t, err)
  191. assert.Equal(t, 10, n)
  192. assert.Equal(t, source[size-10:], smallSlice)
  193. // Seek beyond the end of the stream and try to read
  194. pos, err = reader.Seek(1024, io.SeekCurrent)
  195. require.NoError(t, err)
  196. assert.Equal(t, size+1024, pos)
  197. _, err = reader.Read(smallSlice)
  198. require.ErrorIs(t, err, io.EOF)
  199. }
  200. // TestAsyncBufferClose tests closing the AsyncBuffer
  201. func TestAsyncBufferClose(t *testing.T) {
  202. _, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
  203. // Create an AsyncBuffer with the byte slice
  204. asyncBuffer := FromReader(bytesReader)
  205. reader1 := asyncBuffer.Reader()
  206. reader2 := asyncBuffer.Reader()
  207. asyncBuffer.Close()
  208. b := make([]byte, 10)
  209. _, err := reader1.Read(b)
  210. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  211. _, err = reader2.Read(b)
  212. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  213. // After closing the closed reader, it should not panic
  214. asyncBuffer.Close()
  215. _, err = reader2.Read(b)
  216. require.Error(t, err, "asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
  217. }
  218. // TestAsyncBufferReadAtErrAtSomePoint tests reading from AsyncBuffer using readAt method
  219. // which would fail somewhere
  220. func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
  221. // Let's use source buffer which is 4.5 chunks long
  222. source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
  223. slowReader := &erraticReader{reader: *bytesReader, failAt: ChunkSize*3 + 5} // fails at last chunk
  224. asyncBuffer := FromReader(slowReader)
  225. defer asyncBuffer.Close()
  226. // Let's wait for all chunks to be read
  227. _, err := asyncBuffer.Wait()
  228. require.Error(t, err, "simulated read failure")
  229. // Let's read something, but before error occurs
  230. target := make([]byte, halfChunkSize)
  231. n, err := asyncBuffer.readAt(target, 0)
  232. require.NoError(t, err)
  233. assert.Equal(t, len(target), n)
  234. assert.Equal(t, target, source[:halfChunkSize])
  235. // And again
  236. target = make([]byte, halfChunkSize)
  237. n, err = asyncBuffer.readAt(target, halfChunkSize)
  238. require.NoError(t, err)
  239. assert.Equal(t, len(target), n)
  240. assert.Equal(t, target, source[halfChunkSize:halfChunkSize*2])
  241. // Let's read something, but when error occurs
  242. target = make([]byte, halfChunkSize)
  243. _, err = asyncBuffer.readAt(target, ChunkSize*3)
  244. require.Error(t, err, "simulated read failure")
  245. }
  246. // TestAsyncBufferReadAsync tests reading from AsyncBuffer using readAt method
  247. // with full = false
  248. func TestAsyncBufferReadAsync(t *testing.T) {
  249. // Let's use source buffer which is 4.5 chunks long
  250. source, bytesReader := generateSourceData(t, int64(ChunkSize)*3)
  251. blockingReader := newBlockingReader(*bytesReader)
  252. asyncBuffer := FromReader(blockingReader)
  253. defer asyncBuffer.Close()
  254. // flush the first chunk to allow reading
  255. blockingReader.flushNextChunk()
  256. // Let's try to read first two chunks, however,
  257. // we know that only the first chunk is available
  258. target := make([]byte, ChunkSize*2)
  259. n, err := asyncBuffer.readAt(target, 0)
  260. require.NoError(t, err)
  261. assert.Equal(t, ChunkSize, n)
  262. assert.Equal(t, target[:ChunkSize], source[:ChunkSize])
  263. blockingReader.flushNextChunk() // unlock reader to allow read second chunk
  264. asyncBuffer.WaitFor(ChunkSize + 1) // wait for the second chunk to be available
  265. target = make([]byte, ChunkSize*2)
  266. n, err = asyncBuffer.readAt(target, 0)
  267. require.NoError(t, err)
  268. assert.Equal(t, ChunkSize*2, n)
  269. assert.Equal(t, target, source[:ChunkSize*2])
  270. blockingReader.flush() // Flush the rest of the data
  271. asyncBuffer.Wait()
  272. // Try to read near end of the stream, EOF
  273. target = make([]byte, ChunkSize)
  274. n, err = asyncBuffer.readAt(target, ChunkSize*3-1)
  275. require.ErrorIs(t, err, io.EOF)
  276. assert.Equal(t, 1, n)
  277. assert.Equal(t, target[0], source[ChunkSize*3-1])
  278. // Try to read beyond the end of the stream == eof
  279. target = make([]byte, ChunkSize)
  280. n, err = asyncBuffer.readAt(target, ChunkSize*3)
  281. require.ErrorIs(t, io.EOF, err)
  282. assert.Equal(t, 0, n)
  283. }