semaphore_buffer_worker.c 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. * 程序清单:信号量实现生产者消费者间的互斥
  3. *
  4. * 在这个程序中,会创建两个线程,一个是生成者线程worker一个是消费者线程thread
  5. *
  6. * 在数据信息生产、消费的过程中,worker负责把数据将写入到环形buffer中,而thread
  7. * 则从环形buffer中读出。
  8. */
  9. #include <rtthread.h>
  10. #include "tc_comm.h"
  11. /* 一个环形buffer的实现 */
  12. struct rb
  13. {
  14. rt_uint16_t read_index, write_index;
  15. rt_uint8_t *buffer_ptr;
  16. rt_uint16_t buffer_size;
  17. };
  18. /* 指向信号量控制块的指针 */
  19. static rt_sem_t sem = RT_NULL;
  20. /* 指向线程控制块的指针 */
  21. static rt_thread_t tid = RT_NULL, worker = RT_NULL;
  22. /* 环形buffer的内存块(用数组体现出来) */
  23. #define BUFFER_SIZE 256
  24. #define BUFFER_ITEM 32
  25. static rt_uint8_t working_buffer[BUFFER_SIZE];
  26. struct rb working_rb;
  27. /* 初始化环形buffer,size指的是buffer的大小。注:这里并没对数据地址对齐做处理 */
  28. static void rb_init(struct rb* rb, rt_uint8_t *pool, rt_uint16_t size)
  29. {
  30. RT_ASSERT(rb != RT_NULL);
  31. /* 对读写指针清零*/
  32. rb->read_index = rb->write_index = 0;
  33. /* 设置环形buffer的内存数据块 */
  34. rb->buffer_ptr = pool;
  35. rb->buffer_size = size;
  36. }
  37. /* 向环形buffer中写入数据 */
  38. static rt_bool_t rb_put(struct rb* rb, const rt_uint8_t *ptr, rt_uint16_t length)
  39. {
  40. rt_size_t size;
  41. /* 判断是否有足够的剩余空间 */
  42. if (rb->read_index > rb->write_index)
  43. size = rb->read_index - rb->write_index;
  44. else
  45. size = rb->buffer_size - rb->write_index + rb->read_index;
  46. /* 没有多余的空间 */
  47. if (size < length) return RT_FALSE;
  48. if (rb->read_index > rb->write_index)
  49. {
  50. /* read_index - write_index 即为总的空余空间 */
  51. memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
  52. rb->write_index += length;
  53. }
  54. else
  55. {
  56. if (rb->buffer_size - rb->write_index > length)
  57. {
  58. /* write_index 后面剩余的空间有足够的长度 */
  59. memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
  60. rb->write_index += length;
  61. }
  62. else
  63. {
  64. /*
  65. * write_index 后面剩余的空间不存在足够的长度,需要把部分数据复制到
  66. * 前面的剩余空间中
  67. */
  68. memcpy(&rb->buffer_ptr[rb->write_index], ptr,
  69. rb->buffer_size - rb->write_index);
  70. memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index],
  71. length - (rb->buffer_size - rb->write_index));
  72. rb->write_index = length - (rb->buffer_size - rb->write_index);
  73. }
  74. }
  75. return RT_TRUE;
  76. }
  77. /* 从环形buffer中读出数据 */
  78. static rt_bool_t rb_get(struct rb* rb, rt_uint8_t *ptr, rt_uint16_t length)
  79. {
  80. rt_size_t size;
  81. /* 判断是否有足够的数据 */
  82. if (rb->read_index > rb->write_index)
  83. size = rb->buffer_size - rb->read_index + rb->write_index;
  84. else
  85. size = rb->write_index - rb->read_index;
  86. /* 没有足够的数据 */
  87. if (size < length) return RT_FALSE;
  88. if (rb->read_index > rb->write_index)
  89. {
  90. if (rb->buffer_size - rb->read_index > length)
  91. {
  92. /* read_index的数据足够多,直接复制 */
  93. memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
  94. rb->read_index += length;
  95. }
  96. else
  97. {
  98. /* read_index的数据不够,需要分段复制 */
  99. memcpy(ptr, &rb->buffer_ptr[rb->read_index],
  100. rb->buffer_size - rb->read_index);
  101. memcpy(&ptr[rb->buffer_size - rb->read_index], &rb->buffer_ptr[0],
  102. length - rb->buffer_size + rb->read_index);
  103. rb->read_index = length - rb->buffer_size + rb->read_index;
  104. }
  105. }
  106. else
  107. {
  108. /*
  109. * read_index要比write_index小,总的数据量够(前面已经有总数据量的判
  110. * 断),直接复制出数据。
  111. */
  112. memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
  113. rb->read_index += length;
  114. }
  115. return RT_TRUE;
  116. }
  117. /* 生产者线程入口 */
  118. static void thread_entry(void* parameter)
  119. {
  120. rt_bool_t result;
  121. rt_uint8_t data_buffer[BUFFER_ITEM + 1];
  122. while (1)
  123. {
  124. /* 持有信号量 */
  125. rt_sem_take(sem, RT_WAITING_FOREVER);
  126. /* 从环buffer中获得数据 */
  127. result = rb_get(&working_rb, &data_buffer[0], BUFFER_ITEM);
  128. /* 释放信号量 */
  129. rt_sem_release(sem);
  130. data_buffer[BUFFER_ITEM] = '\0';
  131. if (result == RT_TRUE)
  132. {
  133. /* 获取数据成功,打印数据 */
  134. rt_kprintf("%s\n", data_buffer);
  135. }
  136. /* 做一个5 OS Tick的休眠 */
  137. rt_thread_delay(5);
  138. }
  139. }
  140. /* worker线程入口 */
  141. static void worker_entry(void* parameter)
  142. {
  143. rt_bool_t result;
  144. rt_uint32_t index, setchar;
  145. rt_uint8_t data_buffer[BUFFER_ITEM];
  146. setchar = 0x21;
  147. while (1)
  148. {
  149. /* 构造数据 */
  150. for(index = 0; index < BUFFER_ITEM; index++)
  151. {
  152. data_buffer[index] = setchar;
  153. if (++setchar == 0x7f)
  154. setchar = 0x21;
  155. }
  156. /* 持有信号量 */
  157. rt_sem_take(sem, RT_WAITING_FOREVER);
  158. /* 把数据放到环形buffer中 */
  159. result = rb_put(&working_rb, &data_buffer[0], BUFFER_ITEM);
  160. /* 释放信号量 */
  161. rt_sem_release(sem);
  162. /* 放入成功,做一个10 OS Tick的休眠 */
  163. rt_thread_delay(10);
  164. }
  165. }
  166. int semaphore_buffer_worker_init()
  167. {
  168. /* 初始化ring buffer */
  169. rb_init(&working_rb, working_buffer, BUFFER_SIZE);
  170. /* 创建信号量 */
  171. sem = rt_sem_create("sem", 1, RT_IPC_FLAG_FIFO);
  172. if (sem == RT_NULL)
  173. {
  174. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  175. return 0;
  176. }
  177. /* 创建线程1 */
  178. tid = rt_thread_create("thread",
  179. thread_entry, RT_NULL, /* 线程入口是thread_entry, 入口参数是RT_NULL */
  180. THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
  181. if (tid != RT_NULL)
  182. rt_thread_startup(tid);
  183. else
  184. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  185. /* 创建线程2 */
  186. worker = rt_thread_create("worker",
  187. worker_entry, RT_NULL, /* 线程入口是worker_entry, 入口参数是RT_NULL */
  188. THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
  189. if (worker != RT_NULL)
  190. rt_thread_startup(worker);
  191. else
  192. tc_stat(TC_STAT_END | TC_STAT_FAILED);
  193. return 0;
  194. }
  195. #ifdef RT_USING_TC
  196. static void _tc_cleanup()
  197. {
  198. /* 调度器上锁,上锁后,将不再切换到其他线程,仅响应中断 */
  199. rt_enter_critical();
  200. /* 删除信号量 */
  201. if (sem != RT_NULL)
  202. rt_sem_delete(sem);
  203. /* 删除线程 */
  204. if (tid != RT_NULL && tid->stat != RT_THREAD_CLOSE)
  205. rt_thread_delete(tid);
  206. if (worker != RT_NULL && worker->stat != RT_THREAD_CLOSE)
  207. rt_thread_delete(worker);
  208. /* 调度器解锁 */
  209. rt_exit_critical();
  210. /* 设置TestCase状态 */
  211. tc_done(TC_STAT_PASSED);
  212. }
  213. int _tc_semaphore_buffer_worker()
  214. {
  215. /* 设置TestCase清理回调函数 */
  216. tc_cleanup(_tc_cleanup);
  217. semaphore_buffer_worker_init();
  218. /* 返回TestCase运行的最长时间 */
  219. return 100;
  220. }
  221. /* 输出函数命令到finsh shell中 */
  222. FINSH_FUNCTION_EXPORT(_tc_semaphore_buffer_worker, a buffer worker with semaphore example);
  223. #else
  224. /* 用户应用入口 */
  225. int rt_application_init()
  226. {
  227. semaphore_buffer_worker_init();
  228. return 0;
  229. }
  230. #endif