completion_mp.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. /*
  2. * Copyright (c) 2006-2024, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2024-04-26 Shell lockless rt_completion for MP system
  9. */
  10. #define DBG_TAG "drivers.ipc"
  11. #define DBG_LVL DBG_INFO
  12. #include <rtdbg.h>
  13. #include <rtthread.h>
  14. #include <rthw.h>
  15. #include <rtdevice.h>
  16. #define RT_COMPLETED 1
  17. #define RT_UNCOMPLETED 0
  18. #define RT_WAKING (-1)
  19. #define RT_OCCUPIED (-2)
  20. #define RT_COMPLETION_NEW_STAT(thread, flag) (((flag) & 1) | (((rt_base_t)thread) & ~1))
  21. /**
  22. * The C11 atomic can be ~5% and even faster in testing on the arm64 platform
  23. * compared to rt_atomic. So the C11 way is always preferred.
  24. */
  25. #ifdef RT_USING_STDC_ATOMIC
  26. #include <stdatomic.h>
  27. #define IPC_STORE(dst, val, morder) atomic_store_explicit(dst, val, morder)
  28. #define IPC_LOAD(dst, morder) atomic_load_explicit(dst, morder)
  29. #define IPC_BARRIER(morder) atomic_thread_fence(morder)
  30. #define IPC_CAS(dst, exp, desired, succ, fail) \
  31. atomic_compare_exchange_strong_explicit(dst, exp, desired, succ, fail)
  32. #else /* !RT_USING_STDC_ATOMIC */
  33. #include <rtatomic.h>
  34. #define IPC_STORE(dst, val, morder) rt_atomic_store(dst, val)
  35. #define IPC_LOAD(dst, morder) rt_atomic_load(dst)
  36. #define IPC_BARRIER(morder)
  37. #define IPC_CAS(dst, exp, desired, succ, fail) \
  38. rt_atomic_compare_exchange_strong(dst, exp, desired)
  39. #endif /* RT_USING_STDC_ATOMIC */
  40. static rt_err_t _comp_susp_thread(struct rt_completion *completion,
  41. rt_thread_t thread, rt_int32_t timeout,
  42. int suspend_flag);
  43. /**
  44. * @brief This function will initialize a completion object.
  45. *
  46. * @param completion is a pointer to a completion object.
  47. */
  48. void rt_completion_init(struct rt_completion *completion)
  49. {
  50. RT_ASSERT(completion != RT_NULL);
  51. IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
  52. memory_order_relaxed);
  53. }
  54. RTM_EXPORT(rt_completion_init);
/**
 * @brief This function will wait for a completion, if the completion is unavailable, the thread shall wait for
 * the completion up to a specified time.
 *
 * The completion state word (`susp_thread_n_flag`) encodes one of:
 * RT_COMPLETED, RT_UNCOMPLETED, RT_WAKING (transient, producer is mid-wake),
 * RT_OCCUPIED (transient, a waiter is mid-suspend), or a suspended thread
 * pointer tagged with the flag bit (see RT_COMPLETION_NEW_STAT).
 *
 * @param completion is a pointer to a completion object.
 *
 * @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
 * the completion done up to the amount of time specified by the argument.
 * NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
 * completion is unavailable, the thread will be waiting forever. A timeout
 * of 0 polls once and returns -RT_ETIMEOUT if not completed.
 * @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
 *
 * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
 * If the return value is any other values, it means that the completion wait failed.
 *
 * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
 */
rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
                                  rt_int32_t timeout, int suspend_flag)
{
    rt_err_t result = -RT_ERROR;
    rt_thread_t thread;
    rt_bool_t exchange_succ;
    rt_base_t expected_value;

    RT_ASSERT(completion != RT_NULL);
    /* current context checking */
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);

    thread = rt_thread_self();
    do
    {
        /* try to consume one completion: COMPLETED -> UNCOMPLETED */
        expected_value = RT_COMPLETED;
        exchange_succ =
            IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
                    RT_UNCOMPLETED, memory_order_acquire, memory_order_relaxed);
        if (exchange_succ)
        {
            /* consume succeed, now return EOK */
            result = RT_EOK;
            break;
        }
        else if (expected_value == RT_WAKING)
        {
            /* previous wake is not done yet, yield thread & try again */
            rt_thread_yield();
        }
        else
        {
            /**
             * API rules say: only one thread can suspend on complete.
             * So we assert if debug.
             */
            RT_ASSERT(expected_value == RT_UNCOMPLETED);
            if (timeout != 0)
            {
                /**
                 * try to occupy completion, noted that we are assuming that
                 * `expected_value == RT_UNCOMPLETED` (it was loaded by the
                 * failed CAS above). RT_OCCUPIED blocks the producer out
                 * while we prepare to suspend in _comp_susp_thread().
                 */
                exchange_succ = IPC_CAS(
                    &completion->susp_thread_n_flag, &expected_value,
                    RT_OCCUPIED, memory_order_relaxed, memory_order_relaxed);
                if (exchange_succ)
                {
                    /* complete waiting business and return result */
                    result = _comp_susp_thread(completion, thread, timeout,
                                               suspend_flag);
                    /* _comp_susp_thread() must always release RT_OCCUPIED */
                    RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
                              RT_OCCUPIED);
                    break;
                }
                else
                {
                    /* lost the race to occupy; try again */
                }
            }
            else
            {
                /* polling mode (timeout == 0) and not completed */
                result = -RT_ETIMEOUT;
                break;
            }
        }
    } while (1);

    return result;
}
  140. static rt_base_t _wait_until_update(struct rt_completion *completion, rt_base_t expected)
  141. {
  142. rt_base_t current_value;
  143. /* spinning for update */
  144. do
  145. {
  146. rt_hw_isb();
  147. current_value =
  148. IPC_LOAD(&completion->susp_thread_n_flag, memory_order_relaxed);
  149. } while (current_value == expected);
  150. return current_value;
  151. }
/**
 * Try to suspend thread and update completion.
 *
 * Called with the state word holding RT_OCCUPIED (set by the caller).
 * On every path this function replaces RT_OCCUPIED with another state
 * before returning (the caller asserts this).
 *
 * @param completion  the completion object (state is RT_OCCUPIED on entry).
 * @param thread      the current thread, to be suspended.
 * @param timeout     ticks to wait; negative means wait forever.
 * @param suspend_flag see rt_thread_suspend_with_flag().
 * @return RT_EOK on a successful wait, or a negative error from the
 *         suspend call / thread->error (e.g. -RT_ETIMEOUT).
 */
static rt_err_t _comp_susp_thread(struct rt_completion *completion,
                                  rt_thread_t thread, rt_int32_t timeout,
                                  int suspend_flag)
{
    rt_err_t error = -RT_ERROR;
    rt_base_t clevel;
    rt_base_t comp_waiting;

    /* suspend thread */
    clevel = rt_enter_critical();

    /* reset thread error number */
    thread->error = RT_EOK;

    error = rt_thread_suspend_with_flag(thread, suspend_flag);
    if (error)
    {
        /* suspend refused (e.g. pending signal): roll state back */
        rt_exit_critical_safe(clevel);
        RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                  RT_OCCUPIED);
        IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
                  memory_order_relaxed);
    }
    else
    {
        /* set to waiting: publish our thread pointer so the producer can
         * find and resume us */
        comp_waiting = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED);
        RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                  RT_OCCUPIED);
        IPC_STORE(&completion->susp_thread_n_flag, comp_waiting,
                  memory_order_relaxed);

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* do schedule */
        rt_schedule();
        rt_exit_critical_safe(clevel);

        /* thread is woken up */
        error = thread->error;
        /* normalize: positive errno values become negative error codes */
        error = error > 0 ? -error : error;

        /* clean completed flag & remove susp_thread on the case of waking by timeout */
        if (!error)
        {
            /* completion done successfully: producer must have already
             * replaced our thread pointer in the state word */
            RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
                      comp_waiting);
            /* the necessary barrier is done during thread sched */
        }
        else
        {
            /* try to cancel waiting if woken up expectedly or timeout */
            if (!IPC_CAS(&completion->susp_thread_n_flag, &comp_waiting,
                         RT_UNCOMPLETED, memory_order_relaxed,
                         memory_order_relaxed))
            {
                /* cancel failed, producer had woken us in the past, fix error.
                 * comp_waiting now holds the value observed by the failed CAS. */
                if (comp_waiting == RT_WAKING)
                {
                    /* producer is mid-wake; wait for it to finish publishing */
                    _wait_until_update(completion, RT_WAKING);
                }
                IPC_BARRIER(memory_order_acquire);
                error = RT_EOK;
            }
        }
    }

    return error;
}
/**
 * @brief This function indicates a completion has done and wakeup the thread
 * and update its errno. No update is applied if it's a negative value.
 *
 * @param completion is a pointer to a completion object.
 * @param thread_errno is the errno set to waking thread.
 * @return RT_EOK if wakeup succeed.
 * RT_EEMPTY if wakeup failure and the completion is set to completed.
 * RT_EBUSY if the completion is still in completed state
 */
rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion,
                                       rt_err_t thread_errno)
{
    rt_err_t error = -RT_ERROR;
    rt_thread_t suspend_thread;
    rt_bool_t exchange_succ;
    rt_base_t expected_value;

    RT_ASSERT(completion != RT_NULL);

    do
    {
        /* try to transform from uncompleted to completed (no waiter case) */
        expected_value = RT_UNCOMPLETED;
        exchange_succ =
            IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
                    RT_COMPLETED, memory_order_release, memory_order_relaxed);
        if (exchange_succ)
        {
            /* no thread was waiting; the token is banked as RT_COMPLETED */
            error = -RT_EEMPTY;
            break;
        }
        else
        {
            if (expected_value == RT_COMPLETED)
            {
                /* completion still in completed state */
                error = -RT_EBUSY;
                break;
            }
            else if (expected_value == RT_OCCUPIED ||
                     expected_value == RT_WAKING)
            {
                /* transient state owned by another CPU; retry */
                continue;
            }
            else
            {
                /* state word holds a suspended thread pointer:
                 * try to resume the thread and set uncompleted */
                exchange_succ = IPC_CAS(
                    &completion->susp_thread_n_flag, &expected_value,
                    RT_WAKING, memory_order_relaxed, memory_order_relaxed);
                if (exchange_succ)
                {
/* strip the low flag bit to recover the thread pointer
 * (see RT_COMPLETION_NEW_STAT) */
#define GET_THREAD(val) ((rt_thread_t)((val) & ~1))
                    suspend_thread = GET_THREAD(expected_value);
                    if (thread_errno >= 0)
                    {
                        suspend_thread->error = thread_errno;
                    }

                    /* safe to assume publication done even on resume failure */
                    RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
                              RT_WAKING);
                    IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
                              memory_order_release);

                    rt_thread_resume(suspend_thread);
                    error = RT_EOK;
                    break;
                }
                else
                {
                    /* failed in racing to resume thread, try again */
                }
            }
        }
    } while (1);

    return error;
}