netbuffer.c 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. #include <rthw.h>
  2. #include <rtthread.h>
  3. #include "netbuffer.h"
  4. #include "player_ui.h"
  5. #include "player_bg.h"
  6. #define MP3_DECODE_MP_CNT 2
  7. #define MP3_DECODE_MP_SZ 2560
  8. static rt_uint8_t mempool[(MP3_DECODE_MP_SZ * 2 + 4)* 2]; // 5k x 2
  9. static struct rt_mempool _mp;
  10. static rt_bool_t is_inited = RT_FALSE;
  11. rt_size_t sbuf_get_size()
  12. {
  13. return MP3_DECODE_MP_SZ * 2;
  14. }
  15. void sbuf_init()
  16. {
  17. rt_mp_init(&_mp, "mp3", &mempool[0], sizeof(mempool), MP3_DECODE_MP_SZ * 2);
  18. }
  19. void* sbuf_alloc()
  20. {
  21. if (is_inited == RT_FALSE)
  22. {
  23. sbuf_init();
  24. is_inited = RT_TRUE;
  25. }
  26. return (rt_uint16_t*)rt_mp_alloc(&_mp, RT_WAITING_FOREVER);
  27. }
  28. void sbuf_release(void* ptr)
  29. {
  30. rt_mp_free(ptr);
  31. }
  32. #if STM32_EXT_SRAM
  33. /* netbuf worker stat */
  34. #define NETBUF_STAT_STOPPED 0
  35. #define NETBUF_STAT_BUFFERING 1
  36. #define NETBUF_STAT_SUSPEND 2
  37. #define NETBUF_STAT_STOPPING 3
  38. /* net buffer module */
  39. struct net_buffer
  40. {
  41. /* read index and save index in the buffer */
  42. rt_size_t read_index, save_index;
  43. /* buffer data and size of buffer */
  44. rt_uint8_t* buffer_data;
  45. rt_size_t data_length;
  46. rt_size_t size;
  47. /* buffer ready water mater */
  48. rt_uint32_t ready_wm, resume_wm;
  49. rt_bool_t is_wait_ready;
  50. rt_sem_t wait_ready, wait_resume;
  51. /* netbuf worker stat */
  52. rt_uint8_t stat;
  53. };
  54. struct net_buffer_job
  55. {
  56. rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter);
  57. void (*close)(void* parameter);
  58. void* parameter;
  59. };
  60. static struct net_buffer _netbuf;
  61. static rt_mq_t _netbuf_mq = RT_NULL;
  62. /* netbuf worker public API */
  63. rt_size_t net_buf_read(rt_uint8_t* buffer, rt_size_t length)
  64. {
  65. rt_size_t data_length, read_index;
  66. rt_uint32_t level;
  67. data_length = _netbuf.data_length;
  68. if ((data_length == 0) &&
  69. (_netbuf.stat == NETBUF_STAT_BUFFERING || _netbuf.stat == NETBUF_STAT_SUSPEND))
  70. {
  71. rt_err_t result;
  72. /* buffer is not ready. */
  73. _netbuf.is_wait_ready = RT_TRUE;
  74. rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
  75. /* set buffer status to buffering */
  76. player_set_buffer_status(RT_TRUE);
  77. result = rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
  78. /* take semaphore failed, netbuf worker is stopped */
  79. if (result != RT_EOK) return 0;
  80. }
  81. /* get read and save index */
  82. read_index = _netbuf.read_index;
  83. /* re-get data legnth */
  84. data_length = _netbuf.data_length;
  85. /* set the length */
  86. if (length > data_length) length = data_length;
  87. // rt_kprintf("data len: %d, read idx %d\n", data_length, read_index);
  88. if (data_length > 0)
  89. {
  90. /* copy buffer */
  91. if (_netbuf.size - read_index > length)
  92. {
  93. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  94. length);
  95. _netbuf.read_index += length;
  96. }
  97. else
  98. {
  99. rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
  100. _netbuf.size - read_index);
  101. rt_memcpy(&buffer[_netbuf.size - read_index],
  102. &_netbuf.buffer_data[0],
  103. length - (_netbuf.size - read_index));
  104. _netbuf.read_index = length - (_netbuf.size - read_index);
  105. }
  106. level = rt_hw_interrupt_disable();
  107. _netbuf.data_length -= length;
  108. data_length = _netbuf.data_length;
  109. if ((_netbuf.stat == NETBUF_STAT_SUSPEND) && data_length < _netbuf.resume_wm)
  110. {
  111. _netbuf.stat = NETBUF_STAT_BUFFERING;
  112. rt_hw_interrupt_enable(level);
  113. /* resume netbuf worker */
  114. rt_kprintf("stat[suspend] -> buffering\n");
  115. rt_sem_release(_netbuf.wait_resume);
  116. }
  117. else
  118. {
  119. rt_hw_interrupt_enable(level);
  120. }
  121. }
  122. return length;
  123. }
  124. int net_buf_start_job(rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter),
  125. void (*close)(void* parameter),
  126. void* parameter)
  127. {
  128. struct net_buffer_job job;
  129. rt_uint32_t level;
  130. /* job message */
  131. job.fetch = fetch;
  132. job.close = close;
  133. job.parameter = parameter;
  134. level = rt_hw_interrupt_disable();
  135. /* check netbuf worker is stopped */
  136. if (_netbuf.stat == NETBUF_STAT_STOPPED)
  137. {
  138. /* change stat to buffering if netbuf stopped */
  139. _netbuf.stat = NETBUF_STAT_BUFFERING;
  140. rt_hw_interrupt_enable(level);
  141. rt_kprintf("stat[stoppped] -> buffering\n");
  142. rt_mq_send(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job));
  143. return 0;
  144. }
  145. rt_hw_interrupt_enable(level);
  146. return -1;
  147. }
  148. void net_buf_stop_job()
  149. {
  150. rt_uint32_t level;
  151. level = rt_hw_interrupt_disable();
  152. if (_netbuf.stat == NETBUF_STAT_SUSPEND)
  153. {
  154. /* resume the net buffer worker */
  155. rt_sem_release(_netbuf.wait_resume);
  156. _netbuf.stat = NETBUF_STAT_STOPPING;
  157. rt_kprintf("stat[suspend] -> stopping\n");
  158. }
  159. else if (_netbuf.stat == NETBUF_STAT_BUFFERING)
  160. {
  161. /* netbuf worker is working, set stat to stopping */
  162. _netbuf.stat = NETBUF_STAT_STOPPING;
  163. rt_kprintf("stat[buffering] -> stopping\n");
  164. }
  165. rt_hw_interrupt_enable(level);
  166. }
  167. static void net_buf_do_stop(struct net_buffer_job* job)
  168. {
  169. /* source closed */
  170. job->close(job->parameter);
  171. /* set stat to stopped */
  172. _netbuf.stat = NETBUF_STAT_STOPPED;
  173. rt_kprintf("stat -> stopped\n");
  174. if (_netbuf.is_wait_ready == RT_TRUE)
  175. {
  176. /* resume the wait for buffer task */
  177. _netbuf.is_wait_ready = RT_FALSE;
  178. rt_sem_release(_netbuf.wait_ready);
  179. }
  180. /* reset buffer stat */
  181. _netbuf.data_length = 0;
  182. _netbuf.read_index = 0 ;
  183. _netbuf.save_index = 0;
  184. rt_kprintf("job done\n");
  185. }
  186. #define NETBUF_BLOCK_SIZE 4096
  187. static void net_buf_do_job(struct net_buffer_job* job)
  188. {
  189. rt_uint32_t level;
  190. rt_size_t read_length, data_length;
  191. rt_uint8_t *ptr;
  192. ptr = rt_malloc(NETBUF_BLOCK_SIZE);
  193. while (1)
  194. {
  195. if (_netbuf.stat == NETBUF_STAT_STOPPING)
  196. {
  197. net_buf_do_stop(job);
  198. break;
  199. }
  200. /* fetch data buffer */
  201. read_length = job->fetch(ptr, NETBUF_BLOCK_SIZE, job->parameter);
  202. if (read_length <= 0)
  203. {
  204. net_buf_do_stop(job);
  205. break;
  206. }
  207. else
  208. {
  209. /* got data length in the buffer */
  210. data_length = _netbuf.data_length;
  211. /* check avaible buffer to save */
  212. if ((_netbuf.size - data_length) < read_length)
  213. {
  214. rt_err_t result, level;
  215. /* no free space yet, suspend itself */
  216. rt_kprintf("stat[buffering] -> suspend, avaible room %d\n", data_length);
  217. level = rt_hw_interrupt_disable();
  218. _netbuf.stat = NETBUF_STAT_SUSPEND;
  219. rt_hw_interrupt_enable(level);
  220. result = rt_sem_take(_netbuf.wait_resume, RT_WAITING_FOREVER);
  221. if (result != RT_EOK)
  222. {
  223. /* stop net buffer worker */
  224. net_buf_do_stop(job);
  225. break;
  226. }
  227. }
  228. /* there are enough free space to fetch data */
  229. if ((_netbuf.size - _netbuf.save_index) < read_length)
  230. {
  231. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  232. ptr, _netbuf.size - _netbuf.save_index);
  233. rt_memcpy(&_netbuf.buffer_data[0],
  234. ptr + (_netbuf.size - _netbuf.save_index),
  235. read_length - (_netbuf.size - _netbuf.save_index));
  236. /* move save index */
  237. _netbuf.save_index = read_length - (_netbuf.size - _netbuf.save_index);
  238. }
  239. else
  240. {
  241. rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
  242. ptr, read_length);
  243. /* move save index */
  244. _netbuf.save_index += read_length;
  245. if (_netbuf.save_index >= _netbuf.size) _netbuf.save_index = 0;
  246. }
  247. level = rt_hw_interrupt_disable();
  248. _netbuf.data_length += read_length;
  249. data_length = _netbuf.data_length;
  250. rt_hw_interrupt_enable(level);
  251. }
  252. // rt_kprintf("buffering ... %d %c\n", (data_length * 100) / _netbuf.size, '%');
  253. /* set buffer position */
  254. player_set_position(data_length);
  255. if ((_netbuf.stat == NETBUF_STAT_BUFFERING)
  256. && (data_length >= _netbuf.ready_wm)
  257. && _netbuf.is_wait_ready == RT_TRUE)
  258. {
  259. /* notify the thread for waitting buffer ready */
  260. rt_kprintf("resume wait buffer\n");
  261. _netbuf.is_wait_ready = RT_FALSE;
  262. /* set buffer status to playing */
  263. player_set_buffer_status(RT_FALSE);
  264. rt_sem_release(_netbuf.wait_ready);
  265. }
  266. }
  267. /* release fetch buffer */
  268. rt_free(ptr);
  269. }
  270. static void net_buf_thread_entry(void* parameter)
  271. {
  272. rt_err_t result;
  273. struct net_buffer_job job;
  274. while (1)
  275. {
  276. /* get a job */
  277. result = rt_mq_recv(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job), RT_WAITING_FOREVER);
  278. if (result == RT_EOK)
  279. {
  280. /* set stat to buffering */
  281. if (_netbuf.stat == NETBUF_STAT_BUFFERING)
  282. {
  283. /* reset data length and read/save index */
  284. _netbuf.data_length = 0;
  285. _netbuf.read_index = _netbuf.save_index = 0;
  286. /* perform the job */
  287. net_buf_do_job(&job);
  288. }
  289. }
  290. }
  291. }
  292. void net_buf_init(rt_size_t size)
  293. {
  294. rt_thread_t tid;
  295. /* init net buffer structure */
  296. _netbuf.read_index = _netbuf.save_index = 0;
  297. _netbuf.size = size; /* net buffer size */
  298. /* allocate buffer */
  299. _netbuf.buffer_data = rt_malloc(_netbuf.size);
  300. _netbuf.data_length = 0;
  301. /* set ready and resume water mater */
  302. _netbuf.ready_wm = _netbuf.size * 90/100;
  303. _netbuf.resume_wm = _netbuf.size * 80/100;
  304. /* set init stat */
  305. _netbuf.stat = NETBUF_STAT_STOPPED;
  306. rt_kprintf("stat -> stopped\n");
  307. _netbuf.wait_ready = rt_sem_create("nready", 0, RT_IPC_FLAG_FIFO);
  308. _netbuf.wait_resume = rt_sem_create("nresum", 0, RT_IPC_FLAG_FIFO);
  309. _netbuf.is_wait_ready = RT_FALSE;
  310. /* crate message queue */
  311. _netbuf_mq = rt_mq_create("njob", sizeof(struct net_buffer_job),
  312. 4, RT_IPC_FLAG_FIFO);
  313. /* create net buffer thread */
  314. tid = rt_thread_create("nbuf",
  315. net_buf_thread_entry, RT_NULL,
  316. 1024, 22, 5);
  317. if (tid != RT_NULL)
  318. rt_thread_startup(tid);
  319. }
  320. #endif