netbuffer.c 8.7 KB


  1. #include <rthw.h>
  2. #include <rtthread.h>
  3. #include "netbuffer.h"
  4. #define MP3_DECODE_MP_CNT 2
  5. #define MP3_DECODE_MP_SZ 2560
  6. static rt_uint8_t mempool[(MP3_DECODE_MP_SZ * 2 + 4)* 2]; // 5k x 2
  7. static struct rt_mempool _mp;
  8. static rt_bool_t is_inited = RT_FALSE;
  9. rt_size_t sbuf_get_size()
  10. {
  11. return MP3_DECODE_MP_SZ * 2;
  12. }
  13. void sbuf_init()
  14. {
  15. rt_mp_init(&_mp, "mp3", &mempool[0], sizeof(mempool), MP3_DECODE_MP_SZ * 2);
  16. }
  17. void* sbuf_alloc()
  18. {
  19. if (is_inited == RT_FALSE)
  20. {
  21. sbuf_init();
  22. is_inited = RT_TRUE;
  23. }
  24. return (rt_uint16_t*)rt_mp_alloc(&_mp, RT_WAITING_FOREVER);
  25. }
  26. void sbuf_release(void* ptr)
  27. {
  28. rt_mp_free(ptr);
  29. }
  30. #if STM32_EXT_SRAM
  31. /* netbuf worker stat */
  32. #define NETBUF_STAT_FREE 0
  33. #define NETBUF_STAT_BUFFERING 1
  34. #define NETBUF_STAT_BUSY 2
  35. #define NETBUF_STAT_STOPPING 3
  36. #define NETBUF_STAT_STOPPED 4
  37. /* net buffer module */
  38. struct net_buffer
  39. {
  40. /* read index and save index in the buffer */
  41. rt_size_t read_index, save_index;
  42. /* buffer data and size of buffer */
  43. rt_uint8_t* buffer_data;
  44. rt_size_t data_length;
  45. rt_size_t size;
  46. /* buffer ready water mater */
  47. rt_uint32_t ready_wm, resume_wm;
  48. rt_bool_t is_wait_ready, is_wait_resume;
  49. rt_sem_t wait_ready, wait_resume;
  50. /* netbuf worker stat */
  51. rt_uint8_t stat;
  52. };
  53. struct net_buffer_job
  54. {
  55. rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter);
  56. void (*close)(void* parameter);
  57. void* parameter;
  58. };
  59. static struct net_buffer _netbuf;
  60. static rt_mq_t _netbuf_mq = RT_NULL;
  61. rt_size_t net_buf_read(rt_uint8_t* buffer, rt_size_t length)
  62. {
  63. rt_size_t data_length, read_index;
  64. rt_uint32_t level;
  65. data_length = _netbuf.data_length;
  66. if ((data_length == 0) &&
  67. (_netbuf.stat != NETBUF_STAT_STOPPED && _netbuf.stat != NETBUF_STAT_STOPPING))
  68. {
  69. /* set stat */
  70. _netbuf.stat = NETBUF_STAT_BUFFERING;
  71. rt_kprintf("stat -> buffering\n");
  72. /* buffer is not ready. */
  73. _netbuf.is_wait_ready = RT_TRUE;
  74. rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
  75. rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
  76. }
  77. if ((data_length <= _netbuf.ready_wm) &&
  78. (_netbuf.stat == NETBUF_STAT_BUFFERING))
  79. {
  80. /* buffer is not ready. */
  81. _netbuf.is_wait_ready = RT_TRUE;
  82. rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
  83. rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
  84. }
  85. /* get read and save index */
  86. read_index = _netbuf.read_index;
  87. /* re-get data legnth */
  88. data_length = _netbuf.data_length;
  89. /* set the length */
  90. if (length > data_length) length = data_length;
  91. // rt_kprintf("data len: %d, read idx %d\n", data_length, read_index);
  92. if (data_length > 0)
  93. {
  94. /* copy buffer */
  95. if (_netbuf.size - read_index > length)
  96. {
  97. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  98. length);
  99. _netbuf.read_index += length;
  100. }
  101. else
  102. {
  103. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  104. _netbuf.size - read_index);
  105. rt_memcpy(&buffer[_netbuf.size - read_index],
  106. &_netbuf.buffer_data[0],
  107. length - (_netbuf.size - read_index));
  108. _netbuf.read_index = length - (_netbuf.size - read_index);
  109. }
  110. level = rt_hw_interrupt_disable();
  111. _netbuf.data_length -= length;
  112. data_length = _netbuf.data_length;
  113. if ((_netbuf.is_wait_resume == RT_TRUE) && data_length < _netbuf.resume_wm)
  114. {
  115. _netbuf.is_wait_resume = RT_FALSE;
  116. rt_hw_interrupt_enable(level);
  117. rt_kprintf("resume netbuf worker\n");
  118. rt_sem_release(_netbuf.wait_resume);
  119. }
  120. else
  121. {
  122. rt_hw_interrupt_enable(level);
  123. }
  124. }
  125. return length;
  126. }
  127. void net_buf_add_job(rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter),
  128. void (*close)(void* parameter),
  129. void* parameter)
  130. {
  131. struct net_buffer_job job;
  132. job.fetch = fetch;
  133. job.close = close;
  134. job.parameter = parameter;
  135. rt_mq_send(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job));
  136. }
  137. void net_buf_stop_job()
  138. {
  139. rt_uint32_t level;
  140. level = rt_hw_interrupt_disable();
  141. _netbuf.stat = NETBUF_STAT_STOPPING;
  142. rt_kprintf("stat -> stopping\n");
  143. rt_hw_interrupt_enable(level);
  144. }
  145. static void net_buf_do_stop(struct net_buffer_job* job)
  146. {
  147. /* source closed */
  148. job->close(job->parameter);
  149. _netbuf.stat = NETBUF_STAT_STOPPED;
  150. rt_kprintf("stat -> stopped\n");
  151. if (_netbuf.is_wait_ready == RT_TRUE)
  152. {
  153. /* resume the wait for buffer task */
  154. _netbuf.is_wait_ready = RT_FALSE;
  155. rt_sem_release(_netbuf.wait_ready);
  156. }
  157. rt_kprintf("job done, stat %d\n", _netbuf.stat);
  158. }
  159. #define NETBUF_BLOCK_SIZE 1024
  160. static void net_buf_do_job(struct net_buffer_job* job)
  161. {
  162. rt_uint32_t level;
  163. rt_size_t read_length, data_length;
  164. rt_uint8_t *ptr;
  165. ptr = rt_malloc(NETBUF_BLOCK_SIZE);
  166. while (1)
  167. {
  168. if (_netbuf.stat == NETBUF_STAT_STOPPING)
  169. {
  170. net_buf_do_stop(job);
  171. break;
  172. }
  173. /* fetch data buffer */
  174. read_length = job->fetch(ptr, NETBUF_BLOCK_SIZE, job->parameter);
  175. if (read_length <= 0)
  176. {
  177. net_buf_do_stop(job);
  178. break;
  179. }
  180. else
  181. {
  182. /* got data length in the buffer */
  183. data_length = _netbuf.data_length;
  184. /* check avaible buffer to save */
  185. if ((_netbuf.size - data_length) < read_length)
  186. {
  187. rt_err_t result;
  188. _netbuf.is_wait_resume = RT_TRUE;
  189. rt_kprintf("netbuf suspend, avaible room %d\n", data_length);
  190. result = rt_sem_take(_netbuf.wait_resume, RT_WAITING_FOREVER);
  191. if (result != RT_EOK)
  192. {
  193. /* stop net buffer worker */
  194. net_buf_do_stop(job);
  195. break;
  196. }
  197. }
  198. /* there are free space to fetch data */
  199. if ((_netbuf.size - _netbuf.save_index) < read_length)
  200. {
  201. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  202. ptr, _netbuf.size - _netbuf.save_index);
  203. rt_memcpy(&_netbuf.buffer_data[0],
  204. ptr + (_netbuf.size - _netbuf.save_index),
  205. read_length - (_netbuf.size - _netbuf.save_index));
  206. /* move save index */
  207. _netbuf.save_index = read_length - (_netbuf.size - _netbuf.save_index);
  208. }
  209. else
  210. {
  211. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  212. ptr, read_length);
  213. /* move save index */
  214. _netbuf.save_index += read_length;
  215. if (_netbuf.save_index >= _netbuf.size) _netbuf.save_index = 0;
  216. }
  217. level = rt_hw_interrupt_disable();
  218. _netbuf.data_length += read_length;
  219. data_length = _netbuf.data_length;
  220. rt_hw_interrupt_enable(level);
  221. }
  222. // rt_kprintf("buffering ... %d %c\n", (data_length * 100) / _netbuf.size, '%');
  223. if ((_netbuf.stat == NETBUF_STAT_BUFFERING) && (data_length >= _netbuf.ready_wm))
  224. {
  225. _netbuf.stat = NETBUF_STAT_BUSY;
  226. rt_kprintf("stat -> busy\n");
  227. /* notify the thread for waitting buffer ready */
  228. rt_kprintf("resume wait buffer\n");
  229. if (_netbuf.is_wait_ready == RT_TRUE)
  230. {
  231. _netbuf.is_wait_ready = RT_FALSE;
  232. rt_sem_release(_netbuf.wait_ready);
  233. }
  234. }
  235. }
  236. /* release fetch buffer */
  237. rt_free(ptr);
  238. }
  239. static void net_buf_thread_entry(void* parameter)
  240. {
  241. rt_err_t result;
  242. struct net_buffer_job job;
  243. while (1)
  244. {
  245. /* get a job */
  246. result = rt_mq_recv(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job), RT_WAITING_FOREVER);
  247. if (result == RT_EOK)
  248. {
  249. _netbuf.stat = NETBUF_STAT_BUFFERING;
  250. rt_kprintf("stat -> buffering\n");
  251. /* perform the job */
  252. net_buf_do_job(&job);
  253. }
  254. }
  255. }
  256. void net_buf_init(rt_size_t size)
  257. {
  258. rt_thread_t tid;
  259. /* init net buffer structure */
  260. _netbuf.read_index = _netbuf.save_index = 0;
  261. _netbuf.size = size; /* net buffer size */
  262. /* allocate buffer */
  263. _netbuf.buffer_data = rt_malloc(_netbuf.size);
  264. _netbuf.data_length = 0;
  265. /* set ready and resume water mater */
  266. _netbuf.ready_wm = _netbuf.size * 90/100;
  267. _netbuf.resume_wm = _netbuf.size * 80/100;
  268. /* set init stat */
  269. _netbuf.stat = NETBUF_STAT_FREE;
  270. rt_kprintf("stat -> free\n");
  271. _netbuf.wait_ready = rt_sem_create("nready", 0, RT_IPC_FLAG_FIFO);
  272. _netbuf.wait_resume = rt_sem_create("nresum", 0, RT_IPC_FLAG_FIFO);
  273. _netbuf.is_wait_ready = RT_FALSE;
  274. _netbuf.is_wait_resume = RT_FALSE;
  275. /* crate message queue */
  276. _netbuf_mq = rt_mq_create("njob", sizeof(struct net_buffer_job),
  277. 4, RT_IPC_FLAG_FIFO);
  278. /* create net buffer thread */
  279. tid = rt_thread_create("nbuf",
  280. net_buf_thread_entry, RT_NULL,
  281. 1024, 22, 5);
  282. if (tid != RT_NULL)
  283. rt_thread_startup(tid);
  284. }
  285. #endif