瀏覽代碼

[DeviceDrivers] Add sync mode in work queue.

bernard 7 年之前
父節點
當前提交
19aa36e830
共有 2 個文件被更改,包括 62 次插入0 次删除
  1. 3 0
      components/drivers/include/ipc/workqueue.h
  2. 59 0
      components/drivers/src/workqueue.c

+ 3 - 0
components/drivers/include/ipc/workqueue.h

@@ -8,6 +8,8 @@ struct rt_workqueue
 {
     rt_list_t      work_list;
     struct rt_work *work_current; /* current work */
+
+    struct rt_semaphore sem; 
     rt_thread_t    work_thread;
 };
 
@@ -27,6 +29,7 @@ struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_siz
 rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue);
 rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work);
 rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work);
+rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work);
 
 rt_inline void rt_work_init(struct rt_work* work, void (*work_func)(struct rt_work* work, void* work_data),
     void* work_data)

+ 59 - 0
components/drivers/src/workqueue.c

@@ -27,6 +27,37 @@
 #include <rtdevice.h>
 
 #ifdef RT_USING_HEAP
+rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
+{
+    rt_err_t result;
+    
+    rt_enter_critical(); 
+    while (1)
+    {
+        /* try to take condition semaphore */
+        result = rt_sem_trytake(&(queue->sem));
+        if (result == -RT_ETIMEOUT)
+        {
+            /* it's timeout, release this semaphore */
+            rt_sem_release(&(queue->sem));
+        }
+        else if (result == RT_EOK)
+        {
+            /* keep the sem value = 0 */
+            result = RT_EOK;
+            break;
+        }
+        else
+        {
+            result = -RT_ERROR;
+            break;
+        }
+    }
+    rt_exit_critical();
+    
+    return result;
+}
+
 static void _workqueue_thread_entry(void* parameter)
 {
     rt_base_t level;
@@ -58,6 +89,9 @@ static void _workqueue_thread_entry(void* parameter)
         /* clean current work */
         queue->work_current = RT_NULL;
         rt_hw_interrupt_enable(level);
+
+        /* ack work completion */
+        _workqueue_work_completion(queue);
     }
 }
 
@@ -71,6 +105,7 @@ struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_siz
         /* initialize work list */
         rt_list_init(&(queue->work_list));
         queue->work_current = RT_NULL;
+        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
 
         /* create the work thread */
         queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
@@ -174,6 +209,30 @@ rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* wo
     return RT_EOK;
 }
 
+rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work)
+{
+    rt_base_t level;
+
+    RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(work != RT_NULL);
+
+    level = rt_hw_interrupt_disable();
+    if (queue->work_current == work) /* it's current work in the queue */
+    {
+        rt_uint32_t recv;
+        
+        /* wait for work completion */
+        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
+    }
+    else
+    {
+        rt_list_remove(&(work->list));
+    }
+    rt_hw_interrupt_enable(level);
+
+    return RT_EOK;
+}
+
 rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue)
 {
     struct rt_list_node *node, *next;