Browse Source

[src][components][include]消息队列支持优先级 (#7382)

zhkag 1 năm trước cách đây
mục cha
commit
e65509a58e

+ 1 - 0
components/libc/posix/ipc/Kconfig

@@ -24,6 +24,7 @@ config RT_USING_POSIX_PIPE_SIZE
 config RT_USING_POSIX_MESSAGE_QUEUE
     bool "Enable posix message queue <mqueue.h>"
     select RT_USING_POSIX_CLOCK
+    select RT_USING_MESSAGEQUEUE_PRIORITY
     default n
 
 config RT_USING_POSIX_MESSAGE_SEMAPHORE

+ 4 - 3
components/libc/posix/ipc/mqueue.c

@@ -232,7 +232,7 @@ ssize_t mq_receive(mqd_t id, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
         return -1;
     }
 
-    result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
+    result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE);
     if (result >= 0)
         return rt_strlen(msg_ptr);
 
@@ -255,7 +255,7 @@ int mq_send(mqd_t id, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
         return -1;
     }
 
-    result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len);
+    result = rt_mq_send_wait_prio(mqdes->mq, (void *)msg_ptr, msg_len, msg_prio, 0, RT_UNINTERRUPTIBLE);
     if (result == RT_EOK)
         return 0;
 
@@ -287,7 +287,8 @@ ssize_t mq_timedreceive(mqd_t                  id,
     if (abs_timeout != RT_NULL)
         tick = rt_timespec_to_tick(abs_timeout);
 
-    result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
+    result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, tick, RT_UNINTERRUPTIBLE);
+
     if (result >= 0)
         return rt_strlen(msg_ptr);
 

+ 33 - 0
examples/utest/testcases/kernel/messagequeue_tc.c

@@ -82,6 +82,25 @@ static void mq_send_case(rt_mq_t testmq)
         rt_thread_delay(100);
     }
 
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+    ret = rt_mq_send_wait_prio(testmq, &send_buf[3], sizeof(send_buf[0]), 3, 0, RT_UNINTERRUPTIBLE);
+    uassert_true(ret == RT_EOK);
+    ret = rt_mq_send_wait_prio(testmq, &send_buf[0], sizeof(send_buf[0]), 0, 0, RT_UNINTERRUPTIBLE);
+    uassert_true(ret == RT_EOK);
+
+    ret = rt_mq_send_wait_prio(testmq, &send_buf[2], sizeof(send_buf[0]), 1, 0, RT_UNINTERRUPTIBLE);
+    uassert_true(ret == RT_EOK);
+    ret = rt_mq_send_wait_prio(testmq, &send_buf[4], sizeof(send_buf[0]), 4, 0, RT_UNINTERRUPTIBLE);
+    uassert_true(ret == RT_EOK);
+    ret = rt_mq_send_wait_prio(testmq, &send_buf[1], sizeof(send_buf[0]), 1, 0, RT_UNINTERRUPTIBLE);
+    uassert_true(ret == RT_EOK);
+
+    while (testmq->entry != 0)
+    {
+        rt_thread_delay(100);
+    }
+#endif
+
     ret = rt_mq_send(testmq, &send_buf[1], sizeof(send_buf[0]));
     uassert_true(ret == RT_EOK);
     ret = rt_mq_control(testmq, RT_IPC_CMD_RESET, RT_NULL);
@@ -121,6 +140,20 @@ static void mq_recv_case(rt_mq_t testmq)
         uassert_true(ret >= 0);
         uassert_true(recv_buf[var] == (var + 1));
     }
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+    rt_int32_t msg_prio;
+    while (testmq->entry == MAX_MSGS)
+    {
+        rt_thread_delay(100);
+    }
+    for (int var = 0; var < MAX_MSGS; ++var)
+    {
+        ret = rt_mq_recv_prio(testmq, &recv_buf[var], sizeof(recv_buf[0]), &msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE);
+        rt_kprintf("msg_prio = %d\r\n", msg_prio);
+        uassert_true(ret >= 0);
+        uassert_true(recv_buf[var] == (MAX_MSGS - var));
+    }
+#endif
 }
 
 static void mq_recv_entry(void *param)

+ 18 - 0
include/rtthread.h

@@ -463,6 +463,9 @@ struct rt_mq_message
 {
     struct rt_mq_message *next;
     rt_ssize_t length;
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+    rt_int32_t prio;
+#endif
 };
 
 #define RT_MQ_BUF_SIZE(msg_size, max_msgs) \
@@ -515,6 +518,21 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t    mq,
                     rt_size_t  size,
                     rt_int32_t timeout);
 rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg);
+
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+rt_err_t rt_mq_send_wait_prio(rt_mq_t mq,
+                              const void *buffer,
+                              rt_size_t size,
+                              rt_int32_t prio,
+                              rt_int32_t timeout,
+                              int suspend_flag);
+rt_err_t rt_mq_recv_prio(rt_mq_t mq,
+                         void *buffer,
+                         rt_size_t size,
+                         rt_int32_t *prio,
+                         rt_int32_t timeout,
+                         int suspend_flag);
+#endif
 #endif
 
 /* defunct */

+ 6 - 0
src/Kconfig

@@ -317,6 +317,12 @@ menu "Inter-Thread communication"
         bool "Enable message queue"
         default y
 
+    if RT_USING_MESSAGEQUEUE
+        config RT_USING_MESSAGEQUEUE_PRIORITY
+            bool "Enable message queue priority"
+            default n
+    endif
+
     config RT_USING_SIGNALS
         bool "Enable signals"
         select RT_USING_MEMPOOL

+ 80 - 20
src/ipc.c

@@ -3144,7 +3144,6 @@ rt_err_t rt_mq_delete(rt_mq_t mq)
 RTM_EXPORT(rt_mq_delete);
 #endif /* RT_USING_HEAP */
 
-
 /**
  * @brief    This function will send a message to the messagequeue object. If
  *           there is a thread suspended on the messagequeue, the thread will be
@@ -3154,10 +3153,10 @@ RTM_EXPORT(rt_mq_delete);
  *           fully used, the current thread will wait for a timeout. If reaching
  *           the timeout and there is still no space available, the sending
  *           thread will be resumed and an error code will be returned. By
- *           contrast, the rt_mq_send() function will return an error code
+ *           contrast, the _rt_mq_send_wait() function will return an error code
  *           immediately without waiting when the messagequeue if fully used.
  *
- * @see      rt_mq_send()
+ * @see      _rt_mq_send_wait()
  *
  * @param    mq is a pointer to the messagequeue object to be sent.
  *
@@ -3165,8 +3164,12 @@ RTM_EXPORT(rt_mq_delete);
  *
  * @param    size is the length of the message(Unit: Byte).
  *
+ * @param    prio is message priority, A larger value indicates a higher priority
+ *
  * @param    timeout is a timeout period (unit: an OS tick).
  *
+ * @param    suspend_flag status flag of the thread to be suspended.
+ *
  * @return   Return the operation status. When the return value is RT_EOK, the
  *           operation is successful. If the return value is any other values,
  *           it means that the messagequeue detach failed.
@@ -3174,11 +3177,12 @@ RTM_EXPORT(rt_mq_delete);
  * @warning  This function can be called in interrupt context and thread
  * context.
  */
-static rt_err_t _rt_mq_send_wait(rt_mq_t     mq,
-                         const void *buffer,
-                         rt_size_t   size,
-                         rt_int32_t  timeout,
-                         int suspend_flag)
+static rt_err_t _rt_mq_send_wait(rt_mq_t mq,
+                                 const void *buffer,
+                                 rt_size_t size,
+                                 rt_int32_t prio,
+                                 rt_int32_t timeout,
+                                 int suspend_flag)
 {
     rt_base_t level;
     struct rt_mq_message *msg;
@@ -3304,6 +3308,33 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t     mq,
 
     /* disable interrupt */
     level = rt_hw_interrupt_disable();
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+    msg->prio = prio;
+    if (mq->msg_queue_head == RT_NULL)
+        mq->msg_queue_head = msg;
+
+    struct rt_mq_message *node, *prev_node = RT_NULL;
+    for (node = mq->msg_queue_head; node != RT_NULL; node = node->next)
+    {
+        if (node->prio < msg->prio)
+        {
+            if (prev_node == RT_NULL)
+                mq->msg_queue_head = msg;
+            else
+                prev_node->next = msg;
+            msg->next = node;
+            break;
+        }
+        if (node->next == RT_NULL)
+        {
+            if (node != msg)
+                node->next = msg;
+            mq->msg_queue_tail = msg;
+            break;
+        }
+        prev_node = node;
+    }
+#else
     /* link msg to message queue */
     if (mq->msg_queue_tail != RT_NULL)
     {
@@ -3316,6 +3347,7 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t     mq,
     /* if the head is empty, set head */
     if (mq->msg_queue_head == RT_NULL)
         mq->msg_queue_head = msg;
+#endif
 
     if(mq->entry < RT_MQ_ENTRY_MAX)
     {
@@ -3352,7 +3384,7 @@ rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                          rt_size_t   size,
                          rt_int32_t  timeout)
 {
-    return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE);
+    return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE);
 }
 RTM_EXPORT(rt_mq_send_wait);
 
@@ -3361,7 +3393,7 @@ rt_err_t rt_mq_send_wait_interruptible(rt_mq_t     mq,
                          rt_size_t   size,
                          rt_int32_t  timeout)
 {
-    return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE);
+    return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE);
 }
 RTM_EXPORT(rt_mq_send_wait_interruptible);
 
@@ -3370,7 +3402,7 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t     mq,
                          rt_size_t   size,
                          rt_int32_t  timeout)
 {
-    return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE);
+    return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_KILLABLE);
 }
 RTM_EXPORT(rt_mq_send_wait_killable);
 /**
@@ -3513,7 +3545,6 @@ rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size)
 }
 RTM_EXPORT(rt_mq_urgent);
 
-
 /**
  * @brief    This function will receive a message from message queue object,
  *           if there is no message in messagequeue object, the thread shall wait for a specified time.
@@ -3526,11 +3557,15 @@ RTM_EXPORT(rt_mq_urgent);
  *
  * @param    buffer is the content of the message.
  *
+ * @param    prio is message priority, A larger value indicates a higher priority
+ *
  * @param    size is the length of the message(Unit: Byte).
  *
  * @param    timeout is a timeout period (unit: an OS tick). If the message is unavailable, the thread will wait for
  *           the message in the queue up to the amount of time specified by this parameter.
  *
+ * @param    suspend_flag status flag of the thread to be suspended.
+ *
  *           NOTE:
  *           If use Macro RT_WAITING_FOREVER to set this parameter, which means that when the
  *           message is unavailable in the queue, the thread will be waiting forever.
@@ -3540,11 +3575,12 @@ RTM_EXPORT(rt_mq_urgent);
  * @return   Return the real length of the message. When the return value is larger than zero, the operation is successful.
  *           If the return value is any other values, it means that the mailbox release failed.
  */
-static rt_ssize_t _rt_mq_recv(rt_mq_t    mq,
-                    void      *buffer,
-                    rt_size_t  size,
-                    rt_int32_t timeout,
-                    int suspend_flag)
+static rt_ssize_t _rt_mq_recv(rt_mq_t mq,
+                              void *buffer,
+                              rt_size_t size,
+                              rt_int32_t *prio,
+                              rt_int32_t timeout,
+                              int suspend_flag)
 {
     struct rt_thread *thread;
     rt_base_t level;
@@ -3675,6 +3711,10 @@ static rt_ssize_t _rt_mq_recv(rt_mq_t    mq,
     /* copy message */
     rt_memcpy(buffer, GET_MESSAGEBYTE_ADDR(msg), len);
 
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+    if (prio != RT_NULL)
+        *prio = msg->prio;
+#endif
     /* disable interrupt */
     level = rt_hw_interrupt_disable();
     /* put message to free list */
@@ -3709,7 +3749,7 @@ rt_ssize_t rt_mq_recv(rt_mq_t    mq,
                     rt_size_t  size,
                     rt_int32_t timeout)
 {
-    return _rt_mq_recv(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE);
+    return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE);
 }
 RTM_EXPORT(rt_mq_recv);
 
@@ -3718,7 +3758,7 @@ rt_ssize_t rt_mq_recv_interruptible(rt_mq_t    mq,
                     rt_size_t  size,
                     rt_int32_t timeout)
 {
-    return _rt_mq_recv(mq, buffer, size, timeout, RT_INTERRUPTIBLE);
+    return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE);
 }
 RTM_EXPORT(rt_mq_recv_interruptible);
 
@@ -3727,8 +3767,28 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t    mq,
                     rt_size_t  size,
                     rt_int32_t timeout)
 {
-    return _rt_mq_recv(mq, buffer, size, timeout, RT_KILLABLE);
+    return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_KILLABLE);
+}
+#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
+rt_err_t rt_mq_send_wait_prio(rt_mq_t mq,
+                              const void *buffer,
+                              rt_size_t size,
+                              rt_int32_t prio,
+                              rt_int32_t timeout,
+                              int suspend_flag)
+{
+    return _rt_mq_send_wait(mq, buffer, size, prio, timeout, suspend_flag);
 }
+rt_err_t rt_mq_recv_prio(rt_mq_t mq,
+                         void *buffer,
+                         rt_size_t size,
+                         rt_int32_t *prio,
+                         rt_int32_t timeout,
+                         int suspend_flag)
+{
+    return _rt_mq_recv(mq, buffer, size, prio, timeout, suspend_flag);
+}
+#endif
 RTM_EXPORT(rt_mq_recv_killable);
 /**
  * @brief    This function will set some extra attributions of a messagequeue object.