Browse Source

pauseThreshold in AsyncBuffer

Viktor Sokolov 2 months ago
parent
commit
ca66283d9e
4 changed files with 313 additions and 57 deletions
  1. 46 15
      asyncbuffer/buffer.go
  2. 106 42
      asyncbuffer/buffer_test.go
  3. 33 0
      asyncbuffer/latch.go
  4. 128 0
      asyncbuffer/latch_test.go

+ 46 - 15
asyncbuffer/buffer.go

@@ -20,8 +20,15 @@ import (
 	"sync/atomic"
 )
 
-// ChunkSize is the size of each chunk in bytes
-const ChunkSize = 4096
+const (
+	// chunkSize is the size of each chunk in bytes
+	chunkSize = 4096
+
+	// pauseThreshold is the size of the file which is always read to memory. Data beyond the
+	// threshold is read only if accessed. If not a multiple of chunkSize, the last chunk it points
+	// to is read in full.
+	pauseThreshold = 32768 // 32 KiB
+)
 
 // byteChunk is a struct that holds a buffer and the data read from the upstream reader
 // data slice is required since the chunk read may be smaller than ChunkSize
@@ -34,7 +41,7 @@ type byteChunk struct {
 // all readers
 var chunkPool = sync.Pool{
 	New: func() any {
-		buf := make([]byte, ChunkSize)
+		buf := make([]byte, chunkSize)
 
 		return &byteChunk{
 			buf:  buf,
@@ -54,6 +61,7 @@ type AsyncBuffer struct {
 	finished atomic.Bool  // Indicates that the reader has finished reading
 	len      atomic.Int64 // Total length of the data read
 	closed   atomic.Bool  // Indicates that the reader was closed
+	paused   *Latch       // Paused reader does not read data beyond threshold
 
 	mu             sync.RWMutex  // Mutex on chunks slice
 	newChunkSignal chan struct{} // Tick-tock channel that indicates that a new chunk is ready
@@ -71,6 +79,7 @@ func FromReader(r io.Reader) *AsyncBuffer {
 	ab := &AsyncBuffer{
 		r:              r,
 		newChunkSignal: make(chan struct{}),
+		paused:         NewLatch(),
 	}
 
 	go ab.readChunks()
@@ -120,6 +129,17 @@ func (ab *AsyncBuffer) readChunks() {
 
 	// Stop reading if the reader is finished
 	for !ab.finished.Load() {
+		// In case we are trying to read data beyond threshold and we are paused,
+		// wait for pause to be released.
+		if ab.len.Load() >= pauseThreshold {
+			ab.paused.Wait()
+		}
+
+		// If the reader has been closed while waiting, we can stop reading
+		if ab.finished.Load() {
+			return // No more data to read
+		}
+
 		// Get a chunk from the pool
 		// If the pool is empty, it will create a new byteChunk with ChunkSize
 		chunk, ok := chunkPool.Get().(*byteChunk)
@@ -181,6 +201,11 @@ func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
 		return false, ab.closedError()
 	}
 
+	// In case we are trying to read data beyond the pause threshold, we need to resume the reader
+	if off >= pauseThreshold {
+		ab.paused.Release()
+	}
+
 	// In case the offset falls within the already read chunks, we can return immediately,
 	// even if error has occurred in the future
 	if off < ab.len.Load() {
@@ -207,6 +232,12 @@ func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
 // WaitFor waits for the data to be ready at the given offset. nil means ok.
 // It guarantees that the chunk at the given offset is ready to be read.
 func (ab *AsyncBuffer) WaitFor(off int64) error {
+	// In case we are trying to read data which would potentially hit the pause threshold,
+	// we need to unpause the reader ASAP.
+	if off >= pauseThreshold {
+		ab.paused.Release()
+	}
+
 	for {
 		ok, err := ab.offsetAvailable(off)
 		if ok || err != nil {
@@ -220,6 +251,9 @@ func (ab *AsyncBuffer) WaitFor(off int64) error {
 // Wait waits for the reader to finish reading all data and returns
 // the total length of the data read.
 func (ab *AsyncBuffer) Wait() (int64, error) {
+	// Wait ends till the end of the stream: unpause the reader
+	ab.paused.Release()
+
 	for {
 		// We can not read data from the closed reader even if there were no errors
 		if ab.closed.Load() {
@@ -275,10 +309,10 @@ func (ab *AsyncBuffer) readChunkAt(p []byte, off int64) int {
 		return 0
 	}
 
-	ind := off / ChunkSize // chunk index
+	ind := off / chunkSize // chunk index
 	chunk := ab.chunks[ind]
 
-	startOffset := off % ChunkSize // starting offset in the chunk
+	startOffset := off % chunkSize // starting offset in the chunk
 
 	// If the offset in current chunk is greater than the data
 	// it has, we return 0
@@ -293,22 +327,17 @@ func (ab *AsyncBuffer) readChunkAt(p []byte, off int64) int {
 
 // readAt reads data from the AsyncBuffer at the given offset.
 //
-// If full is true:
-//
-// The behaviour is similar to io.ReaderAt.ReadAt. It blocks until the maxumum amount of data possible
-// is read from the buffer. It may return io.UnexpectedEOF in case we tried to read more data than was
-// available in the buffer.
+// Please note that if pause threshold is hit in the middle of the reading,
+// the data beyond the threshold may not be available.
 //
-// If full is false:
-//
-// It behaves like a regular non-blocking Read.
+// If the reader is paused and we try to read data beyond the pause threshold,
+// it will wait till something could be returned.
 func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
 	size := int64(len(p)) // total size of the data to read
 
 	if off < 0 {
 		return 0, errors.New("asyncbuffer.AsyncBuffer.readAt: negative offset")
 	}
-
 	// Wait for the offset to be available.
 	// It may return io.EOF if the offset is beyond the end of the stream.
 	err := ab.WaitFor(off)
@@ -349,7 +378,7 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
 
 		// If we read data shorter than ChunkSize or, in case that was the last chunk, less than
 		// the size of the tail, return kind of EOF
-		if int64(nX) < min(size, int64(ChunkSize)) {
+		if int64(nX) < min(size, int64(chunkSize)) {
 			return n, io.EOF
 		}
 	}
@@ -383,6 +412,8 @@ func (ab *AsyncBuffer) Close() error {
 		chunkPool.Put(chunk)
 	}
 
+	ab.paused.Release()
+
 	return nil
 }
 

+ 106 - 42
asyncbuffer/buffer_test.go

@@ -9,14 +9,15 @@ import (
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 const (
-	halfChunkSize   = ChunkSize / 2
-	quaterChunkSize = ChunkSize / 4
+	halfChunkSize   = chunkSize / 2
+	quaterChunkSize = chunkSize / 4
 )
 
 // erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
@@ -74,7 +75,7 @@ func (r *blockingReader) Read(p []byte) (n int, err error) {
 // generateSourceData generates a byte slice with 4.5 chunks of data
 func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
 	// We use small chunks for tests, let's check the ChunkSize just in case
-	assert.GreaterOrEqual(t, ChunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
+	assert.GreaterOrEqual(t, chunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
 
 	// Create a byte slice with 4 chunks of ChunkSize
 	source := make([]byte, size)
@@ -88,7 +89,7 @@ func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
 // TestAsyncBufferRead tests reading from AsyncBuffer using readAt method which is base for all other methods
 func TestAsyncBufferReadAt(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
+	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
 	asyncBuffer := FromReader(bytesReader)
 	defer asyncBuffer.Close()
 
@@ -118,30 +119,30 @@ func TestAsyncBufferReadAt(t *testing.T) {
 	assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])
 
 	// Let's read some data from the middle of the stream < chunk size
-	target = make([]byte, ChunkSize/4)
-	n, err = asyncBuffer.readAt(target, ChunkSize+ChunkSize/4)
+	target = make([]byte, chunkSize/4)
+	n, err = asyncBuffer.readAt(target, chunkSize+chunkSize/4)
 	require.NoError(t, err)
 	assert.Equal(t, quaterChunkSize, n)
-	assert.Equal(t, target, source[ChunkSize+quaterChunkSize:ChunkSize+quaterChunkSize*2])
+	assert.Equal(t, target, source[chunkSize+quaterChunkSize:chunkSize+quaterChunkSize*2])
 
 	// Let's read some data from the latest half chunk
 	target = make([]byte, quaterChunkSize)
-	n, err = asyncBuffer.readAt(target, ChunkSize*4+quaterChunkSize)
+	n, err = asyncBuffer.readAt(target, chunkSize*4+quaterChunkSize)
 	require.NoError(t, err)
 	assert.Equal(t, quaterChunkSize, n)
-	assert.Equal(t, target, source[ChunkSize*4+quaterChunkSize:ChunkSize*4+halfChunkSize])
+	assert.Equal(t, target, source[chunkSize*4+quaterChunkSize:chunkSize*4+halfChunkSize])
 
 	// Let's try to read more data then available in the stream
-	target = make([]byte, ChunkSize*2)
-	n, err = asyncBuffer.readAt(target, ChunkSize*4)
+	target = make([]byte, chunkSize*2)
+	n, err = asyncBuffer.readAt(target, chunkSize*4)
 	require.Error(t, err)
 	assert.Equal(t, err, io.EOF)
-	assert.Equal(t, ChunkSize/2, n)
-	assert.Equal(t, target[:ChunkSize/2], source[ChunkSize*4:]) // We read only last half chunk
+	assert.Equal(t, chunkSize/2, n)
+	assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // We read only last half chunk
 
 	// Let's try to read data beyond the end of the stream
-	target = make([]byte, ChunkSize*2)
-	n, err = asyncBuffer.readAt(target, ChunkSize*5)
+	target = make([]byte, chunkSize*2)
+	n, err = asyncBuffer.readAt(target, chunkSize*5)
 	require.Error(t, err)
 	assert.Equal(t, err, io.EOF)
 	assert.Equal(t, 0, n)
@@ -177,7 +178,7 @@ func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
 }
 
 func TestAsyncBufferReader(t *testing.T) {
-	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
+	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
 
 	// Create an AsyncBuffer with the byte slice
 	asyncBuffer := FromReader(bytesReader)
@@ -186,7 +187,7 @@ func TestAsyncBufferReader(t *testing.T) {
 	// Let's wait for all chunks to be read
 	size, err := asyncBuffer.Wait()
 	require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
-	assert.Equal(t, int64(ChunkSize*4+halfChunkSize), size)
+	assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
 
 	reader := asyncBuffer.Reader()
 
@@ -194,34 +195,34 @@ func TestAsyncBufferReader(t *testing.T) {
 	require.NoError(t, err)
 
 	// Read the first two chunks
-	twoChunks := make([]byte, ChunkSize*2)
+	twoChunks := make([]byte, chunkSize*2)
 	n, err := reader.Read(twoChunks)
 	require.NoError(t, err)
-	assert.Equal(t, ChunkSize*2, n)
-	assert.Equal(t, source[:ChunkSize*2], twoChunks)
+	assert.Equal(t, chunkSize*2, n)
+	assert.Equal(t, source[:chunkSize*2], twoChunks)
 
 	// Seek to the last chunk + 10 bytes
-	pos, err := reader.Seek(ChunkSize*3+5, io.SeekStart)
+	pos, err := reader.Seek(chunkSize*3+5, io.SeekStart)
 	require.NoError(t, err)
-	assert.Equal(t, int64(ChunkSize*3+5), pos)
+	assert.Equal(t, int64(chunkSize*3+5), pos)
 
 	// Read the next 10 bytes
 	smallSlice := make([]byte, 10)
 	n, err = reader.Read(smallSlice)
 	require.NoError(t, err)
 	assert.Equal(t, 10, n)
-	assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
+	assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
 
 	// Seek -10 bytes from the current position
 	pos, err = reader.Seek(-10, io.SeekCurrent)
 	require.NoError(t, err)
-	assert.Equal(t, int64(ChunkSize*3+5), pos)
+	assert.Equal(t, int64(chunkSize*3+5), pos)
 
 	// Read data again
 	n, err = reader.Read(smallSlice)
 	require.NoError(t, err)
 	assert.Equal(t, 10, n)
-	assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
+	assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
 
 	// Seek -10 bytes from end of the stream
 	pos, err = reader.Seek(-10, io.SeekEnd)
@@ -245,7 +246,7 @@ func TestAsyncBufferReader(t *testing.T) {
 
 // TestAsyncBufferClose tests closing the AsyncBuffer
 func TestAsyncBufferClose(t *testing.T) {
-	_, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
+	_, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
 
 	// Create an AsyncBuffer with the byte slice
 	asyncBuffer := FromReader(bytesReader)
@@ -273,8 +274,8 @@ func TestAsyncBufferClose(t *testing.T) {
 // which would fail somewhere
 func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
-	slowReader := &erraticReader{reader: *bytesReader, failAt: ChunkSize*3 + 5} // fails at last chunk
+	source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
+	slowReader := &erraticReader{reader: *bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
 	asyncBuffer := FromReader(slowReader)
 	defer asyncBuffer.Close()
 
@@ -298,7 +299,7 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 
 	// Let's read something, but when error occurs
 	target = make([]byte, halfChunkSize)
-	_, err = asyncBuffer.readAt(target, ChunkSize*3)
+	_, err = asyncBuffer.readAt(target, chunkSize*3)
 	require.Error(t, err, "simulated read failure")
 }
 
@@ -306,7 +307,7 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
 // with full = false
 func TestAsyncBufferReadAsync(t *testing.T) {
 	// Let's use source buffer which is 4.5 chunks long
-	source, bytesReader := generateSourceData(t, int64(ChunkSize)*3)
+	source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
 	blockingReader := newBlockingReader(*bytesReader)
 	asyncBuffer := FromReader(blockingReader)
 	defer asyncBuffer.Close()
@@ -316,34 +317,34 @@ func TestAsyncBufferReadAsync(t *testing.T) {
 
 	// Let's try to read first two chunks, however,
 	// we know that only the first chunk is available
-	target := make([]byte, ChunkSize*2)
+	target := make([]byte, chunkSize*2)
 	n, err := asyncBuffer.readAt(target, 0)
 	require.NoError(t, err)
-	assert.Equal(t, ChunkSize, n)
-	assert.Equal(t, target[:ChunkSize], source[:ChunkSize])
+	assert.Equal(t, chunkSize, n)
+	assert.Equal(t, target[:chunkSize], source[:chunkSize])
 
 	blockingReader.flushNextChunk()    // unlock reader to allow read second chunk
-	asyncBuffer.WaitFor(ChunkSize + 1) // wait for the second chunk to be available
+	asyncBuffer.WaitFor(chunkSize + 1) // wait for the second chunk to be available
 
-	target = make([]byte, ChunkSize*2)
+	target = make([]byte, chunkSize*2)
 	n, err = asyncBuffer.readAt(target, 0)
 	require.NoError(t, err)
-	assert.Equal(t, ChunkSize*2, n)
-	assert.Equal(t, target, source[:ChunkSize*2])
+	assert.Equal(t, chunkSize*2, n)
+	assert.Equal(t, target, source[:chunkSize*2])
 
 	blockingReader.flush() // Flush the rest of the data
 	asyncBuffer.Wait()
 
 	// Try to read near end of the stream, EOF
-	target = make([]byte, ChunkSize)
-	n, err = asyncBuffer.readAt(target, ChunkSize*3-1)
+	target = make([]byte, chunkSize)
+	n, err = asyncBuffer.readAt(target, chunkSize*3-1)
 	require.ErrorIs(t, err, io.EOF)
 	assert.Equal(t, 1, n)
-	assert.Equal(t, target[0], source[ChunkSize*3-1])
+	assert.Equal(t, target[0], source[chunkSize*3-1])
 
 	// Try to read beyond the end of the stream == eof
-	target = make([]byte, ChunkSize)
-	n, err = asyncBuffer.readAt(target, ChunkSize*3)
+	target = make([]byte, chunkSize)
+	n, err = asyncBuffer.readAt(target, chunkSize*3)
 	require.ErrorIs(t, io.EOF, err)
 	assert.Equal(t, 0, n)
 }
@@ -359,3 +360,66 @@ func TestAsyncBufferReadAllCompability(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, b, len(source))
 }
+
+func TestAsyncBufferThreshold(t *testing.T) {
+	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
+	asyncBuffer := FromReader(bytesReader)
+	defer asyncBuffer.Close()
+
+	target := make([]byte, chunkSize)
+	n, err := asyncBuffer.readAt(target, 0)
+	require.NoError(t, err)
+	assert.Equal(t, chunkSize, n)
+
+	// Ensure that buffer hits the pause threshold
+	require.Eventually(t, func() bool {
+		return asyncBuffer.len.Load() >= pauseThreshold
+	}, 300*time.Millisecond, 10*time.Millisecond)
+
+	// Ensure that buffer never reaches the end of the stream
+	require.Never(t, func() bool {
+		return asyncBuffer.len.Load() >= pauseThreshold*2-1
+	}, 300*time.Millisecond, 10*time.Millisecond)
+
+	// Let's hit the pause threshold
+	target = make([]byte, pauseThreshold)
+	n, err = asyncBuffer.readAt(target, 0)
+	require.NoError(t, err)
+	require.Equal(t, pauseThreshold, n)
+
+	// Ensure that buffer never reaches the end of the stream
+	require.Never(t, func() bool {
+		return asyncBuffer.len.Load() >= pauseThreshold*2-1
+	}, 300*time.Millisecond, 10*time.Millisecond)
+
+	// Let's hit the pause threshold
+	target = make([]byte, pauseThreshold+1)
+	n, err = asyncBuffer.readAt(target, 0)
+	require.NoError(t, err)
+	require.Equal(t, pauseThreshold, n)
+
+	// It usually returns only pauseThreshold bytes because this exact operation unpauses the reader,
+	// but the initial offset is before the threshold, data beyond the threshold may not be available.
+	assert.GreaterOrEqual(t, pauseThreshold, n)
+
+	// Ensure that buffer hits the end of the stream
+	require.Eventually(t, func() bool {
+		return asyncBuffer.len.Load() >= pauseThreshold*2
+	}, 300*time.Millisecond, 10*time.Millisecond)
+}
+
+func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
+	_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
+	asyncBuffer := FromReader(bytesReader)
+	defer asyncBuffer.Close()
+
+	target := make([]byte, chunkSize)
+	n, err := asyncBuffer.readAt(target, pauseThreshold+1)
+	require.NoError(t, err)
+	assert.Equal(t, chunkSize, n)
+
+	// Ensure that buffer hits the end of the stream
+	require.Eventually(t, func() bool {
+		return asyncBuffer.len.Load() >= pauseThreshold*2
+	}, 300*time.Millisecond, 10*time.Millisecond)
+}

+ 33 - 0
asyncbuffer/latch.go

@@ -0,0 +1,33 @@
+package asyncbuffer
+
+import (
+	"sync"
+)
+
+// Latch is once-releasing semaphore.
+type Latch struct {
+	_    noCopy
+	once sync.Once
+	done chan struct{}
+}
+
+// NewLatch creates a new Latch.
+func NewLatch() *Latch {
+	return &Latch{done: make(chan struct{})}
+}
+
+// Release releases the latch, allowing all waiting goroutines to proceed.
+func (g *Latch) Release() {
+	g.once.Do(func() { close(g.done) })
+}
+
+// Wait blocks until the latch is released.
+func (g *Latch) Wait() {
+	<-g.done
+}
+
+// checked by 'go vet
+type noCopy struct{}
+
+func (*noCopy) Lock()   {}
+func (*noCopy) Unlock() {}

+ 128 - 0
asyncbuffer/latch_test.go

@@ -0,0 +1,128 @@
+package asyncbuffer
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewLatch(t *testing.T) {
+	latch := NewLatch()
+
+	require.NotNil(t, latch)
+	require.NotNil(t, latch.done)
+
+	// Channel should be open (not closed) initially
+	select {
+	case <-latch.done:
+		t.Fatal("Latch should not be released initially")
+	default:
+		// Expected - channel is not ready
+	}
+}
+
+func TestLatchRelease(t *testing.T) {
+	latch := NewLatch()
+
+	// Release the latch
+	latch.Release()
+
+	// Channel should now be closed/ready
+	select {
+	case <-latch.done:
+		// Expected - channel is ready after release
+	default:
+		t.Fatal("Latch should be released after Release() call")
+	}
+}
+
+func TestLatchWait(t *testing.T) {
+	latch := NewLatch()
+
+	// Start a goroutine that will wait
+	waitCompleted := make(chan bool, 1)
+	go func() {
+		latch.Wait()
+		waitCompleted <- true
+	}()
+
+	// Give the goroutine a moment to start waiting
+	time.Sleep(10 * time.Millisecond)
+
+	// Wait should not complete yet
+	select {
+	case <-waitCompleted:
+		t.Fatal("Wait should not complete before Release")
+	default:
+		// Expected
+	}
+
+	// Release the latch
+	latch.Release()
+
+	// Wait should complete now
+	select {
+	case <-waitCompleted:
+		// Expected
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("Wait should complete after Release")
+	}
+}
+
+func TestLatchMultipleWaiters(t *testing.T) {
+	latch := NewLatch()
+	const numWaiters = 10
+
+	var wg sync.WaitGroup
+	waitersCompleted := make(chan int, numWaiters)
+
+	// Start multiple waiters
+	for i := 0; i < numWaiters; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+			latch.Wait()
+			waitersCompleted <- id
+		}(i)
+	}
+
+	// Give goroutines time to start waiting
+	time.Sleep(10 * time.Millisecond)
+
+	// No waiters should complete yet
+	assert.Empty(t, waitersCompleted)
+
+	// Release the latch
+	latch.Release()
+
+	// All waiters should complete
+	wg.Wait()
+	close(waitersCompleted)
+
+	// Verify all waiters completed
+	completed := make([]int, 0, numWaiters)
+	for id := range waitersCompleted {
+		completed = append(completed, id)
+	}
+	assert.Len(t, completed, numWaiters)
+}
+
+func TestLatchMultipleReleases(t *testing.T) {
+	latch := NewLatch()
+
+	// Release multiple times should be safe
+	latch.Release()
+	latch.Release()
+	latch.Release()
+
+	// Should still be able to wait
+	select {
+	case <-latch.done:
+		// Expected - channel should be ready
+	default:
+		t.Fatal("Latch should be released")
+	}
+}