dataqueue.c 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. * File : dataqueue.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2012, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2012-09-30 Bernard first version.
  23. * 2016-10-31 armink fix some resume push and pop thread bugs
  24. */
  25. #include <rtthread.h>
  26. #include <rtdevice.h>
  27. #include <rthw.h>
  28. struct rt_data_item
  29. {
  30. const void *data_ptr;
  31. rt_size_t data_size;
  32. };
  33. rt_err_t
  34. rt_data_queue_init(struct rt_data_queue *queue,
  35. rt_uint16_t size,
  36. rt_uint16_t lwm,
  37. void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
  38. {
  39. RT_ASSERT(queue != RT_NULL);
  40. queue->evt_notify = evt_notify;
  41. queue->size = size;
  42. queue->lwm = lwm;
  43. queue->get_index = 0;
  44. queue->put_index = 0;
  45. rt_list_init(&(queue->suspended_push_list));
  46. rt_list_init(&(queue->suspended_pop_list));
  47. queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
  48. if (queue->queue == RT_NULL)
  49. {
  50. return -RT_ENOMEM;
  51. }
  52. return RT_EOK;
  53. }
  54. RTM_EXPORT(rt_data_queue_init);
  55. rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
  56. const void *data_ptr,
  57. rt_size_t data_size,
  58. rt_int32_t timeout)
  59. {
  60. rt_ubase_t level;
  61. rt_thread_t thread;
  62. rt_err_t result;
  63. RT_ASSERT(queue != RT_NULL);
  64. result = RT_EOK;
  65. thread = rt_thread_self();
  66. level = rt_hw_interrupt_disable();
  67. while (queue->put_index - queue->get_index == queue->size)
  68. {
  69. /* queue is full */
  70. if (timeout == 0)
  71. {
  72. result = -RT_ETIMEOUT;
  73. goto __exit;
  74. }
  75. /* current context checking */
  76. RT_DEBUG_NOT_IN_INTERRUPT;
  77. /* reset thread error number */
  78. thread->error = RT_EOK;
  79. /* suspend thread on the push list */
  80. rt_thread_suspend(thread);
  81. rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
  82. /* start timer */
  83. if (timeout > 0)
  84. {
  85. /* reset the timeout of thread timer and start it */
  86. rt_timer_control(&(thread->thread_timer),
  87. RT_TIMER_CTRL_SET_TIME,
  88. &timeout);
  89. rt_timer_start(&(thread->thread_timer));
  90. }
  91. /* enable interrupt */
  92. rt_hw_interrupt_enable(level);
  93. /* do schedule */
  94. rt_schedule();
  95. /* thread is waked up */
  96. result = thread->error;
  97. level = rt_hw_interrupt_disable();
  98. if (result != RT_EOK) goto __exit;
  99. }
  100. queue->queue[queue->put_index % queue->size].data_ptr = data_ptr;
  101. queue->queue[queue->put_index % queue->size].data_size = data_size;
  102. queue->put_index += 1;
  103. /* there is at least one thread in suspended list */
  104. if (!rt_list_isempty(&(queue->suspended_pop_list)))
  105. {
  106. /* get thread entry */
  107. thread = rt_list_entry(queue->suspended_pop_list.next,
  108. struct rt_thread,
  109. tlist);
  110. /* resume it */
  111. rt_thread_resume(thread);
  112. rt_hw_interrupt_enable(level);
  113. /* perform a schedule */
  114. rt_schedule();
  115. return result;
  116. }
  117. __exit:
  118. rt_hw_interrupt_enable(level);
  119. if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
  120. {
  121. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
  122. }
  123. return result;
  124. }
  125. RTM_EXPORT(rt_data_queue_push);
  126. rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
  127. const void** data_ptr,
  128. rt_size_t *size,
  129. rt_int32_t timeout)
  130. {
  131. rt_ubase_t level;
  132. rt_thread_t thread;
  133. rt_err_t result;
  134. RT_ASSERT(queue != RT_NULL);
  135. RT_ASSERT(data_ptr != RT_NULL);
  136. RT_ASSERT(size != RT_NULL);
  137. result = RT_EOK;
  138. thread = rt_thread_self();
  139. level = rt_hw_interrupt_disable();
  140. while (queue->get_index == queue->put_index)
  141. {
  142. /* queue is empty */
  143. if (timeout == 0)
  144. {
  145. result = -RT_ETIMEOUT;
  146. goto __exit;
  147. }
  148. /* current context checking */
  149. RT_DEBUG_NOT_IN_INTERRUPT;
  150. /* reset thread error number */
  151. thread->error = RT_EOK;
  152. /* suspend thread on the pop list */
  153. rt_thread_suspend(thread);
  154. rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
  155. /* start timer */
  156. if (timeout > 0)
  157. {
  158. /* reset the timeout of thread timer and start it */
  159. rt_timer_control(&(thread->thread_timer),
  160. RT_TIMER_CTRL_SET_TIME,
  161. &timeout);
  162. rt_timer_start(&(thread->thread_timer));
  163. }
  164. /* enable interrupt */
  165. rt_hw_interrupt_enable(level);
  166. /* do schedule */
  167. rt_schedule();
  168. /* thread is waked up */
  169. result = thread->error;
  170. level = rt_hw_interrupt_disable();
  171. if (result != RT_EOK)
  172. goto __exit;
  173. }
  174. *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
  175. *size = queue->queue[queue->get_index % queue->size].data_size;
  176. queue->get_index += 1;
  177. if ((queue->put_index - queue->get_index) <= queue->lwm)
  178. {
  179. /* there is at least one thread in suspended list */
  180. if (!rt_list_isempty(&(queue->suspended_push_list)))
  181. {
  182. /* get thread entry */
  183. thread = rt_list_entry(queue->suspended_push_list.next,
  184. struct rt_thread,
  185. tlist);
  186. /* resume it */
  187. rt_thread_resume(thread);
  188. rt_hw_interrupt_enable(level);
  189. /* perform a schedule */
  190. rt_schedule();
  191. }
  192. else
  193. {
  194. rt_hw_interrupt_enable(level);
  195. }
  196. if (queue->evt_notify != RT_NULL)
  197. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
  198. return result;
  199. }
  200. __exit:
  201. rt_hw_interrupt_enable(level);
  202. if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
  203. {
  204. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
  205. }
  206. return result;
  207. }
  208. RTM_EXPORT(rt_data_queue_pop);
  209. rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
  210. const void** data_ptr,
  211. rt_size_t *size)
  212. {
  213. rt_ubase_t level;
  214. RT_ASSERT(queue != RT_NULL);
  215. level = rt_hw_interrupt_disable();
  216. if (queue->get_index == queue->put_index)
  217. {
  218. rt_hw_interrupt_enable(level);
  219. return -RT_EEMPTY;
  220. }
  221. *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
  222. *size = queue->queue[queue->get_index % queue->size].data_size;
  223. rt_hw_interrupt_enable(level);
  224. return RT_EOK;
  225. }
  226. RTM_EXPORT(rt_data_queue_peak);
  227. void rt_data_queue_reset(struct rt_data_queue *queue)
  228. {
  229. struct rt_thread *thread;
  230. register rt_ubase_t temp;
  231. rt_enter_critical();
  232. /* wakeup all suspend threads */
  233. /* resume on pop list */
  234. while (!rt_list_isempty(&(queue->suspended_pop_list)))
  235. {
  236. /* disable interrupt */
  237. temp = rt_hw_interrupt_disable();
  238. /* get next suspend thread */
  239. thread = rt_list_entry(queue->suspended_pop_list.next,
  240. struct rt_thread,
  241. tlist);
  242. /* set error code to RT_ERROR */
  243. thread->error = -RT_ERROR;
  244. /*
  245. * resume thread
  246. * In rt_thread_resume function, it will remove current thread from
  247. * suspend list
  248. */
  249. rt_thread_resume(thread);
  250. /* enable interrupt */
  251. rt_hw_interrupt_enable(temp);
  252. }
  253. /* resume on push list */
  254. while (!rt_list_isempty(&(queue->suspended_push_list)))
  255. {
  256. /* disable interrupt */
  257. temp = rt_hw_interrupt_disable();
  258. /* get next suspend thread */
  259. thread = rt_list_entry(queue->suspended_push_list.next,
  260. struct rt_thread,
  261. tlist);
  262. /* set error code to RT_ERROR */
  263. thread->error = -RT_ERROR;
  264. /*
  265. * resume thread
  266. * In rt_thread_resume function, it will remove current thread from
  267. * suspend list
  268. */
  269. rt_thread_resume(thread);
  270. /* enable interrupt */
  271. rt_hw_interrupt_enable(temp);
  272. }
  273. rt_exit_critical();
  274. rt_schedule();
  275. }
  276. RTM_EXPORT(rt_data_queue_reset);