Browse Source

fix bug of dataqueue

qiyongzhong0 4 years ago
parent
commit
44baf269f7
2 changed files with 93 additions and 35 deletions
  1. 6 5
      components/drivers/include/ipc/dataqueue.h
  2. 87 30
      components/drivers/src/dataqueue.c

+ 6 - 5
components/drivers/include/ipc/dataqueue.h

@@ -17,8 +17,7 @@
 #define RT_DATAQUEUE_EVENT_LWM       0x03
 
 struct rt_data_item;
-#define RT_DATAQUEUE_SIZE(dq)        ((dq)->put_index - (dq)->get_index)
-#define RT_DATAQUEUE_EMPTY(dq)       ((dq)->size - RT_DATAQUEUE_SIZE(dq))
+
 /* data queue implementation */
 struct rt_data_queue
 {
@@ -26,10 +25,11 @@ struct rt_data_queue
 
     rt_uint16_t size;
     rt_uint16_t lwm;
-    rt_bool_t   waiting_lwm;
 
-    rt_uint16_t get_index;
-    rt_uint16_t put_index;
+    rt_uint16_t get_index : 15;
+    rt_uint16_t is_empty  : 1;
+    rt_uint16_t put_index : 15;
+    rt_uint16_t is_full   : 1;
 
     struct rt_data_item *queue;
 
@@ -60,5 +60,6 @@ rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
                             rt_size_t            *size);
 void rt_data_queue_reset(struct rt_data_queue *queue);
 rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue);
+rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue);
 
 #endif

+ 87 - 30
components/drivers/src/dataqueue.c

@@ -28,7 +28,7 @@ rt_data_queue_init(struct rt_data_queue *queue,
                    void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
 {
     RT_ASSERT(queue != RT_NULL);
-    RT_ASSERT((0x10000 % size) == 0);
+    RT_ASSERT(size > 0);
 
     queue->evt_notify = evt_notify;
 
@@ -38,10 +38,12 @@ rt_data_queue_init(struct rt_data_queue *queue,
 
     queue->get_index = 0;
     queue->put_index = 0;
+    queue->is_empty = 1;
+    queue->is_full = 0;
 
     rt_list_init(&(queue->suspended_push_list));
     rt_list_init(&(queue->suspended_pop_list));
-
+    
     queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
     if (queue->queue == RT_NULL)
     {
@@ -61,14 +63,14 @@ rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
     rt_thread_t thread;
     rt_err_t    result;
     
-    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
     RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
 
     result = RT_EOK;
     thread = rt_thread_self();
 
     level = rt_hw_interrupt_disable();
-    while (queue->put_index - queue->get_index == queue->size)
+    while (queue->is_full)
     {
         /* queue is full */
         if (timeout == 0)
@@ -109,9 +111,18 @@ rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
         if (result != RT_EOK) goto __exit;
     }
 
-    queue->queue[queue->put_index % queue->size].data_ptr  = data_ptr;
-    queue->queue[queue->put_index % queue->size].data_size = data_size;
+    queue->queue[queue->put_index].data_ptr  = data_ptr;
+    queue->queue[queue->put_index].data_size = data_size;
     queue->put_index += 1;
+    if (queue->put_index == queue->size)
+    {
+        queue->put_index = 0;
+    }
+    queue->is_empty = 0;
+    if (queue->put_index == queue->get_index)
+    {
+        queue->is_full = 1;
+    }
 
     /* there is at least one thread in suspended list */
     if (!rt_list_isempty(&(queue->suspended_pop_list)))
@@ -151,8 +162,8 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
     rt_thread_t thread;
     rt_err_t    result;
     
-    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
     RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
     RT_ASSERT(data_ptr != RT_NULL);
     RT_ASSERT(size != RT_NULL);
 
@@ -160,7 +171,7 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
     thread = rt_thread_self();
 
     level = rt_hw_interrupt_disable();
-    while (queue->get_index == queue->put_index)
+    while (queue->is_empty)
     {
         /* queue is empty */
         if (timeout == 0)
@@ -201,12 +212,20 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
             goto __exit;
     }
 
-    *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
-    *size     = queue->queue[queue->get_index % queue->size].data_size;
-
+    *data_ptr = queue->queue[queue->get_index].data_ptr;
+    *size     = queue->queue[queue->get_index].data_size;
     queue->get_index += 1;
+    if (queue->get_index == queue->size)
+    {
+        queue->get_index = 0;
+    }
+    queue->is_full = 0;
+    if (queue->put_index == queue->get_index)
+    {
+        queue->is_empty = 1;
+    }
 
-    if ((queue->put_index - queue->get_index) <= queue->lwm)
+    if (rt_data_queue_len(queue) <= queue->lwm)
     {
         /* there is at least one thread in suspended list */
         if (!rt_list_isempty(&(queue->suspended_push_list)))
@@ -251,20 +270,18 @@ rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
 {
     rt_ubase_t  level;
     
-    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
     RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
 
-    level = rt_hw_interrupt_disable();
-
-    if (queue->get_index == queue->put_index) 
+    if (queue->is_empty) 
     {
-        rt_hw_interrupt_enable(level);
-        
         return -RT_EEMPTY;
     }
 
-    *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
-    *size     = queue->queue[queue->get_index % queue->size].data_size;
+    level = rt_hw_interrupt_disable();
+
+    *data_ptr = queue->queue[queue->get_index].data_ptr;
+    *size     = queue->queue[queue->get_index].data_size;
 
     rt_hw_interrupt_enable(level);
 
@@ -274,10 +291,20 @@ RTM_EXPORT(rt_data_queue_peak);
 
 void rt_data_queue_reset(struct rt_data_queue *queue)
 {
+    rt_ubase_t  level;
     struct rt_thread *thread;
-    register rt_ubase_t temp;
     
+    RT_ASSERT(queue != RT_NULL);
     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
+
+    level = rt_hw_interrupt_disable();
+
+    queue->get_index = 0;
+    queue->put_index = 0;
+    queue->is_empty = 1;
+    queue->is_full = 0;
+    
+    rt_hw_interrupt_enable(level);
     
     rt_enter_critical();
     /* wakeup all suspend threads */
@@ -286,7 +313,7 @@ void rt_data_queue_reset(struct rt_data_queue *queue)
     while (!rt_list_isempty(&(queue->suspended_pop_list)))
     {
         /* disable interrupt */
-        temp = rt_hw_interrupt_disable();
+        level = rt_hw_interrupt_disable();
 
         /* get next suspend thread */
         thread = rt_list_entry(queue->suspended_pop_list.next,
@@ -303,14 +330,14 @@ void rt_data_queue_reset(struct rt_data_queue *queue)
         rt_thread_resume(thread);
 
         /* enable interrupt */
-        rt_hw_interrupt_enable(temp);
+        rt_hw_interrupt_enable(level);
     }
 
     /* resume on push list */
     while (!rt_list_isempty(&(queue->suspended_push_list)))
     {
         /* disable interrupt */
-        temp = rt_hw_interrupt_disable();
+        level = rt_hw_interrupt_disable();
 
         /* get next suspend thread */
         thread = rt_list_entry(queue->suspended_push_list.next,
@@ -327,7 +354,7 @@ void rt_data_queue_reset(struct rt_data_queue *queue)
         rt_thread_resume(thread);
 
         /* enable interrupt */
-        rt_hw_interrupt_enable(temp);
+        rt_hw_interrupt_enable(level);
     }
     rt_exit_critical();
 
@@ -339,19 +366,49 @@ rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
 {
     rt_ubase_t level;
 
-    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
     RT_ASSERT(queue != RT_NULL);
-
-    level = rt_hw_interrupt_disable();
+    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
 
     /* wakeup all suspend threads */
     rt_data_queue_reset(queue);
 
+    level = rt_hw_interrupt_disable();
     queue->magic = 0;
-    rt_free(queue->queue);
-    
     rt_hw_interrupt_enable(level);
+    
+    rt_free(queue->queue);
 
     return RT_EOK;
 }
 RTM_EXPORT(rt_data_queue_deinit);
+
+rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
+{
+    rt_ubase_t level;
+    rt_int16_t len;
+    
+    RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
+
+    if (queue->is_empty)
+    {
+        return 0;
+    }
+
+    level = rt_hw_interrupt_disable();
+
+    if (queue->put_index > queue->get_index)
+    {
+        len = queue->put_index - queue->get_index;
+    }
+    else
+    {
+        len = queue->size + queue->put_index - queue->get_index;
+    }
+    
+    rt_hw_interrupt_enable(level);
+
+    return len;
+}
+RTM_EXPORT(rt_data_queue_len);
+