123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package asyncbuffer
- import (
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/stretchr/testify/suite"
- )
- type TestCondSuite struct {
- suite.Suite
- cond *Cond
- }
- func (s *TestCondSuite) SetupTest() {
- s.cond = NewCond()
- }
- func (s *TestCondSuite) TearDownTest() {
- if s.cond != nil {
- s.cond.Close()
- }
- }
- // TestBasicWaitAndTick tests the basic functionality of the Cond
- func (s *TestCondSuite) TestBasicWaitAndTick() {
- ch := s.cond.ch
- // Start a goroutine that will tick after a short delay
- go func() {
- time.Sleep(50 * time.Millisecond)
- s.cond.Tick()
- }()
- // Start a goroutine that will wait for the tick
- var done atomic.Bool
- go func() {
- s.cond.Wait()
- done.Store(true)
- }()
- s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
- // Means that and old channel was closed and a new one has been created
- s.Require().NotEqual(ch, s.cond.ch)
- }
- // TestWaitMultipleWaiters tests that multiple waiters can be unblocked by a single tick
- func (s *TestCondSuite) TestWaitMultipleWaiters() {
- const numWaiters = 10
- var wg sync.WaitGroup
- var startWg sync.WaitGroup
- results := make([]bool, numWaiters)
- // Start multiple waiters
- for i := range numWaiters {
- wg.Add(1)
- startWg.Add(1)
- go func(index int) {
- defer wg.Done()
- startWg.Done() // Signal that this goroutine is ready
- s.cond.Wait()
- results[index] = true
- }(i)
- }
- // Wait for all goroutines to start waiting
- startWg.Wait()
- // Wait for all waiters to complete
- var done atomic.Bool
- go func() {
- s.cond.Tick() // Signal that execution can proceed
- wg.Wait()
- done.Store(true)
- }()
- s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
- // Check that all waiters were unblocked
- for _, completed := range results {
- s.Require().True(completed)
- }
- }
- // TestClose tests the behavior of the Cond when closed
- func (s *TestCondSuite) TestClose() {
- s.cond.Close()
- s.cond.Close() // Should not panic
- s.cond.Wait() // Should eventually return
- s.cond.Tick() // Should not panic
- s.Require().Nil(s.cond.ch)
- }
- func (s *TestCondSuite) TestRapidTicksAndWaits() {
- const iterations = 1000
- var wg sync.WaitGroup
- // Start a goroutine that will rapidly tick
- wg.Add(1)
- go func() {
- defer wg.Done()
- for range iterations {
- s.cond.Tick()
- time.Sleep(time.Microsecond)
- }
- s.cond.Close() // Close after all ticks
- }()
- // Start multiple waiters
- for range 10 {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for range iterations / 10 {
- s.cond.Wait()
- }
- }()
- }
- var done atomic.Bool
- go func() {
- wg.Wait()
- done.Store(true)
- }()
- s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
- }
- func TestCond(t *testing.T) {
- suite.Run(t, new(TestCondSuite))
- }
|