Browse Source

[kernel][ipc] add send wait support for message queue

tangyuxin 5 years ago
parent
commit
e4671d2916
3 changed files with 131 additions and 5 deletions
  1. 2 0
      include/rtdef.h
  2. 4 0
      include/rtthread.h
  3. 125 5
      src/ipc.c

+ 2 - 0
include/rtdef.h

@@ -742,6 +742,8 @@ struct rt_messagequeue
     void                *msg_queue_head;                /**< list head */
     void                *msg_queue_tail;                /**< list tail */
     void                *msg_queue_free;                /**< pointer indicated the free node of queue */
+
+    rt_list_t            suspend_sender_thread;         /**< sender thread suspended on this message queue */
 };
 typedef struct rt_messagequeue *rt_mq_t;
 #endif

+ 4 - 0
include/rtthread.h

@@ -379,6 +379,10 @@ rt_mq_t rt_mq_create(const char *name,
 rt_err_t rt_mq_delete(rt_mq_t mq);
 
 rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size);
+rt_err_t rt_mq_send_wait(rt_mq_t     mq,
+                         const void *buffer,
+                         rt_size_t   size,
+                         rt_int32_t  timeout);
 rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size);
 rt_err_t rt_mq_recv(rt_mq_t    mq,
                     void      *buffer,

+ 125 - 5
src/ipc.c

@@ -33,6 +33,7 @@
  * 2011-12-18     Bernard      add more parameter checking in message queue
  * 2013-09-14     Grissiom     add an option check in rt_event_recv
  * 2018-10-02     Bernard      add 64bit support for mailbox
+ * 2019-09-16     tyx          add send wait support for message queue
  */
 
 #include <rtthread.h>
@@ -1823,6 +1824,9 @@ rt_err_t rt_mq_init(rt_mq_t     mq,
     /* the initial entry is zero */
     mq->entry = 0;
 
+    /* init an additional list of sender suspend thread */
+    rt_list_init(&(mq->suspend_sender_thread));
+
     return RT_EOK;
 }
 RTM_EXPORT(rt_mq_init);
@@ -1843,6 +1847,8 @@ rt_err_t rt_mq_detach(rt_mq_t mq)
 
     /* resume all suspended thread */
     rt_ipc_list_resume_all(&mq->parent.suspend_thread);
+    /* also resume all message queue private suspended thread */
+    rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
 
     /* detach message queue object */
     rt_object_detach(&(mq->parent.parent));
@@ -1916,6 +1922,9 @@ rt_mq_t rt_mq_create(const char *name,
     /* the initial entry is zero */
     mq->entry = 0;
 
+    /* init an additional list of sender suspend thread */
+    rt_list_init(&(mq->suspend_sender_thread));
+
     return mq;
 }
 RTM_EXPORT(rt_mq_create);
@@ -1938,6 +1947,8 @@ rt_err_t rt_mq_delete(rt_mq_t mq)
 
     /* resume all suspended thread */
     rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
+    /* also resume all message queue private suspended thread */
+    rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
 
     /* free message queue pool */
     RT_KERNEL_FREE(mq->msg_pool);
@@ -1951,19 +1962,25 @@ RTM_EXPORT(rt_mq_delete);
 #endif
 
 /**
- * This function will send a message to message queue object, if there are
- * threads suspended on message queue object, it will be waked up.
+ * This function will send a message to message queue object. If the message queue is full,
+ * current thread will be suspended until timeout.
  *
  * @param mq the message queue object
  * @param buffer the message
  * @param size the size of buffer
+ * @param timeout the waiting time
  *
  * @return the error code
  */
-rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
+rt_err_t rt_mq_send_wait(rt_mq_t     mq,
+                         const void *buffer,
+                         rt_size_t   size,
+                         rt_int32_t  timeout)
 {
     register rt_ubase_t temp;
     struct rt_mq_message *msg;
+    rt_uint32_t tick_delta;
+    struct rt_thread *thread;
 
     /* parameter check */
     RT_ASSERT(mq != RT_NULL);
@@ -1975,6 +1992,11 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
     if (size > mq->msg_size)
         return -RT_ERROR;
 
+    /* initialize delta tick */
+    tick_delta = 0;
+    /* get current thread */
+    thread = rt_thread_self();
+
     RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
 
     /* disable interrupt */
@@ -1982,14 +2004,78 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
 
     /* get a free list, there must be an empty item */
     msg = (struct rt_mq_message *)mq->msg_queue_free;
-    /* message queue is full */
-    if (msg == RT_NULL)
+    /* for non-blocking call */
+    if (msg == RT_NULL && timeout == 0)
     {
         /* enable interrupt */
         rt_hw_interrupt_enable(temp);
 
         return -RT_EFULL;
     }
+
+    /* message queue is full */
+    while ((msg = mq->msg_queue_free) == RT_NULL)
+    {
+        /* reset error number in thread */
+        thread->error = RT_EOK;
+
+        /* no waiting, return timeout */
+        if (timeout == 0)
+        {
+            /* enable interrupt */
+            rt_hw_interrupt_enable(temp);
+
+            return -RT_EFULL;
+        }
+
+        RT_DEBUG_IN_THREAD_CONTEXT;
+        /* suspend current thread */
+        rt_ipc_list_suspend(&(mq->suspend_sender_thread),
+                            thread,
+                            mq->parent.parent.flag);
+
+        /* has waiting time, start thread timer */
+        if (timeout > 0)
+        {
+            /* get the start tick of timer */
+            tick_delta = rt_tick_get();
+
+            RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",
+                                        thread->name));
+
+            /* reset the timeout of thread timer and start it */
+            rt_timer_control(&(thread->thread_timer),
+                             RT_TIMER_CTRL_SET_TIME,
+                             &timeout);
+            rt_timer_start(&(thread->thread_timer));
+        }
+
+        /* enable interrupt */
+        rt_hw_interrupt_enable(temp);
+
+        /* re-schedule */
+        rt_schedule();
+
+        /* resume from suspend state */
+        if (thread->error != RT_EOK)
+        {
+            /* return error */
+            return thread->error;
+        }
+
+        /* disable interrupt */
+        temp = rt_hw_interrupt_disable();
+
+        /* if it's not waiting forever and then re-calculate timeout tick */
+        if (timeout > 0)
+        {
+            tick_delta = rt_tick_get() - tick_delta;
+            timeout -= tick_delta;
+            if (timeout < 0)
+                timeout = 0;
+        }
+    }
+
     /* move free list pointer */
     mq->msg_queue_free = msg->next;
 
@@ -2037,6 +2123,22 @@ rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
 
     return RT_EOK;
 }
+RTM_EXPORT(rt_mq_send_wait)
+
+/**
+ * This function will send a message to message queue object, if there are
+ * threads suspended on message queue object, it will be waked up.
+ *
+ * @param mq the message queue object
+ * @param buffer the message
+ * @param size the size of buffer
+ *
+ * @return the error code
+ */
+rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
+{
+    return rt_mq_send_wait(mq, buffer, size, 0);
+}
 RTM_EXPORT(rt_mq_send);
 
 /**
@@ -2257,6 +2359,22 @@ rt_err_t rt_mq_recv(rt_mq_t    mq,
     /* put message to free list */
     msg->next = (struct rt_mq_message *)mq->msg_queue_free;
     mq->msg_queue_free = msg;
+
+    /* resume suspended thread */
+    if (!rt_list_isempty(&(mq->suspend_sender_thread)))
+    {
+        rt_ipc_list_resume(&(mq->suspend_sender_thread));
+
+        /* enable interrupt */
+        rt_hw_interrupt_enable(temp);
+
+        RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
+
+        rt_schedule();
+
+        return RT_EOK;
+    }
+
     /* enable interrupt */
     rt_hw_interrupt_enable(temp);
 
@@ -2292,6 +2410,8 @@ rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg)
 
         /* resume all waiting thread */
         rt_ipc_list_resume_all(&mq->parent.suspend_thread);
+        /* also resume all message queue private suspended thread */
+        rt_ipc_list_resume_all(&(mq->suspend_sender_thread));
 
         /* release all message in the queue */
         while (mq->msg_queue_head != RT_NULL)