condvar.c

/*
 * Copyright (c) 2006-2023, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2023-11-20     Shell        Support of condition variable
 */

#define DBG_TAG "ipc.condvar"
#define DBG_LVL DBG_INFO
#include <rtdbg.h>

#include <rtdevice.h>
#include <rtatomic.h>
#include <rtthread.h>

static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;

#define CV_ASSERT_LOCKED(cv)                                           \
    RT_ASSERT(!(cv)->waiting_mtx ||                                    \
              rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) ==     \
                  rt_thread_self())
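
/*
 * Initialize the condition variable: empty wait queue, zero waiters, and no
 * mutex bound in waiting_mtx yet. The name parameter is currently unused
 * until rt_object support is added (see the TODO below).
 */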
void rt_condvar_init(rt_condvar_t cv, char *name)
{
#ifdef USING_RT_OBJECT
    /* TODO: support rt object */
    rt_object_init();
#endif

    rt_wqueue_init(&cv->event);
    rt_atomic_store(&cv->waiters_cnt, 0);
    rt_atomic_store(&cv->waiting_mtx, 0);
}
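
/*
 * Queue the caller on the wait queue: suspend the thread with the given
 * suspend_flag, add its node to the queue and, for a finite timeout, arm the
 * thread timer. If the queue already carries a pending wakeup
 * (RT_WQ_FLAG_WAKEUP), return RT_EOK without suspending.
 */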
static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
                          rt_tick_t timeout, int suspend_flag)
{
    rt_thread_t tcb = node->polling_thread;
    rt_timer_t timer = &(tcb->thread_timer);
    rt_err_t ret;

    if (queue->flag != RT_WQ_FLAG_WAKEUP)
    {
        ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
        if (ret == RT_EOK)
        {
            rt_wqueue_add(queue, node);
            if (timeout != RT_WAITING_FOREVER)
            {
                rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);
                rt_timer_start(timer);
            }
        }
    }
    else
    {
        ret = RT_EOK;
    }

    return ret;
}

#define INIT_WAITQ_NODE(node)                                      \
    {                                                              \
        .polling_thread = rt_thread_self(), .key = 0,              \
        .wakeup = __wqueue_default_wake, .wqueue = &cv->event,     \
        .list = RT_LIST_OBJECT_INIT(node.list)                     \
    }
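
/*
 * Wait on cv until it is signalled, the timeout expires or the wait is
 * interrupted. The caller must hold mtx; it is released while the thread
 * sleeps and re-taken before returning. The first waiter binds mtx to the
 * condvar; passing a different mutex while waiters are pending fails with
 * -EBUSY. Returns RT_EOK on wakeup, otherwise a negative error code taken
 * from the thread errno (e.g. on timeout or interruption).
 */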
int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
                         rt_tick_t timeout)
{
    rt_err_t acq_mtx_succ, rc;
    rt_atomic_t waiting_mtx;
    struct rt_wqueue_node node = INIT_WAITQ_NODE(node);

    /* not allowed in IRQ & critical section */
    RT_DEBUG_SCHEDULER_AVAILABLE(1);
    CV_ASSERT_LOCKED(cv);

    /**
     * In the worst case this races with the code below that resets the field
     * before the mutex is taken. The spinlock then comes to the rescue.
     */
    rt_spin_lock(&_local_cv_queue_lock);
    waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
    if (!waiting_mtx)
        acq_mtx_succ = rt_atomic_compare_exchange_strong(
            &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
    else
        acq_mtx_succ = 0;
    rt_spin_unlock(&_local_cv_queue_lock);

    if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
    {
        rt_atomic_add(&cv->waiters_cnt, 1);

        rt_enter_critical();
        if (suspend_flag == RT_INTERRUPTIBLE)
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
        else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);
        acq_mtx_succ = rt_mutex_release(mtx);
        RT_ASSERT(acq_mtx_succ == 0);
        rt_exit_critical();

        if (rc == RT_EOK)
        {
            rt_schedule();

            rc = rt_get_errno();
            rc = rc > 0 ? -rc : rc;
        }
        else
        {
            LOG_D("%s() failed to suspend", __func__);
        }

        rt_wqueue_remove(&node);

        rt_spin_lock(&_local_cv_queue_lock);
        if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
        {
            waiting_mtx = (size_t)mtx;
            acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
                                                             &waiting_mtx, 0);
            RT_ASSERT(acq_mtx_succ == 1);
        }
        rt_spin_unlock(&_local_cv_queue_lock);

        acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
        RT_ASSERT(acq_mtx_succ == 0);
    }
    else
    {
        LOG_D("%s: conflict waiting mutex", __func__);
        rc = -EBUSY;
    }

    return rc;
}
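
/*
 * Note on the wakeup path below: waiters_cnt is checked first so that an
 * empty condvar is not marked as woken, and event.flag is reset afterwards
 * so the next _waitq_inqueue() call suspends instead of returning early.
 */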
/** Keep in mind that we always operate while the mutex recorded in cv->waiting_mtx is held */
int rt_condvar_signal(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}

int rt_condvar_broadcast(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup_all(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}
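
/*
 * Illustrative usage sketch (not part of the original file): a minimal
 * consumer/producer pair built on the API above. The names _lock, _cv and
 * _ready are hypothetical, and struct rt_condvar / struct rt_mutex are
 * assumed to be visible through <rtdevice.h>; kept under #if 0 so it is
 * not compiled in.
 */
#if 0
static struct rt_mutex _lock;
static struct rt_condvar _cv;
static int _ready;

static void consumer_example(void)
{
    rt_mutex_init(&_lock, "cv_lock", RT_IPC_FLAG_PRIO);
    rt_condvar_init(&_cv, "cv_demo");

    rt_mutex_take(&_lock, RT_WAITING_FOREVER);
    while (!_ready)
    {
        /* releases _lock while sleeping, re-takes it before returning */
        if (rt_condvar_timedwait(&_cv, &_lock, RT_INTERRUPTIBLE,
                                 RT_WAITING_FOREVER) != RT_EOK)
            break;
    }
    rt_mutex_release(&_lock);
}

static void producer_example(void)
{
    rt_mutex_take(&_lock, RT_WAITING_FOREVER);
    _ready = 1;
    rt_condvar_signal(&_cv); /* CV_ASSERT_LOCKED holds: we own _lock here */
    rt_mutex_release(&_lock);
}
#endif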