pipe.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. /*
  2. * File : pipe.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2012, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2012-09-30 Bernard first version.
  23. */
  24. #include <rthw.h>
  25. #include <rtthread.h>
  26. #include <rtdevice.h>
  27. static void _rt_pipe_resume_writer(struct rt_pipe_device *pipe)
  28. {
  29. if (!rt_list_isempty(&pipe->suspended_write_list))
  30. {
  31. rt_thread_t thread;
  32. RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR);
  33. /* get suspended thread */
  34. thread = rt_list_entry(pipe->suspended_write_list.next,
  35. struct rt_thread,
  36. tlist);
  37. /* resume the write thread */
  38. rt_thread_resume(thread);
  39. rt_schedule();
  40. }
  41. }
  42. static rt_size_t rt_pipe_read(rt_device_t dev,
  43. rt_off_t pos,
  44. void *buffer,
  45. rt_size_t size)
  46. {
  47. rt_uint32_t level;
  48. rt_thread_t thread;
  49. struct rt_pipe_device *pipe;
  50. rt_size_t read_nbytes;
  51. pipe = PIPE_DEVICE(dev);
  52. RT_ASSERT(pipe != RT_NULL);
  53. if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD))
  54. {
  55. level = rt_hw_interrupt_disable();
  56. read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
  57. /* if the ringbuffer is empty, there won't be any writer waiting */
  58. if (read_nbytes)
  59. _rt_pipe_resume_writer(pipe);
  60. rt_hw_interrupt_enable(level);
  61. return read_nbytes;
  62. }
  63. thread = rt_thread_self();
  64. /* current context checking */
  65. RT_DEBUG_NOT_IN_INTERRUPT;
  66. do {
  67. level = rt_hw_interrupt_disable();
  68. read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
  69. if (read_nbytes == 0)
  70. {
  71. rt_thread_suspend(thread);
  72. /* waiting on suspended read list */
  73. rt_list_insert_before(&(pipe->suspended_read_list),
  74. &(thread->tlist));
  75. rt_hw_interrupt_enable(level);
  76. rt_schedule();
  77. }
  78. else
  79. {
  80. _rt_pipe_resume_writer(pipe);
  81. rt_hw_interrupt_enable(level);
  82. break;
  83. }
  84. } while (read_nbytes == 0);
  85. return read_nbytes;
  86. }
  87. static void _rt_pipe_resume_reader(struct rt_pipe_device *pipe)
  88. {
  89. if (pipe->parent.rx_indicate)
  90. pipe->parent.rx_indicate(&pipe->parent,
  91. rt_ringbuffer_data_len(&pipe->ringbuffer));
  92. if (!rt_list_isempty(&pipe->suspended_read_list))
  93. {
  94. rt_thread_t thread;
  95. RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD);
  96. /* get suspended thread */
  97. thread = rt_list_entry(pipe->suspended_read_list.next,
  98. struct rt_thread,
  99. tlist);
  100. /* resume the read thread */
  101. rt_thread_resume(thread);
  102. rt_schedule();
  103. }
  104. }
  105. static rt_size_t rt_pipe_write(rt_device_t dev,
  106. rt_off_t pos,
  107. const void *buffer,
  108. rt_size_t size)
  109. {
  110. rt_uint32_t level;
  111. rt_thread_t thread;
  112. struct rt_pipe_device *pipe;
  113. rt_size_t write_nbytes;
  114. pipe = PIPE_DEVICE(dev);
  115. RT_ASSERT(pipe != RT_NULL);
  116. if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) ||
  117. !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR))
  118. {
  119. level = rt_hw_interrupt_disable();
  120. if (pipe->flag & RT_PIPE_FLAG_FORCE_WR)
  121. write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer),
  122. buffer, size);
  123. else
  124. write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer),
  125. buffer, size);
  126. _rt_pipe_resume_reader(pipe);
  127. rt_hw_interrupt_enable(level);
  128. return write_nbytes;
  129. }
  130. thread = rt_thread_self();
  131. /* current context checking */
  132. RT_DEBUG_NOT_IN_INTERRUPT;
  133. do {
  134. level = rt_hw_interrupt_disable();
  135. write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size);
  136. if (write_nbytes == 0)
  137. {
  138. /* pipe full, waiting on suspended write list */
  139. rt_thread_suspend(thread);
  140. /* waiting on suspended read list */
  141. rt_list_insert_before(&(pipe->suspended_write_list),
  142. &(thread->tlist));
  143. rt_hw_interrupt_enable(level);
  144. rt_schedule();
  145. }
  146. else
  147. {
  148. _rt_pipe_resume_reader(pipe);
  149. rt_hw_interrupt_enable(level);
  150. break;
  151. }
  152. } while (write_nbytes == 0);
  153. return write_nbytes;
  154. }
  155. static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args)
  156. {
  157. if (cmd == PIPE_CTRL_GET_SPACE && args)
  158. *(rt_size_t*)args = rt_ringbuffer_space_len(&PIPE_DEVICE(dev)->ringbuffer);
  159. return RT_EOK;
  160. }
  161. /**
  162. * This function will initialize a pipe device and put it under control of
  163. * resource management.
  164. *
  165. * @param pipe the pipe device
  166. * @param name the name of pipe device
  167. * @param flag the attribute of the pipe device
  168. * @param buf the buffer of pipe device
  169. * @param size the size of pipe device buffer
  170. *
  171. * @return the operation status, RT_EOK on successful
  172. */
  173. rt_err_t rt_pipe_init(struct rt_pipe_device *pipe,
  174. const char *name,
  175. enum rt_pipe_flag flag,
  176. rt_uint8_t *buf,
  177. rt_size_t size)
  178. {
  179. RT_ASSERT(pipe);
  180. RT_ASSERT(buf);
  181. /* initialize suspended list */
  182. rt_list_init(&pipe->suspended_read_list);
  183. rt_list_init(&pipe->suspended_write_list);
  184. /* initialize ring buffer */
  185. rt_ringbuffer_init(&pipe->ringbuffer, buf, size);
  186. pipe->flag = flag;
  187. /* create pipe */
  188. pipe->parent.type = RT_Device_Class_Pipe;
  189. pipe->parent.init = RT_NULL;
  190. pipe->parent.open = RT_NULL;
  191. pipe->parent.close = RT_NULL;
  192. pipe->parent.read = rt_pipe_read;
  193. pipe->parent.write = rt_pipe_write;
  194. pipe->parent.control = rt_pipe_control;
  195. return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
  196. }
  197. RTM_EXPORT(rt_pipe_init);
  198. /**
  199. * This function will detach a pipe device from resource management
  200. *
  201. * @param pipe the pipe device
  202. *
  203. * @return the operation status, RT_EOK on successful
  204. */
  205. rt_err_t rt_pipe_detach(struct rt_pipe_device *pipe)
  206. {
  207. return rt_device_unregister(&pipe->parent);
  208. }
  209. RTM_EXPORT(rt_pipe_detach);
  210. #ifdef RT_USING_HEAP
  211. rt_err_t rt_pipe_create(const char *name, enum rt_pipe_flag flag, rt_size_t size)
  212. {
  213. rt_uint8_t *rb_memptr = RT_NULL;
  214. struct rt_pipe_device *pipe = RT_NULL;
  215. /* get aligned size */
  216. size = RT_ALIGN(size, RT_ALIGN_SIZE);
  217. pipe = (struct rt_pipe_device *)rt_calloc(1, sizeof(struct rt_pipe_device));
  218. if (pipe == RT_NULL)
  219. return -RT_ENOMEM;
  220. /* create ring buffer of pipe */
  221. rb_memptr = rt_malloc(size);
  222. if (rb_memptr == RT_NULL)
  223. {
  224. rt_free(pipe);
  225. return -RT_ENOMEM;
  226. }
  227. return rt_pipe_init(pipe, name, flag, rb_memptr, size);
  228. }
  229. RTM_EXPORT(rt_pipe_create);
  230. void rt_pipe_destroy(struct rt_pipe_device *pipe)
  231. {
  232. if (pipe == RT_NULL)
  233. return;
  234. /* un-register pipe device */
  235. rt_pipe_detach(pipe);
  236. /* release memory */
  237. rt_free(pipe->ringbuffer.buffer_ptr);
  238. rt_free(pipe);
  239. return;
  240. }
  241. RTM_EXPORT(rt_pipe_destroy);
  242. #endif /* RT_USING_HEAP */