/* netbuffer.c - MP3 decode sample-buffer pool and network stream ring buffer */
  1. #include <rthw.h>
  2. #include <rtthread.h>
  3. #include "netbuffer.h"
  4. #include "player_ui.h"
  5. #define MP3_DECODE_MP_CNT 2
  6. #define MP3_DECODE_MP_SZ 2560
  7. static rt_uint8_t mempool[(MP3_DECODE_MP_SZ * 2 + 4)* 2]; // 5k x 2
  8. static struct rt_mempool _mp;
  9. static rt_bool_t is_inited = RT_FALSE;
  10. rt_size_t sbuf_get_size()
  11. {
  12. return MP3_DECODE_MP_SZ * 2;
  13. }
  14. void sbuf_init()
  15. {
  16. rt_mp_init(&_mp, "mp3", &mempool[0], sizeof(mempool), MP3_DECODE_MP_SZ * 2);
  17. }
  18. void* sbuf_alloc()
  19. {
  20. if (is_inited == RT_FALSE)
  21. {
  22. sbuf_init();
  23. is_inited = RT_TRUE;
  24. }
  25. return (rt_uint16_t*)rt_mp_alloc(&_mp, RT_WAITING_FOREVER);
  26. }
  27. void sbuf_release(void* ptr)
  28. {
  29. rt_mp_free(ptr);
  30. }
  31. #if STM32_EXT_SRAM
  32. /* netbuf worker stat */
  33. #define NETBUF_STAT_FREE 0
  34. #define NETBUF_STAT_BUFFERING 1
  35. #define NETBUF_STAT_BUSY 2
  36. #define NETBUF_STAT_STOPPING 3
  37. #define NETBUF_STAT_STOPPED 4
  38. /* net buffer module */
  39. struct net_buffer
  40. {
  41. /* read index and save index in the buffer */
  42. rt_size_t read_index, save_index;
  43. /* buffer data and size of buffer */
  44. rt_uint8_t* buffer_data;
  45. rt_size_t data_length;
  46. rt_size_t size;
  47. /* buffer ready water mater */
  48. rt_uint32_t ready_wm, resume_wm;
  49. rt_bool_t is_wait_ready, is_wait_resume;
  50. rt_sem_t wait_ready, wait_resume;
  51. /* netbuf worker stat */
  52. rt_uint8_t stat;
  53. };
  54. struct net_buffer_job
  55. {
  56. rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter);
  57. void (*close)(void* parameter);
  58. void* parameter;
  59. };
  60. static struct net_buffer _netbuf;
  61. static rt_mq_t _netbuf_mq = RT_NULL;
  62. rt_size_t net_buf_read(rt_uint8_t* buffer, rt_size_t length)
  63. {
  64. rt_size_t data_length, read_index;
  65. rt_uint32_t level;
  66. data_length = _netbuf.data_length;
  67. if ((data_length == 0) &&
  68. (_netbuf.stat != NETBUF_STAT_STOPPED && _netbuf.stat != NETBUF_STAT_STOPPING))
  69. {
  70. /* set stat */
  71. _netbuf.stat = NETBUF_STAT_BUFFERING;
  72. rt_kprintf("stat -> buffering\n");
  73. /* buffer is not ready. */
  74. _netbuf.is_wait_ready = RT_TRUE;
  75. rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
  76. /* set buffer status to buffering */
  77. player_set_buffer_status(RT_TRUE);
  78. rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
  79. }
  80. if ((data_length <= _netbuf.ready_wm) &&
  81. (_netbuf.stat == NETBUF_STAT_BUFFERING))
  82. {
  83. /* buffer is not ready. */
  84. _netbuf.is_wait_ready = RT_TRUE;
  85. rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
  86. rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
  87. }
  88. /* get read and save index */
  89. read_index = _netbuf.read_index;
  90. /* re-get data legnth */
  91. data_length = _netbuf.data_length;
  92. /* set the length */
  93. if (length > data_length) length = data_length;
  94. // rt_kprintf("data len: %d, read idx %d\n", data_length, read_index);
  95. if (data_length > 0)
  96. {
  97. /* copy buffer */
  98. if (_netbuf.size - read_index > length)
  99. {
  100. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  101. length);
  102. _netbuf.read_index += length;
  103. }
  104. else
  105. {
  106. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  107. _netbuf.size - read_index);
  108. rt_memcpy(&buffer[_netbuf.size - read_index],
  109. &_netbuf.buffer_data[0],
  110. length - (_netbuf.size - read_index));
  111. _netbuf.read_index = length - (_netbuf.size - read_index);
  112. }
  113. level = rt_hw_interrupt_disable();
  114. _netbuf.data_length -= length;
  115. data_length = _netbuf.data_length;
  116. if ((_netbuf.is_wait_resume == RT_TRUE) && data_length < _netbuf.resume_wm)
  117. {
  118. _netbuf.is_wait_resume = RT_FALSE;
  119. rt_hw_interrupt_enable(level);
  120. rt_kprintf("resume netbuf worker\n");
  121. rt_sem_release(_netbuf.wait_resume);
  122. }
  123. else
  124. {
  125. rt_hw_interrupt_enable(level);
  126. }
  127. }
  128. return length;
  129. }
  130. void net_buf_add_job(rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter),
  131. void (*close)(void* parameter),
  132. void* parameter)
  133. {
  134. struct net_buffer_job job;
  135. job.fetch = fetch;
  136. job.close = close;
  137. job.parameter = parameter;
  138. rt_mq_send(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job));
  139. }
  140. void net_buf_stop_job()
  141. {
  142. rt_uint32_t level;
  143. level = rt_hw_interrupt_disable();
  144. _netbuf.stat = NETBUF_STAT_STOPPING;
  145. rt_kprintf("stat -> stopping\n");
  146. rt_hw_interrupt_enable(level);
  147. }
  148. static void net_buf_do_stop(struct net_buffer_job* job)
  149. {
  150. /* source closed */
  151. job->close(job->parameter);
  152. _netbuf.stat = NETBUF_STAT_STOPPED;
  153. rt_kprintf("stat -> stopped\n");
  154. if (_netbuf.is_wait_ready == RT_TRUE)
  155. {
  156. /* resume the wait for buffer task */
  157. _netbuf.is_wait_ready = RT_FALSE;
  158. rt_sem_release(_netbuf.wait_ready);
  159. }
  160. rt_kprintf("job done, stat %d\n", _netbuf.stat);
  161. }
  162. #define NETBUF_BLOCK_SIZE 4096
  163. static void net_buf_do_job(struct net_buffer_job* job)
  164. {
  165. rt_uint32_t level;
  166. rt_size_t read_length, data_length;
  167. rt_uint8_t *ptr;
  168. ptr = rt_malloc(NETBUF_BLOCK_SIZE);
  169. while (1)
  170. {
  171. if (_netbuf.stat == NETBUF_STAT_STOPPING)
  172. {
  173. net_buf_do_stop(job);
  174. break;
  175. }
  176. /* fetch data buffer */
  177. read_length = job->fetch(ptr, NETBUF_BLOCK_SIZE, job->parameter);
  178. if (read_length <= 0)
  179. {
  180. net_buf_do_stop(job);
  181. break;
  182. }
  183. else
  184. {
  185. /* got data length in the buffer */
  186. data_length = _netbuf.data_length;
  187. /* check avaible buffer to save */
  188. if ((_netbuf.size - data_length) < read_length)
  189. {
  190. rt_err_t result;
  191. _netbuf.is_wait_resume = RT_TRUE;
  192. rt_kprintf("netbuf suspend, avaible room %d\n", data_length);
  193. result = rt_sem_take(_netbuf.wait_resume, RT_WAITING_FOREVER);
  194. if (result != RT_EOK)
  195. {
  196. /* stop net buffer worker */
  197. net_buf_do_stop(job);
  198. break;
  199. }
  200. }
  201. /* there are free space to fetch data */
  202. if ((_netbuf.size - _netbuf.save_index) < read_length)
  203. {
  204. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  205. ptr, _netbuf.size - _netbuf.save_index);
  206. rt_memcpy(&_netbuf.buffer_data[0],
  207. ptr + (_netbuf.size - _netbuf.save_index),
  208. read_length - (_netbuf.size - _netbuf.save_index));
  209. /* move save index */
  210. _netbuf.save_index = read_length - (_netbuf.size - _netbuf.save_index);
  211. }
  212. else
  213. {
  214. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  215. ptr, read_length);
  216. /* move save index */
  217. _netbuf.save_index += read_length;
  218. if (_netbuf.save_index >= _netbuf.size) _netbuf.save_index = 0;
  219. }
  220. level = rt_hw_interrupt_disable();
  221. _netbuf.data_length += read_length;
  222. data_length = _netbuf.data_length;
  223. rt_hw_interrupt_enable(level);
  224. }
  225. rt_kprintf("buffering ... %d %c\n", (data_length * 100) / _netbuf.size, '%');
  226. /* set buffer position */
  227. player_set_position(data_length);
  228. if ((_netbuf.stat == NETBUF_STAT_BUFFERING) && (data_length >= _netbuf.ready_wm))
  229. {
  230. _netbuf.stat = NETBUF_STAT_BUSY;
  231. rt_kprintf("stat -> busy\n");
  232. /* notify the thread for waitting buffer ready */
  233. rt_kprintf("resume wait buffer\n");
  234. if (_netbuf.is_wait_ready == RT_TRUE)
  235. {
  236. _netbuf.is_wait_ready = RT_FALSE;
  237. /* set buffer status to playing */
  238. player_set_buffer_status(RT_FALSE);
  239. rt_sem_release(_netbuf.wait_ready);
  240. }
  241. }
  242. }
  243. /* release fetch buffer */
  244. rt_free(ptr);
  245. }
  246. static void net_buf_thread_entry(void* parameter)
  247. {
  248. rt_err_t result;
  249. struct net_buffer_job job;
  250. while (1)
  251. {
  252. /* get a job */
  253. result = rt_mq_recv(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job), RT_WAITING_FOREVER);
  254. if (result == RT_EOK)
  255. {
  256. _netbuf.stat = NETBUF_STAT_BUFFERING;
  257. rt_kprintf("stat -> buffering\n");
  258. /* perform the job */
  259. net_buf_do_job(&job);
  260. }
  261. }
  262. }
  263. void net_buf_init(rt_size_t size)
  264. {
  265. rt_thread_t tid;
  266. /* init net buffer structure */
  267. _netbuf.read_index = _netbuf.save_index = 0;
  268. _netbuf.size = size; /* net buffer size */
  269. /* allocate buffer */
  270. _netbuf.buffer_data = rt_malloc(_netbuf.size);
  271. _netbuf.data_length = 0;
  272. /* set ready and resume water mater */
  273. _netbuf.ready_wm = _netbuf.size * 90/100;
  274. _netbuf.resume_wm = _netbuf.size * 80/100;
  275. /* set init stat */
  276. _netbuf.stat = NETBUF_STAT_FREE;
  277. rt_kprintf("stat -> free\n");
  278. _netbuf.wait_ready = rt_sem_create("nready", 0, RT_IPC_FLAG_FIFO);
  279. _netbuf.wait_resume = rt_sem_create("nresum", 0, RT_IPC_FLAG_FIFO);
  280. _netbuf.is_wait_ready = RT_FALSE;
  281. _netbuf.is_wait_resume = RT_FALSE;
  282. /* crate message queue */
  283. _netbuf_mq = rt_mq_create("njob", sizeof(struct net_buffer_job),
  284. 4, RT_IPC_FLAG_FIFO);
  285. /* create net buffer thread */
  286. tid = rt_thread_create("nbuf",
  287. net_buf_thread_entry, RT_NULL,
  288. 1024, 22, 5);
  289. if (tid != RT_NULL)
  290. rt_thread_startup(tid);
  291. }
  292. #endif