prio_queue.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Priority Queue
  3. *
  4. * COPYRIGHT (C) 2013-2015, Shanghai Real-Thread Technology Co., Ltd
  5. * http://www.rt-thread.com
  6. *
  7. * This file is part of RT-Thread (http://www.rt-thread.org)
  8. *
  9. * All rights reserved.
  10. *
  11. * This program is free software; you can redistribute it and/or modify
  12. * it under the terms of the GNU General Public License as published by
  13. * the Free Software Foundation; either version 2 of the License, or
  14. * (at your option) any later version.
  15. *
  16. * This program is distributed in the hope that it will be useful,
  17. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  19. * GNU General Public License for more details.
  20. *
  21. * You should have received a copy of the GNU General Public License along
  22. * with this program; if not, write to the Free Software Foundation, Inc.,
  23. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  24. *
  25. * Change Logs:
  26. * Date Author Notes
  27. * 2013-11-04 Grissiom add comment
  28. */
  29. #include <rthw.h>
  30. #include <rtthread.h>
  31. #include "prio_queue.h"
  /*
   * Link header placed in front of every queued element.
   *
   * Each memory-pool block holds one of these headers followed directly by
   * the payload bytes, so the payload address is always (item + 1).
   */
  struct rt_prio_queue_item
  {
      /* following element on the same priority list, RT_NULL at the tail */
      struct rt_prio_queue_item *next;
      /* payload bytes follow directly */
  };
  36. static void _do_push(struct rt_prio_queue *que,
  37. rt_uint8_t prio,
  38. struct rt_prio_queue_item *item)
  39. {
  40. if (que->head[prio] == RT_NULL)
  41. {
  42. que->head[prio] = item;
  43. que->bitmap |= 1 << prio;
  44. }
  45. else
  46. {
  47. RT_ASSERT(que->tail[prio]);
  48. que->tail[prio]->next = item;
  49. }
  50. que->tail[prio] = item;
  51. }
  52. static struct rt_prio_queue_item* _do_pop(struct rt_prio_queue *que)
  53. {
  54. int ffs;
  55. struct rt_prio_queue_item *item;
  56. ffs = __rt_ffs(que->bitmap);
  57. if (ffs == 0)
  58. return RT_NULL;
  59. ffs--;
  60. item = que->head[ffs];
  61. RT_ASSERT(item);
  62. que->head[ffs] = item->next;
  63. if (que->head[ffs] == RT_NULL)
  64. {
  65. que->bitmap &= ~(1 << ffs);
  66. }
  67. return item;
  68. }
  69. rt_err_t rt_prio_queue_init(struct rt_prio_queue *que,
  70. const char *name,
  71. void *buf,
  72. rt_size_t bufsz,
  73. rt_size_t itemsz)
  74. {
  75. RT_ASSERT(que);
  76. rt_memset(que, 0, sizeof(*que));
  77. rt_list_init(&(que->suspended_pop_list));
  78. rt_mp_init(&que->pool, name, buf, bufsz,
  79. sizeof(struct rt_prio_queue_item) + itemsz);
  80. que->item_sz = itemsz;
  81. return RT_EOK;
  82. }
  83. void rt_prio_queue_detach(struct rt_prio_queue *que)
  84. {
  85. /* wake up all suspended pop threads, push thread is suspended on mempool.
  86. */
  87. while (!rt_list_isempty(&(que->suspended_pop_list)))
  88. {
  89. rt_thread_t thread;
  90. /* disable interrupt */
  91. rt_ubase_t temp = rt_hw_interrupt_disable();
  92. /* get next suspend thread */
  93. thread = rt_list_entry(que->suspended_pop_list.next, struct rt_thread, tlist);
  94. /* set error code to RT_ERROR */
  95. thread->error = -RT_ERROR;
  96. rt_thread_resume(thread);
  97. /* enable interrupt */
  98. rt_hw_interrupt_enable(temp);
  99. }
  100. rt_mp_detach(&que->pool);
  101. }
  102. #ifdef RT_USING_HEAP
  103. struct rt_prio_queue* rt_prio_queue_create(const char *name,
  104. rt_size_t item_nr,
  105. rt_size_t item_sz)
  106. {
  107. struct rt_prio_queue *que;
  108. rt_size_t bufsz;
  109. bufsz = item_nr * (sizeof(struct rt_prio_queue_item)
  110. + item_sz
  111. + sizeof(void*));
  112. RT_ASSERT(item_nr);
  113. que = rt_malloc(sizeof(*que) + bufsz);
  114. if (!que)
  115. return RT_NULL;
  116. rt_prio_queue_init(que, name, que+1, bufsz, item_sz);
  117. return que;
  118. }
  /*
   * Destroy a queue obtained from rt_prio_queue_create().
   *
   * @param que queue to destroy; its memory is returned with rt_free()
   */
  void rt_prio_queue_delete(struct rt_prio_queue *que)
  {
      /* detach the queue first, then free the single heap block that
       * holds both the queue object and its element storage */
      rt_prio_queue_detach(que);
      rt_free(que);
  }
  124. #endif
  125. rt_err_t rt_prio_queue_push(struct rt_prio_queue *que,
  126. rt_uint8_t prio,
  127. void *data,
  128. rt_int32_t timeout)
  129. {
  130. rt_ubase_t level;
  131. struct rt_prio_queue_item *item;
  132. RT_ASSERT(que);
  133. if (prio >= RT_PRIO_QUEUE_PRIO_MAX)
  134. return -RT_ERROR;
  135. item = rt_mp_alloc(&que->pool, timeout);
  136. if (item == RT_NULL)
  137. return -RT_ENOMEM;
  138. rt_memcpy(item+1, data, que->item_sz);
  139. item->next = RT_NULL;
  140. level = rt_hw_interrupt_disable();
  141. _do_push(que, prio, item);
  142. if (!rt_list_isempty(&(que->suspended_pop_list)))
  143. {
  144. rt_thread_t thread;
  145. /* get thread entry */
  146. thread = rt_list_entry(que->suspended_pop_list.next,
  147. struct rt_thread,
  148. tlist);
  149. /* resume it */
  150. rt_thread_resume(thread);
  151. rt_hw_interrupt_enable(level);
  152. /* perform a schedule */
  153. rt_schedule();
  154. return RT_EOK;
  155. }
  156. rt_hw_interrupt_enable(level);
  157. return RT_EOK;
  158. }
  159. rt_err_t rt_prio_queue_pop(struct rt_prio_queue *que,
  160. void *data,
  161. rt_int32_t timeout)
  162. {
  163. rt_ubase_t level;
  164. struct rt_prio_queue_item *item;
  165. RT_ASSERT(que);
  166. RT_ASSERT(data);
  167. level = rt_hw_interrupt_disable();
  168. for (item = _do_pop(que);
  169. item == RT_NULL;
  170. item = _do_pop(que))
  171. {
  172. rt_thread_t thread;
  173. if (timeout == 0)
  174. {
  175. rt_hw_interrupt_enable(level);
  176. return -RT_ETIMEOUT;
  177. }
  178. RT_DEBUG_NOT_IN_INTERRUPT;
  179. thread = rt_thread_self();
  180. thread->error = RT_EOK;
  181. rt_thread_suspend(thread);
  182. rt_list_insert_before(&(que->suspended_pop_list), &(thread->tlist));
  183. if (timeout > 0)
  184. {
  185. rt_timer_control(&(thread->thread_timer),
  186. RT_TIMER_CTRL_SET_TIME,
  187. &timeout);
  188. rt_timer_start(&(thread->thread_timer));
  189. }
  190. rt_hw_interrupt_enable(level);
  191. rt_schedule();
  192. /* thread is waked up */
  193. if (thread->error != RT_EOK)
  194. return thread->error;
  195. level = rt_hw_interrupt_disable();
  196. }
  197. rt_hw_interrupt_enable(level);
  198. rt_memcpy(data, item+1, que->item_sz);
  199. rt_mp_free(item);
  200. return RT_EOK;
  201. }
  202. void rt_prio_queue_dump(struct rt_prio_queue *que)
  203. {
  204. int level = 0;
  205. rt_kprintf("bitmap: %08x\n", que->bitmap);
  206. for (level = 0; level < RT_PRIO_QUEUE_PRIO_MAX; level++)
  207. {
  208. struct rt_prio_queue_item *item;
  209. rt_kprintf("%2d: ", level);
  210. for (item = que->head[level];
  211. item;
  212. item = item->next)
  213. {
  214. rt_kprintf("%p, ", item);
  215. }
  216. rt_kprintf("\n");
  217. }
  218. }