cond_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package asyncbuffer
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/suite"
  7. )
  8. type TestCondSuite struct {
  9. suite.Suite
  10. cond *Cond
  11. }
  12. func (s *TestCondSuite) SetupTest() {
  13. s.cond = NewCond()
  14. }
  15. func (s *TestCondSuite) TeardownTest() {
  16. if s.cond != nil {
  17. s.cond.Close()
  18. }
  19. }
  20. // TestBasicWaitAndTick tests the basic functionality of the Cond
  21. func (s *TestCondSuite) TestBasicWaitAndTick() {
  22. done := make(chan struct{})
  23. ch := s.cond.ch
  24. // Start a goroutine that will tick after a short delay
  25. go func() {
  26. time.Sleep(50 * time.Millisecond)
  27. s.cond.Tick()
  28. }()
  29. // Start a goroutine that will wait for the tick
  30. go func() {
  31. s.cond.Wait()
  32. close(done)
  33. }()
  34. s.Require().Eventually(func() bool {
  35. select {
  36. case <-done:
  37. return true
  38. default:
  39. return false
  40. }
  41. }, 100*time.Millisecond, 10*time.Millisecond)
  42. // Means that and old channel was closed and a new one has been created
  43. s.Require().NotEqual(ch, s.cond.ch)
  44. }
  45. // TestWaitMultipleWaiters tests that multiple waiters can be unblocked by a single tick
  46. func (s *TestCondSuite) TestWaitMultipleWaiters() {
  47. const numWaiters = 10
  48. var wg sync.WaitGroup
  49. var startWg sync.WaitGroup
  50. results := make([]bool, numWaiters)
  51. // Start multiple waiters
  52. for i := range numWaiters {
  53. wg.Add(1)
  54. startWg.Add(1)
  55. go func(index int) {
  56. defer wg.Done()
  57. startWg.Done() // Signal that this goroutine is ready
  58. s.cond.Wait()
  59. results[index] = true
  60. }(i)
  61. }
  62. // Wait for all goroutines to start waiting
  63. startWg.Wait()
  64. // Wait for all waiters to complete
  65. done := make(chan struct{})
  66. go func() {
  67. s.cond.Tick() // Signal that execution can proceed
  68. wg.Wait()
  69. close(done)
  70. }()
  71. s.Require().Eventually(func() bool {
  72. select {
  73. case <-done:
  74. return true
  75. default:
  76. return false
  77. }
  78. }, 100*time.Millisecond, 10*time.Millisecond)
  79. // Check that all waiters were unblocked
  80. for _, completed := range results {
  81. s.Require().True(completed)
  82. }
  83. }
  84. // TestClose tests the behavior of the Cond when closed
  85. func (s *TestCondSuite) TestClose() {
  86. s.cond.Close()
  87. s.cond.Close() // Should not panic
  88. s.cond.Wait() // Should eventually return
  89. s.cond.Tick() // Should not panic
  90. s.Require().Nil(s.cond.ch)
  91. }
  92. func (s *TestCondSuite) TestRapidTicksAndWaits() {
  93. const iterations = 1000
  94. var wg sync.WaitGroup
  95. // Start a goroutine that will rapidly tick
  96. wg.Add(1)
  97. go func() {
  98. defer wg.Done()
  99. for range iterations {
  100. s.cond.Tick()
  101. time.Sleep(time.Microsecond)
  102. }
  103. s.cond.Close() // Close after all ticks
  104. }()
  105. // Start multiple waiters
  106. for range 10 {
  107. wg.Add(1)
  108. go func() {
  109. defer wg.Done()
  110. for range iterations / 10 {
  111. s.cond.Wait()
  112. }
  113. }()
  114. }
  115. done := make(chan struct{})
  116. go func() {
  117. wg.Wait()
  118. close(done)
  119. }()
  120. s.Require().Eventually(func() bool {
  121. select {
  122. case <-done:
  123. return true
  124. default:
  125. return false
  126. }
  127. }, 100*time.Millisecond, 10*time.Millisecond)
  128. }
  129. func TestCond(t *testing.T) {
  130. suite.Run(t, new(TestCondSuite))
  131. }