1
0

pipe.c 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. /*
  2. * File : pipe.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2012, RT-Thread Development Team
  5. *
  6. * The license and distribution terms for this file may be
  7. * found in the file LICENSE in this distribution or at
  8. * http://www.rt-thread.org/license/LICENSE
  9. *
  10. * Change Logs:
  11. * Date Author Notes
  12. * 2012-09-30 Bernard first version.
  13. */
  14. #include <rthw.h>
  15. #include <rtthread.h>
  16. #include <rtdevice.h>
  17. static rt_size_t rt_pipe_read(rt_device_t dev,
  18. rt_off_t pos,
  19. void *buffer,
  20. rt_size_t size)
  21. {
  22. rt_uint32_t level;
  23. rt_thread_t thread;
  24. struct rt_pipe_device *pipe;
  25. rt_size_t read_nbytes;
  26. pipe = PIPE_DEVICE(dev);
  27. RT_ASSERT(pipe != RT_NULL);
  28. thread = rt_thread_self();
  29. /* current context checking */
  30. RT_DEBUG_NOT_IN_INTERRUPT;
  31. do
  32. {
  33. level = rt_hw_interrupt_disable();
  34. read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
  35. if (read_nbytes == 0)
  36. {
  37. rt_thread_suspend(thread);
  38. /* waiting on suspended read list */
  39. rt_list_insert_before(&(pipe->suspended_read_list), &(thread->tlist));
  40. rt_hw_interrupt_enable(level);
  41. rt_schedule();
  42. }
  43. else
  44. {
  45. if (!rt_list_isempty(&pipe->suspended_write_list))
  46. {
  47. /* get suspended thread */
  48. thread = rt_list_entry(pipe->suspended_write_list.next,
  49. struct rt_thread, tlist);
  50. /* resume the write thread */
  51. rt_thread_resume(thread);
  52. rt_hw_interrupt_enable(level);
  53. rt_schedule();
  54. }
  55. else
  56. {
  57. rt_hw_interrupt_enable(level);
  58. }
  59. break;
  60. }
  61. } while (read_nbytes == 0);
  62. return read_nbytes;
  63. }
  64. struct rt_pipe_device *_pipe = RT_NULL;
  65. static rt_size_t rt_pipe_write(rt_device_t dev,
  66. rt_off_t pos,
  67. const void *buffer,
  68. rt_size_t size)
  69. {
  70. rt_uint32_t level;
  71. rt_thread_t thread;
  72. struct rt_pipe_device *pipe;
  73. rt_size_t write_nbytes;
  74. pipe = PIPE_DEVICE(dev);
  75. RT_ASSERT(pipe != RT_NULL);
  76. if (_pipe == RT_NULL)
  77. _pipe = pipe;
  78. thread = rt_thread_self();
  79. /* current context checking */
  80. RT_DEBUG_NOT_IN_INTERRUPT;
  81. do
  82. {
  83. level = rt_hw_interrupt_disable();
  84. write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size);
  85. if (write_nbytes == 0)
  86. {
  87. /* pipe full, waiting on suspended write list */
  88. rt_thread_suspend(thread);
  89. /* waiting on suspended read list */
  90. rt_list_insert_before(&(pipe->suspended_write_list), &(thread->tlist));
  91. rt_hw_interrupt_enable(level);
  92. rt_schedule();
  93. }
  94. else
  95. {
  96. if (!rt_list_isempty(&pipe->suspended_read_list))
  97. {
  98. /* get suspended thread */
  99. thread = rt_list_entry(pipe->suspended_read_list.next,
  100. struct rt_thread, tlist);
  101. /* resume the read thread */
  102. rt_thread_resume(thread);
  103. rt_hw_interrupt_enable(level);
  104. rt_schedule();
  105. }
  106. else
  107. {
  108. rt_hw_interrupt_enable(level);
  109. }
  110. break;
  111. }
  112. }while (write_nbytes == 0);
  113. return write_nbytes;
  114. }
  115. static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args)
  116. {
  117. return RT_EOK;
  118. }
  119. rt_err_t rt_pipe_create(const char *name, rt_size_t size)
  120. {
  121. rt_err_t result = RT_EOK;
  122. rt_uint8_t *rb_memptr = RT_NULL;
  123. struct rt_pipe_device *pipe = RT_NULL;
  124. /* get aligned size */
  125. size = RT_ALIGN(size, RT_ALIGN_SIZE);
  126. pipe = (struct rt_pipe_device *)rt_calloc(1, sizeof(struct rt_pipe_device));
  127. if (pipe != RT_NULL)
  128. {
  129. /* create ring buffer of pipe */
  130. rb_memptr = rt_malloc(size);
  131. if (rb_memptr == RT_NULL)
  132. {
  133. result = -RT_ENOMEM;
  134. goto __exit;
  135. }
  136. /* initialize suspended list */
  137. rt_list_init(&pipe->suspended_read_list);
  138. rt_list_init(&pipe->suspended_write_list);
  139. /* initialize ring buffer */
  140. rt_ringbuffer_init(&pipe->ringbuffer, rb_memptr, size);
  141. /* create device */
  142. pipe->parent.type = RT_Device_Class_Char;
  143. pipe->parent.init = RT_NULL;
  144. pipe->parent.open = RT_NULL;
  145. pipe->parent.close = RT_NULL;
  146. pipe->parent.read = rt_pipe_read;
  147. pipe->parent.write = rt_pipe_write;
  148. pipe->parent.control = rt_pipe_control;
  149. return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
  150. }
  151. else
  152. {
  153. result = -RT_ENOMEM;
  154. }
  155. __exit:
  156. if (pipe != RT_NULL)
  157. rt_free(pipe);
  158. if (rb_memptr != RT_NULL)
  159. rt_free(rb_memptr);
  160. return result;
  161. }
  162. RTM_EXPORT(rt_pipe_create);
  163. void rt_pipe_destroy(struct rt_pipe_device *pipe)
  164. {
  165. if (pipe == RT_NULL)
  166. return;
  167. /* un-register pipe device */
  168. rt_device_unregister(&(pipe->parent));
  169. /* release memory */
  170. rt_free(pipe->ringbuffer.buffer_ptr);
  171. rt_free(pipe);
  172. return;
  173. }
  174. RTM_EXPORT(rt_pipe_destroy);