123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- /*
- * Copyright (c) 2006-2023, RT-Thread Development Team
- *
- * SPDX-License-Identifier: Apache-2.0
- *
- * Change Logs:
- * Date Author Notes
- * 2023-11-20 Shell Support of condition variable
- */
- #define DBG_TAG "ipc.condvar"
- #define DBG_LVL DBG_INFO
- #include <rtdbg.h>
- #include <rtdevice.h>
- #include <rtatomic.h>
- #include <rtthread.h>
- static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;
/*
 * Assert that either no mutex is recorded in cv->waiting_mtx, or the mutex
 * recorded there is owned by the calling thread.
 */
#define CV_ASSERT_LOCKED(cv)                                       \
    RT_ASSERT(!(cv)->waiting_mtx ||                                \
              rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) == \
                  rt_thread_self())
- void rt_condvar_init(rt_condvar_t cv, char *name)
- {
- #ifdef USING_RT_OBJECT
- /* TODO: support rt object */
- rt_object_init();
- #endif
- rt_wqueue_init(&cv->event);
- rt_atomic_store(&cv->waiters_cnt, 0);
- rt_atomic_store(&cv->waiting_mtx, 0);
- }
- static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
- rt_tick_t timeout, int suspend_flag)
- {
- rt_thread_t tcb = node->polling_thread;
- rt_timer_t timer = &(tcb->thread_timer);
- rt_err_t ret;
- if (queue->flag != RT_WQ_FLAG_WAKEUP)
- {
- ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
- if (ret == RT_EOK)
- {
- rt_wqueue_add(queue, node);
- if (timeout != RT_WAITING_FOREVER)
- {
- rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);
- rt_timer_start(timer);
- }
- }
- }
- else
- {
- ret = RT_EOK;
- }
- return ret;
- }
/*
 * Initializer for a struct rt_wqueue_node that belongs to the calling thread
 * and uses __wqueue_default_wake as its wakeup callback.
 *
 * NOTE: the expansion refers to a variable named `cv`, which must be in scope
 * wherever this macro is used; `node` is the variable being initialized.
 */
#define INIT_WAITQ_NODE(node)                                  \
    {                                                          \
        .polling_thread = rt_thread_self(), .key = 0,          \
        .wakeup = __wqueue_default_wake, .wqueue = &cv->event, \
        .list = RT_LIST_OBJECT_INIT(node.list)                 \
    }
- int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
- rt_tick_t timeout)
- {
- rt_err_t acq_mtx_succ, rc;
- rt_atomic_t waiting_mtx;
- struct rt_wqueue_node node = INIT_WAITQ_NODE(node);
- /* not allowed in IRQ & critical section */
- RT_DEBUG_SCHEDULER_AVAILABLE(1);
- CV_ASSERT_LOCKED(cv);
- /**
- * for the worst case, this is racy with the following works to reset field
- * before mutex is taken. The spinlock then comes to rescue.
- */
- rt_spin_lock(&_local_cv_queue_lock);
- waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
- if (!waiting_mtx)
- acq_mtx_succ = rt_atomic_compare_exchange_strong(
- &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
- else
- acq_mtx_succ = 0;
- rt_spin_unlock(&_local_cv_queue_lock);
- if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
- {
- rt_atomic_add(&cv->waiters_cnt, 1);
- rt_enter_critical();
- if (suspend_flag == RT_INTERRUPTIBLE)
- rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
- else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
- rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);
- acq_mtx_succ = rt_mutex_release(mtx);
- RT_ASSERT(acq_mtx_succ == 0);
- rt_exit_critical();
- if (rc == RT_EOK)
- {
- rt_schedule();
- rc = rt_get_errno();
- rc = rc > 0 ? -rc : rc;
- }
- else
- {
- LOG_D("%s() failed to suspend", __func__);
- }
- rt_wqueue_remove(&node);
- rt_spin_lock(&_local_cv_queue_lock);
- if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
- {
- waiting_mtx = (size_t)mtx;
- acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
- &waiting_mtx, 0);
- RT_ASSERT(acq_mtx_succ == 1);
- }
- rt_spin_unlock(&_local_cv_queue_lock);
- acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
- RT_ASSERT(acq_mtx_succ == 0);
- }
- else
- {
- LOG_D("%s: conflict waiting mutex", __func__);
- rc = -EBUSY;
- }
- return rc;
- }
- /** Keep in mind that we always operating when cv.waiting_mtx is taken */
- int rt_condvar_signal(rt_condvar_t cv)
- {
- CV_ASSERT_LOCKED(cv);
- /* to avoid spurious wakeups */
- if (rt_atomic_load(&cv->waiters_cnt) > 0)
- rt_wqueue_wakeup(&cv->event, 0);
- cv->event.flag = 0;
- return 0;
- }
- int rt_condvar_broadcast(rt_condvar_t cv)
- {
- CV_ASSERT_LOCKED(cv);
- /* to avoid spurious wakeups */
- if (rt_atomic_load(&cv->waiters_cnt) > 0)
- rt_wqueue_wakeup_all(&cv->event, 0);
- cv->event.flag = 0;
- return 0;
- }
|