cond_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package asyncbuffer
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/suite"
  8. )
  9. type TestCondSuite struct {
  10. suite.Suite
  11. cond *Cond
  12. }
  13. func (s *TestCondSuite) SetupTest() {
  14. s.cond = NewCond()
  15. }
  16. func (s *TestCondSuite) TearDownTest() {
  17. if s.cond != nil {
  18. s.cond.Close()
  19. }
  20. }
  21. // TestBasicWaitAndTick tests the basic functionality of the Cond
  22. func (s *TestCondSuite) TestBasicWaitAndTick() {
  23. ch := s.cond.ch
  24. // Start a goroutine that will tick after a short delay
  25. go func() {
  26. time.Sleep(50 * time.Millisecond)
  27. s.cond.Tick()
  28. }()
  29. // Start a goroutine that will wait for the tick
  30. var done atomic.Bool
  31. go func() {
  32. s.cond.Wait()
  33. done.Store(true)
  34. }()
  35. s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
  36. // Means that and old channel was closed and a new one has been created
  37. s.Require().NotEqual(ch, s.cond.ch)
  38. }
  39. // TestWaitMultipleWaiters tests that multiple waiters can be unblocked by a single tick
  40. func (s *TestCondSuite) TestWaitMultipleWaiters() {
  41. const numWaiters = 10
  42. var wg sync.WaitGroup
  43. var startWg sync.WaitGroup
  44. results := make([]bool, numWaiters)
  45. // Start multiple waiters
  46. for i := range numWaiters {
  47. wg.Add(1)
  48. startWg.Add(1)
  49. go func(index int) {
  50. defer wg.Done()
  51. startWg.Done() // Signal that this goroutine is ready
  52. s.cond.Wait()
  53. results[index] = true
  54. }(i)
  55. }
  56. // Wait for all goroutines to start waiting
  57. startWg.Wait()
  58. // Wait for all waiters to complete
  59. var done atomic.Bool
  60. go func() {
  61. s.cond.Tick() // Signal that execution can proceed
  62. wg.Wait()
  63. done.Store(true)
  64. }()
  65. s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
  66. // Check that all waiters were unblocked
  67. for _, completed := range results {
  68. s.Require().True(completed)
  69. }
  70. }
  71. // TestClose tests the behavior of the Cond when closed
  72. func (s *TestCondSuite) TestClose() {
  73. s.cond.Close()
  74. s.cond.Close() // Should not panic
  75. s.cond.Wait() // Should eventually return
  76. s.cond.Tick() // Should not panic
  77. s.Require().Nil(s.cond.ch)
  78. }
  79. func (s *TestCondSuite) TestRapidTicksAndWaits() {
  80. const iterations = 1000
  81. var wg sync.WaitGroup
  82. // Start a goroutine that will rapidly tick
  83. wg.Add(1)
  84. go func() {
  85. defer wg.Done()
  86. for range iterations {
  87. s.cond.Tick()
  88. time.Sleep(time.Microsecond)
  89. }
  90. s.cond.Close() // Close after all ticks
  91. }()
  92. // Start multiple waiters
  93. for range 10 {
  94. wg.Add(1)
  95. go func() {
  96. defer wg.Done()
  97. for range iterations / 10 {
  98. s.cond.Wait()
  99. }
  100. }()
  101. }
  102. var done atomic.Bool
  103. go func() {
  104. wg.Wait()
  105. done.Store(true)
  106. }()
  107. s.Require().Eventually(done.Load, 200*time.Millisecond, 10*time.Millisecond)
  108. }
  109. func TestCond(t *testing.T) {
  110. suite.Run(t, new(TestCondSuite))
  111. }