Browse Source

[components][drivers][workqueue] add system default workqueue and delayed work.

EvalZero 6 years ago
parent
commit
c7ccb4f532
3 changed files with 269 additions and 55 deletions
  1. 14 0
      components/drivers/Kconfig
  2. 42 10
      components/drivers/include/ipc/workqueue.h
  3. 213 45
      components/drivers/src/workqueue.c

+ 14 - 0
components/drivers/Kconfig

@@ -8,6 +8,20 @@ if RT_USING_DEVICE_IPC
     config RT_PIPE_BUFSZ
         int "Set pipe buffer size"
         default 512
+    
+    config RT_USING_SYSTEM_WORKQUEUE
+    	bool "Using system default workqueue
+    	default n
+
+    if RT_USING_SYSTEM_WORKQUEUE
+    	config RT_SYSTEM_WORKQUEUE_STACKSIZE
+    		int "The stack size for system workqueue thread"
+    		default 512
+
+   	 config RT_SYSTEM_WORKQUEUE_PRIORITY
+    		int "The priority level of system workqueue thread"
+    		default "23
+    endif
 endif
 
 config RT_USING_SERIAL

+ 42 - 10
components/drivers/include/ipc/workqueue.h

@@ -11,13 +11,26 @@
 
 #include <rtthread.h>
 
+enum
+{
+    RT_WORK_STATE_PENDING,  /* Work item pending state */
+};
+
+/**
+ * work type defitions
+ */
+enum
+{
+    RT_WORK_TYPE_DELAYED,
+};
+
 /* workqueue implementation */
 struct rt_workqueue
 {
     rt_list_t      work_list;
     struct rt_work *work_current; /* current work */
 
-    struct rt_semaphore sem; 
+    struct rt_semaphore sem;
     rt_thread_t    work_thread;
 };
 
@@ -25,27 +38,46 @@ struct rt_work
 {
     rt_list_t list;
 
-    void (*work_func)(struct rt_work* work, void* work_data);
+    void (*work_func)(struct rt_work *work, void *work_data);
     void *work_data;
+    rt_uint16_t flags;
+    rt_uint16_t type;
+};
+
+struct rt_delayed_work
+{
+    struct rt_work work;
+    struct rt_timer timer;
+    struct rt_workqueue *workqueue;
 };
 
 #ifdef RT_USING_HEAP
 /**
  * WorkQueue for DeviceDriver
  */
-struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_size, rt_uint8_t priority);
-rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue);
-rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work);
-rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work);
-rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work);
-
-rt_inline void rt_work_init(struct rt_work* work, void (*work_func)(struct rt_work* work, void* work_data),
-    void* work_data)
+struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority);
+rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue);
+rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work);
+rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time);
+rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work);
+rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work);
+
+#ifdef RT_USING_SYSTEM_WORKQUEUE
+rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time);
+rt_err_t rt_work_cancel(struct rt_work *work);
+#endif
+
+rt_inline void rt_work_init(struct rt_work *work, void (*work_func)(struct rt_work *work, void *work_data),
+                            void *work_data)
 {
     rt_list_init(&(work->list));
     work->work_func = work_func;
     work->work_data = work_data;
 }
+
+void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
+                          void *work_data), void *work_data);
+
 #endif
 
 #endif

+ 213 - 45
components/drivers/src/workqueue.c

@@ -13,11 +13,12 @@
 #include <rtdevice.h>
 
 #ifdef RT_USING_HEAP
+
 rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
 {
     rt_err_t result;
-    
-    rt_enter_critical(); 
+
+    rt_enter_critical();
     while (1)
     {
         /* try to take condition semaphore */
@@ -40,17 +41,17 @@ rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
         }
     }
     rt_exit_critical();
-    
+
     return result;
 }
 
-static void _workqueue_thread_entry(void* parameter)
+static void _workqueue_thread_entry(void *parameter)
 {
     rt_base_t level;
-    struct rt_work* work;
-    struct rt_workqueue* queue;
+    struct rt_work *work;
+    struct rt_workqueue *queue;
 
-    queue = (struct rt_workqueue*) parameter;
+    queue = (struct rt_workqueue *) parameter;
     RT_ASSERT(queue != RT_NULL);
 
     while (1)
@@ -67,6 +68,7 @@ static void _workqueue_thread_entry(void* parameter)
         work = rt_list_entry(queue->work_list.next, struct rt_work, list);
         rt_list_remove(&(work->list));
         queue->work_current = work;
+        work->flags &= ~RT_WORK_STATE_PENDING;
         rt_hw_interrupt_enable(level);
 
         /* do work */
@@ -81,11 +83,156 @@ static void _workqueue_thread_entry(void* parameter)
     }
 }
 
-struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_size, rt_uint8_t priority)
+static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work)
+{
+    rt_base_t level;
+
+    level = rt_hw_interrupt_disable();
+    if (work->flags & RT_WORK_STATE_PENDING)
+    {
+        rt_hw_interrupt_enable(level);
+        return -RT_EBUSY;
+    }
+
+    if (queue->work_current == work)
+    {
+        rt_hw_interrupt_enable(level);
+        return -RT_EBUSY;
+    }
+
+    /* NOTE: the work MUST be initialized firstly */
+    rt_list_remove(&(work->list));
+
+    rt_list_insert_after(queue->work_list.prev, &(work->list));
+    work->flags |= RT_WORK_STATE_PENDING;
+
+    /* whether the workqueue is doing work */
+    if (queue->work_current == RT_NULL)
+    {
+        rt_hw_interrupt_enable(level);
+        /* resume work thread */
+        rt_thread_resume(queue->work_thread);
+        rt_schedule();
+    }
+    else
+    {
+        rt_hw_interrupt_enable(level);
+    }
+
+    return RT_EOK;
+}
+
+static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
+{
+    rt_base_t level;
+
+    level = rt_hw_interrupt_disable();
+    if (queue->work_current == work)
+    {
+        rt_hw_interrupt_enable(level);
+        return -RT_EBUSY;
+    }
+    rt_list_remove(&(work->list));
+    work->flags &= ~RT_WORK_STATE_PENDING;
+    rt_hw_interrupt_enable(level);
+
+    return RT_EOK;
+}
+
+static rt_err_t _workqueue_cancel_delayed_work(struct rt_delayed_work *work)
+{
+    rt_base_t level;
+    int ret = RT_EOK;
+
+    if (!work->workqueue)
+    {
+        ret = -EINVAL;
+        goto __exit;
+    }
+
+    if (work->work.flags & RT_WORK_STATE_PENDING)
+    {
+        /* Remove from the queue if already submitted */
+        ret = rt_workqueue_cancel_work(work->workqueue, &(work->work));
+        if (ret)
+        {
+            goto __exit;
+        }
+    }
+    else
+    {
+        rt_timer_stop(&(work->timer));
+    }
+
+    level = rt_hw_interrupt_disable();
+    /* Detach from workqueue */
+    work->workqueue = RT_NULL;
+    work->work.flags &= ~(RT_WORK_STATE_PENDING);
+    rt_hw_interrupt_enable(level);
+
+__exit:
+    return ret;
+}
+
+static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue,
+        struct rt_delayed_work *work, rt_tick_t ticks)
+{
+    rt_base_t level;
+    int ret = RT_EOK;
+
+
+    /* Work cannot be active in multiple queues */
+    if (work->workqueue && work->workqueue != queue)
+    {
+        ret = -RT_EINVAL;
+        goto __exit;
+    }
+
+    /* Cancel if work has been submitted */
+    if (work->workqueue == queue)
+    {
+        ret = _workqueue_cancel_delayed_work(work);
+        if (ret < 0)
+        {
+            goto __exit;
+        }
+    }
+
+    level = rt_hw_interrupt_disable();
+    /* Attach workqueue so the timeout callback can submit it */
+    work->workqueue = queue;
+    rt_hw_interrupt_enable(level);
+
+    if (!ticks)
+    {
+        /* Submit work if no ticks is 0 */
+        _workqueue_submit_work(work->workqueue, &(work->work));
+    }
+    else
+    {
+        /* Add timeout */
+        rt_timer_control(&(work->timer), RT_TIMER_CTRL_SET_TIME, &ticks);
+        rt_timer_start(&(work->timer));
+    }
+
+__exit:
+    return ret;
+}
+
+static void _delayed_work_timeout_handler(void *parameter)
+{
+    struct rt_delayed_work *delayed_work;
+
+    delayed_work = (struct rt_delayed_work *)parameter;
+    rt_timer_stop(&(delayed_work->timer));
+    _workqueue_submit_work(delayed_work->workqueue, &(delayed_work->work));
+}
+
+struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
 {
     struct rt_workqueue *queue = RT_NULL;
 
-    queue = (struct rt_workqueue*)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
+    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
     if (queue != RT_NULL)
     {
         /* initialize work list */
@@ -107,7 +254,7 @@ struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_siz
     return queue;
 }
 
-rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue)
+rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
 {
     RT_ASSERT(queue != RT_NULL);
 
@@ -117,37 +264,30 @@ rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue)
     return RT_EOK;
 }
 
-rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work)
+rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
 {
-    rt_base_t level;
     RT_ASSERT(queue != RT_NULL);
     RT_ASSERT(work != RT_NULL);
 
-    level = rt_hw_interrupt_disable();
-    if (queue->work_current == work)
-    {
-        rt_hw_interrupt_enable(level);
-        return -RT_EBUSY;
-    }
+    return _workqueue_submit_work(queue, work);
+}
 
-    /* NOTE: the work MUST be initialized firstly */
-    rt_list_remove(&(work->list));
+rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
+{
+    RT_ASSERT(queue != RT_NULL);
+    RT_ASSERT(work != RT_NULL);
 
-    rt_list_insert_after(queue->work_list.prev, &(work->list));
-    /* whether the workqueue is doing work */
-    if (queue->work_current == RT_NULL)
+    if (work->type & RT_WORK_TYPE_DELAYED)
     {
-        rt_hw_interrupt_enable(level);
-        /* resume work thread */
-        rt_thread_resume(queue->work_thread);
-        rt_schedule();
+        return _workqueue_submit_delayed_work(queue, (struct rt_delayed_work *)work, time);
+    }
+    else
+    {
+        return _workqueue_submit_work(queue, work);
     }
-    else rt_hw_interrupt_enable(level);
-
-    return RT_EOK;
 }
 
-rt_err_t rt_workqueue_critical_work(struct rt_workqueue* queue, struct rt_work* work)
+rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
 {
     rt_base_t level;
     RT_ASSERT(queue != RT_NULL);
@@ -176,26 +316,22 @@ rt_err_t rt_workqueue_critical_work(struct rt_workqueue* queue, struct rt_work*
     return RT_EOK;
 }
 
-rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work)
+rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
 {
-    rt_base_t level;
-
     RT_ASSERT(queue != RT_NULL);
     RT_ASSERT(work != RT_NULL);
 
-    level = rt_hw_interrupt_disable();
-    if (queue->work_current == work)
+    if (work->type & RT_WORK_TYPE_DELAYED)
     {
-        rt_hw_interrupt_enable(level);
-        return -RT_EBUSY;
+        return _workqueue_cancel_delayed_work((struct rt_delayed_work *)work);
+    }
+    else
+    {
+        return _workqueue_cancel_work(queue, work);
     }
-    rt_list_remove(&(work->list));
-    rt_hw_interrupt_enable(level);
-
-    return RT_EOK;
 }
 
-rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work)
+rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
 {
     rt_base_t level;
 
@@ -212,12 +348,13 @@ rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_wor
     {
         rt_list_remove(&(work->list));
     }
+    work->flags &= ~RT_WORK_STATE_PENDING;
     rt_hw_interrupt_enable(level);
 
     return RT_EOK;
 }
 
-rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue)
+rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
 {
     struct rt_list_node *node, *next;
     RT_ASSERT(queue != RT_NULL);
@@ -233,5 +370,36 @@ rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue)
     return RT_EOK;
 }
 
-#endif
+void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
+                          void *work_data), void *work_data)
+{
+    rt_work_init(&(work->work), work_func, work_data);
+    work->work.type = RT_WORK_TYPE_DELAYED;
+    rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, 0,
+                  RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
+}
 
+#ifdef RT_USING_SYSTEM_WORKQUEUE
+static struct rt_workqueue *sys_workq;
+
+rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time)
+{
+    return rt_workqueue_submit_work(sys_workq, work, time);
+}
+
+rt_err_t rt_work_cancel(struct rt_work *work)
+{
+    return rt_workqueue_cancel_work(sys_workq, work);
+}
+
+static int rt_work_sys_workqueue_init(void)
+{
+    sys_workq = rt_workqueue_create("sys_work", RT_SYSTEM_WORKQUEUE_STACKSIZE * 4,
+                                    RT_SYSTEM_WORKQUEUE_PRIORITY);
+
+    return RT_EOK;
+}
+
+INIT_DEVICE_EXPORT(rt_work_sys_workqueue_init);
+#endif
+#endif