123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- #include <rthw.h>
- #include <rtthread.h>
- #include "netbuffer.h"
- #define MP3_DECODE_MP_CNT 2
- #define MP3_DECODE_MP_SZ 2560
- static rt_uint8_t mempool[(MP3_DECODE_MP_SZ * 2 + 4)* 2]; // 5k x 2
- static struct rt_mempool _mp;
- static rt_bool_t is_inited = RT_FALSE;
- rt_size_t sbuf_get_size()
- {
- return MP3_DECODE_MP_SZ * 2;
- }
- void sbuf_init()
- {
- rt_mp_init(&_mp, "mp3", &mempool[0], sizeof(mempool), MP3_DECODE_MP_SZ * 2);
- }
- void* sbuf_alloc()
- {
- if (is_inited == RT_FALSE)
- {
- sbuf_init();
- is_inited = RT_TRUE;
- }
- return (rt_uint16_t*)rt_mp_alloc(&_mp, RT_WAITING_FOREVER);
- }
- void sbuf_release(void* ptr)
- {
- rt_mp_free(ptr);
- }
- #if STM32_EXT_SRAM
- /* netbuf worker stat */
- #define NETBUF_STAT_FREE 0
- #define NETBUF_STAT_BUFFERING 1
- #define NETBUF_STAT_BUSY 2
- #define NETBUF_STAT_STOPPING 3
- #define NETBUF_STAT_STOPPED 4
- /* net buffer module */
- struct net_buffer
- {
- /* read index and save index in the buffer */
- rt_size_t read_index, save_index;
- /* buffer data and size of buffer */
- rt_uint8_t* buffer_data;
- rt_size_t data_length;
- rt_size_t size;
- /* buffer ready water mater */
- rt_uint32_t ready_wm, resume_wm;
- rt_bool_t is_wait_ready, is_wait_resume;
- rt_sem_t wait_ready, wait_resume;
- /* netbuf worker stat */
- rt_uint8_t stat;
- };
- struct net_buffer_job
- {
- rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter);
- void (*close)(void* parameter);
- void* parameter;
- };
- static struct net_buffer _netbuf;
- static rt_mq_t _netbuf_mq = RT_NULL;
- rt_size_t net_buf_read(rt_uint8_t* buffer, rt_size_t length)
- {
- rt_size_t data_length, read_index;
- rt_uint32_t level;
- data_length = _netbuf.data_length;
- if ((data_length == 0) &&
- (_netbuf.stat != NETBUF_STAT_STOPPED && _netbuf.stat != NETBUF_STAT_STOPPING))
- {
- /* set stat */
- _netbuf.stat = NETBUF_STAT_BUFFERING;
- rt_kprintf("stat -> buffering\n");
- /* buffer is not ready. */
- _netbuf.is_wait_ready = RT_TRUE;
- rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
- rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
- }
- if ((data_length <= _netbuf.ready_wm) &&
- (_netbuf.stat == NETBUF_STAT_BUFFERING))
- {
- /* buffer is not ready. */
- _netbuf.is_wait_ready = RT_TRUE;
- rt_kprintf("wait ready, data len: %d, stat %d\n", data_length, _netbuf.stat);
- rt_sem_take(_netbuf.wait_ready, RT_WAITING_FOREVER);
- }
- /* get read and save index */
- read_index = _netbuf.read_index;
- /* re-get data legnth */
- data_length = _netbuf.data_length;
- /* set the length */
- if (length > data_length) length = data_length;
- // rt_kprintf("data len: %d, read idx %d\n", data_length, read_index);
- if (data_length > 0)
- {
- /* copy buffer */
- if (_netbuf.size - read_index > length)
- {
- rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
- length);
- _netbuf.read_index += length;
- }
- else
- {
- rt_memcpy(buffer, &_netbuf.buffer_data[read_index],
- _netbuf.size - read_index);
- rt_memcpy(&buffer[_netbuf.size - read_index],
- &_netbuf.buffer_data[0],
- length - (_netbuf.size - read_index));
- _netbuf.read_index = length - (_netbuf.size - read_index);
- }
- level = rt_hw_interrupt_disable();
- _netbuf.data_length -= length;
- data_length = _netbuf.data_length;
- if ((_netbuf.is_wait_resume == RT_TRUE) && data_length < _netbuf.resume_wm)
- {
- _netbuf.is_wait_resume = RT_FALSE;
- rt_hw_interrupt_enable(level);
- rt_kprintf("resume netbuf worker\n");
- rt_sem_release(_netbuf.wait_resume);
- }
- else
- {
- rt_hw_interrupt_enable(level);
- }
- }
- return length;
- }
- void net_buf_add_job(rt_size_t (*fetch)(rt_uint8_t* ptr, rt_size_t len, void* parameter),
- void (*close)(void* parameter),
- void* parameter)
- {
- struct net_buffer_job job;
- job.fetch = fetch;
- job.close = close;
- job.parameter = parameter;
- rt_mq_send(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job));
- }
- void net_buf_stop_job()
- {
- rt_uint32_t level;
- level = rt_hw_interrupt_disable();
- _netbuf.stat = NETBUF_STAT_STOPPING;
- rt_kprintf("stat -> stopping\n");
- rt_hw_interrupt_enable(level);
- }
- static void net_buf_do_stop(struct net_buffer_job* job)
- {
- /* source closed */
- job->close(job->parameter);
- _netbuf.stat = NETBUF_STAT_STOPPED;
- rt_kprintf("stat -> stopped\n");
- if (_netbuf.is_wait_ready == RT_TRUE)
- {
- /* resume the wait for buffer task */
- _netbuf.is_wait_ready = RT_FALSE;
- rt_sem_release(_netbuf.wait_ready);
- }
- rt_kprintf("job done, stat %d\n", _netbuf.stat);
- }
- #define NETBUF_BLOCK_SIZE 1024
- static void net_buf_do_job(struct net_buffer_job* job)
- {
- rt_uint32_t level;
- rt_size_t read_length, data_length;
- rt_uint8_t *ptr;
- ptr = rt_malloc(NETBUF_BLOCK_SIZE);
- while (1)
- {
- if (_netbuf.stat == NETBUF_STAT_STOPPING)
- {
- net_buf_do_stop(job);
- break;
- }
- /* fetch data buffer */
- read_length = job->fetch(ptr, NETBUF_BLOCK_SIZE, job->parameter);
- if (read_length <= 0)
- {
- net_buf_do_stop(job);
- break;
- }
- else
- {
- /* got data length in the buffer */
- data_length = _netbuf.data_length;
- /* check avaible buffer to save */
- if ((_netbuf.size - data_length) < read_length)
- {
- rt_err_t result;
- _netbuf.is_wait_resume = RT_TRUE;
- rt_kprintf("netbuf suspend, avaible room %d\n", data_length);
- result = rt_sem_take(_netbuf.wait_resume, RT_WAITING_FOREVER);
- if (result != RT_EOK)
- {
- /* stop net buffer worker */
- net_buf_do_stop(job);
- break;
- }
- }
- /* there are free space to fetch data */
- if ((_netbuf.size - _netbuf.save_index) < read_length)
- {
- rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
- ptr, _netbuf.size - _netbuf.save_index);
- rt_memcpy(&_netbuf.buffer_data[0],
- ptr + (_netbuf.size - _netbuf.save_index),
- read_length - (_netbuf.size - _netbuf.save_index));
- /* move save index */
- _netbuf.save_index = read_length - (_netbuf.size - _netbuf.save_index);
- }
- else
- {
- rt_memcpy(&_netbuf.buffer_data[_netbuf.save_index],
- ptr, read_length);
- /* move save index */
- _netbuf.save_index += read_length;
- if (_netbuf.save_index >= _netbuf.size) _netbuf.save_index = 0;
- }
- level = rt_hw_interrupt_disable();
- _netbuf.data_length += read_length;
- data_length = _netbuf.data_length;
- rt_hw_interrupt_enable(level);
- }
- // rt_kprintf("buffering ... %d %c\n", (data_length * 100) / _netbuf.size, '%');
- if ((_netbuf.stat == NETBUF_STAT_BUFFERING) && (data_length >= _netbuf.ready_wm))
- {
- _netbuf.stat = NETBUF_STAT_BUSY;
- rt_kprintf("stat -> busy\n");
- /* notify the thread for waitting buffer ready */
- rt_kprintf("resume wait buffer\n");
- if (_netbuf.is_wait_ready == RT_TRUE)
- {
- _netbuf.is_wait_ready = RT_FALSE;
- rt_sem_release(_netbuf.wait_ready);
- }
- }
- }
- /* release fetch buffer */
- rt_free(ptr);
- }
- static void net_buf_thread_entry(void* parameter)
- {
- rt_err_t result;
- struct net_buffer_job job;
- while (1)
- {
- /* get a job */
- result = rt_mq_recv(_netbuf_mq, (void*)&job, sizeof(struct net_buffer_job), RT_WAITING_FOREVER);
- if (result == RT_EOK)
- {
- _netbuf.stat = NETBUF_STAT_BUFFERING;
- rt_kprintf("stat -> buffering\n");
- /* perform the job */
- net_buf_do_job(&job);
- }
- }
- }
- void net_buf_init(rt_size_t size)
- {
- rt_thread_t tid;
- /* init net buffer structure */
- _netbuf.read_index = _netbuf.save_index = 0;
- _netbuf.size = size; /* net buffer size */
- /* allocate buffer */
- _netbuf.buffer_data = rt_malloc(_netbuf.size);
- _netbuf.data_length = 0;
- /* set ready and resume water mater */
- _netbuf.ready_wm = _netbuf.size * 90/100;
- _netbuf.resume_wm = _netbuf.size * 80/100;
- /* set init stat */
- _netbuf.stat = NETBUF_STAT_FREE;
- rt_kprintf("stat -> free\n");
- _netbuf.wait_ready = rt_sem_create("nready", 0, RT_IPC_FLAG_FIFO);
- _netbuf.wait_resume = rt_sem_create("nresum", 0, RT_IPC_FLAG_FIFO);
- _netbuf.is_wait_ready = RT_FALSE;
- _netbuf.is_wait_resume = RT_FALSE;
- /* crate message queue */
- _netbuf_mq = rt_mq_create("njob", sizeof(struct net_buffer_job),
- 4, RT_IPC_FLAG_FIFO);
- /* create net buffer thread */
- tid = rt_thread_create("nbuf",
- net_buf_thread_entry, RT_NULL,
- 1024, 22, 5);
- if (tid != RT_NULL)
- rt_thread_startup(tid);
- }
- #endif
|