workqueue.c 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. * File : workqueue.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2006 - 2017, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2017-02-27 bernard fix the re-work issue.
  23. */
  24. #include <rthw.h>
  25. #include <rtthread.h>
  26. #include <rtdevice.h>
  27. #ifdef RT_USING_HEAP
  28. rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
  29. {
  30. rt_err_t result;
  31. rt_enter_critical();
  32. while (1)
  33. {
  34. /* try to take condition semaphore */
  35. result = rt_sem_trytake(&(queue->sem));
  36. if (result == -RT_ETIMEOUT)
  37. {
  38. /* it's timeout, release this semaphore */
  39. rt_sem_release(&(queue->sem));
  40. }
  41. else if (result == RT_EOK)
  42. {
  43. /* keep the sem value = 0 */
  44. result = RT_EOK;
  45. break;
  46. }
  47. else
  48. {
  49. result = -RT_ERROR;
  50. break;
  51. }
  52. }
  53. rt_exit_critical();
  54. return result;
  55. }
  56. static void _workqueue_thread_entry(void* parameter)
  57. {
  58. rt_base_t level;
  59. struct rt_work* work;
  60. struct rt_workqueue* queue;
  61. queue = (struct rt_workqueue*) parameter;
  62. RT_ASSERT(queue != RT_NULL);
  63. while (1)
  64. {
  65. if (rt_list_isempty(&(queue->work_list)))
  66. {
  67. /* no software timer exist, suspend self. */
  68. rt_thread_suspend(rt_thread_self());
  69. rt_schedule();
  70. }
  71. /* we have work to do with. */
  72. level = rt_hw_interrupt_disable();
  73. work = rt_list_entry(queue->work_list.next, struct rt_work, list);
  74. rt_list_remove(&(work->list));
  75. queue->work_current = work;
  76. rt_hw_interrupt_enable(level);
  77. /* do work */
  78. work->work_func(work, work->work_data);
  79. level = rt_hw_interrupt_disable();
  80. /* clean current work */
  81. queue->work_current = RT_NULL;
  82. rt_hw_interrupt_enable(level);
  83. /* ack work completion */
  84. _workqueue_work_completion(queue);
  85. }
  86. }
  87. struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_size, rt_uint8_t priority)
  88. {
  89. struct rt_workqueue *queue = RT_NULL;
  90. queue = (struct rt_workqueue*)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
  91. if (queue != RT_NULL)
  92. {
  93. /* initialize work list */
  94. rt_list_init(&(queue->work_list));
  95. queue->work_current = RT_NULL;
  96. rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
  97. /* create the work thread */
  98. queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
  99. if (queue->work_thread == RT_NULL)
  100. {
  101. RT_KERNEL_FREE(queue);
  102. return RT_NULL;
  103. }
  104. rt_thread_startup(queue->work_thread);
  105. }
  106. return queue;
  107. }
  108. rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue)
  109. {
  110. RT_ASSERT(queue != RT_NULL);
  111. rt_thread_delete(queue->work_thread);
  112. RT_KERNEL_FREE(queue);
  113. return RT_EOK;
  114. }
  115. rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work)
  116. {
  117. rt_base_t level;
  118. RT_ASSERT(queue != RT_NULL);
  119. RT_ASSERT(work != RT_NULL);
  120. level = rt_hw_interrupt_disable();
  121. if (queue->work_current == work)
  122. {
  123. rt_hw_interrupt_enable(level);
  124. return -RT_EBUSY;
  125. }
  126. /* NOTE: the work MUST be initialized firstly */
  127. rt_list_remove(&(work->list));
  128. rt_list_insert_after(queue->work_list.prev, &(work->list));
  129. /* whether the workqueue is doing work */
  130. if (queue->work_current == RT_NULL)
  131. {
  132. rt_hw_interrupt_enable(level);
  133. /* resume work thread */
  134. rt_thread_resume(queue->work_thread);
  135. rt_schedule();
  136. }
  137. else rt_hw_interrupt_enable(level);
  138. return RT_EOK;
  139. }
  140. rt_err_t rt_workqueue_critical_work(struct rt_workqueue* queue, struct rt_work* work)
  141. {
  142. rt_base_t level;
  143. RT_ASSERT(queue != RT_NULL);
  144. RT_ASSERT(work != RT_NULL);
  145. level = rt_hw_interrupt_disable();
  146. if (queue->work_current == work)
  147. {
  148. rt_hw_interrupt_enable(level);
  149. return -RT_EBUSY;
  150. }
  151. /* NOTE: the work MUST be initialized firstly */
  152. rt_list_remove(&(work->list));
  153. rt_list_insert_after(queue->work_list.prev, &(work->list));
  154. if (queue->work_current == RT_NULL)
  155. {
  156. rt_hw_interrupt_enable(level);
  157. /* resume work thread */
  158. rt_thread_resume(queue->work_thread);
  159. rt_schedule();
  160. }
  161. else rt_hw_interrupt_enable(level);
  162. return RT_EOK;
  163. }
  164. rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work)
  165. {
  166. rt_base_t level;
  167. RT_ASSERT(queue != RT_NULL);
  168. RT_ASSERT(work != RT_NULL);
  169. level = rt_hw_interrupt_disable();
  170. if (queue->work_current == work)
  171. {
  172. rt_hw_interrupt_enable(level);
  173. return -RT_EBUSY;
  174. }
  175. rt_list_remove(&(work->list));
  176. rt_hw_interrupt_enable(level);
  177. return RT_EOK;
  178. }
  179. rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work)
  180. {
  181. rt_base_t level;
  182. RT_ASSERT(queue != RT_NULL);
  183. RT_ASSERT(work != RT_NULL);
  184. level = rt_hw_interrupt_disable();
  185. if (queue->work_current == work) /* it's current work in the queue */
  186. {
  187. /* wait for work completion */
  188. rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
  189. }
  190. else
  191. {
  192. rt_list_remove(&(work->list));
  193. }
  194. rt_hw_interrupt_enable(level);
  195. return RT_EOK;
  196. }
  197. rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue)
  198. {
  199. struct rt_list_node *node, *next;
  200. RT_ASSERT(queue != RT_NULL);
  201. rt_enter_critical();
  202. for (node = queue->work_list.next; node != &(queue->work_list); node = next)
  203. {
  204. next = node->next;
  205. rt_list_remove(node);
  206. }
  207. rt_exit_critical();
  208. return RT_EOK;
  209. }
  210. #endif