Browse Source

add a blocking mailbox post function

git-svn-id: https://rt-thread.googlecode.com/svn/trunk@1471 bbd45198-f89e-11dd-88c7-29a3b14d5316
mbbill@gmail.com 14 years ago
parent
commit
252bc41a2c
2 changed files with 154 additions and 53 deletions
  1. 2 0
      include/rtdef.h
  2. 152 53
      src/ipc.c

+ 2 - 0
include/rtdef.h

@@ -489,6 +489,8 @@ struct rt_mailbox
 
 	rt_uint16_t entry;									/**< index of messages in msg_pool. 				*/
 	rt_uint16_t in_offset, out_offset;					/**< in/output offset of the message buffer.		*/
+
+	rt_list_t suspend_sender_thread;					/**< sender thread suspended on this mb		*/
 };
 typedef struct rt_mailbox* rt_mailbox_t;
 #endif

+ 152 - 53
src/ipc.c

@@ -71,23 +71,23 @@ rt_inline rt_err_t rt_ipc_object_init(struct rt_ipc_object *ipc)
 }
 
 /**
- * This function will suspend a thread for a specified IPC object and put the
- * thread into suspend queue of IPC object
+ * This function will suspend a thread to a specified list. IPC object or some double-queue
+ * object (mailbox etc.) contains this kind of list.
  *
  * @param ipc the IPC object
  * @param thread the thread object to be suspended
  *
  * @return the operation status, RT_EOK on successful
  */
-rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_thread *thread)
+rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list, struct rt_thread *thread, rt_uint8_t flag)
 {
 	/* suspend thread */
 	rt_thread_suspend(thread);
 
-	switch (ipc->parent.flag)
+	switch (flag)
 	{
 	case RT_IPC_FLAG_FIFO:
-		rt_list_insert_before(&(ipc->suspend_thread), &(thread->tlist));
+		rt_list_insert_before(list, &(thread->tlist));
 		break;
 
 	case RT_IPC_FLAG_PRIO:
@@ -96,8 +96,7 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th
 			struct rt_thread* sthread;
 
 			/* find a suitable position */
-			for (n = ipc->suspend_thread.next; n != &(ipc->suspend_thread);
-				n = n->next)
+			for (n = list->next; n != list; n = n->next)
 			{
 				sthread = rt_list_entry(n, struct rt_thread, tlist);
 
@@ -111,8 +110,8 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th
 			}
 
 			/* not found a suitable position, append to the end of suspend_thread list */
-			if (n == &(ipc->suspend_thread))
-				rt_list_insert_before(&(ipc->suspend_thread), &(thread->tlist));
+			if (n == list)
+				rt_list_insert_before(list, &(thread->tlist));
 		}
 		break;
 	}
@@ -121,20 +120,20 @@ rt_inline rt_err_t rt_ipc_object_suspend(struct rt_ipc_object *ipc, struct rt_th
 }
 
 /**
- * This function will resume a thread from an IPC object:
+ * This function will resume the first thread in the list of a IPC object:
  * - remove the thread from suspend queue of IPC object
  * - put the thread into system ready queue
  *
- * @param ipc the IPC object
+ * @param list the thread list
  *
  * @return the operation status, RT_EOK on successful
  */
-rt_inline rt_err_t rt_ipc_object_resume(struct rt_ipc_object* ipc)
+rt_inline rt_err_t rt_ipc_list_resume(rt_list_t *list)
 {
 	struct rt_thread *thread;
 
 	/* get thread entry */
-	thread = rt_list_entry(ipc->suspend_thread.next, struct rt_thread, tlist);
+	thread = rt_list_entry(list->next, struct rt_thread, tlist);
 
 #ifdef RT_IPC_DEBUG
 	rt_kprintf("resume thread:%s\n", thread->name);
@@ -147,25 +146,26 @@ rt_inline rt_err_t rt_ipc_object_resume(struct rt_ipc_object* ipc)
 }
 
 /**
- * This function will resume all suspended threads in an IPC object.
+ * This function will resume all suspended threads in a list, including
+ * suspend list of IPC object and private list of mailbox etc.
  *
- * @param ipc the IPC object
+ * @param list of the threads to resume
  *
  * @return the operation status, RT_EOK on successful
  */
-rt_inline rt_err_t rt_ipc_object_resume_all(struct rt_ipc_object* ipc)
+rt_inline rt_err_t rt_ipc_list_resume_all(rt_list_t *list)
 {
 	struct rt_thread* thread;
 	register rt_ubase_t temp;
 
 	/* wakeup all suspend threads */
-	while (!rt_list_isempty(&(ipc->suspend_thread)))
+	while (!rt_list_isempty(list))
 	{
 		/* disable interrupt */
 		temp = rt_hw_interrupt_disable();
 
 		/* get next suspend thread */
-		thread = rt_list_entry(ipc->suspend_thread.next, struct rt_thread, tlist);
+		thread = rt_list_entry(list->next, struct rt_thread, tlist);
 		/* set error code to RT_ERROR */
 		thread->error = -RT_ERROR;
 
@@ -229,7 +229,7 @@ rt_err_t rt_sem_detach (rt_sem_t sem)
 	RT_ASSERT(sem != RT_NULL);
 
 	/* wakeup all suspend threads */
-	rt_ipc_object_resume_all(&(sem->parent));
+	rt_ipc_list_resume_all(&(sem->parent.suspend_thread));
 
 	/* detach semaphore object */
 	rt_object_detach(&(sem->parent.parent));
@@ -283,7 +283,7 @@ rt_err_t rt_sem_delete (rt_sem_t sem)
 	RT_ASSERT(sem != RT_NULL);
 
 	/* wakeup all suspend threads */
-	rt_ipc_object_resume_all(&(sem->parent));
+	rt_ipc_list_resume_all(&(sem->parent.suspend_thread));
 
 	/* delete semaphore object */
 	rt_object_delete(&(sem->parent.parent));
@@ -315,7 +315,7 @@ rt_err_t rt_sem_take (rt_sem_t sem, rt_int32_t time)
 	/* disable interrupt */
 	temp = rt_hw_interrupt_disable();
 
-#ifdef RT_IPC_DEBUG
+#ifdef RT_IPC_DEBU
 	rt_kprintf("thread %s take sem:%s, which value is: %d\n", rt_thread_self()->name,
 		((struct rt_object*)sem)->name, sem->value);
 #endif
@@ -349,7 +349,8 @@ rt_err_t rt_sem_take (rt_sem_t sem, rt_int32_t time)
 #endif
 
 			/* suspend thread */
-			rt_ipc_object_suspend(&(sem->parent), thread);
+			rt_ipc_list_suspend(&(sem->parent.suspend_thread),
+				thread, sem->parent.parent.flag);
 
 			/* has waiting time, start thread timer */
 			if (time > 0)
@@ -424,7 +425,7 @@ rt_err_t rt_sem_release(rt_sem_t sem)
 	if ( !rt_list_isempty(&sem->parent.suspend_thread) )
 	{
 		/* resume the suspended thread */
-		rt_ipc_object_resume(&(sem->parent));
+		rt_ipc_list_resume(&(sem->parent.suspend_thread));
 		need_schedule = RT_TRUE;
 	}
 	else sem->value ++; /* increase value */
@@ -457,12 +458,12 @@ rt_err_t rt_sem_control(rt_sem_t sem, rt_uint8_t cmd, void* arg)
 		rt_uint32_t value;
 
 		/* get value */
-		value = (rt_uint32_t)arg;		
+		value = (rt_uint32_t)arg;
 		/* disable interrupt */
 		level = rt_hw_interrupt_disable();
 
 		/* resume all waiting thread */
-		rt_ipc_object_resume_all(&sem->parent);
+		rt_ipc_list_resume_all(&sem->parent.suspend_thread);
 
 		/* set new value */
 		sem->value = (rt_uint16_t)value;
@@ -525,7 +526,7 @@ rt_err_t rt_mutex_detach (rt_mutex_t mutex)
 	RT_ASSERT(mutex != RT_NULL);
 
 	/* wakeup all suspend threads */
-	rt_ipc_object_resume_all(&(mutex->parent));
+	rt_ipc_list_resume_all(&(mutex->parent.suspend_thread));
 
 	/* detach semaphore object */
 	rt_object_detach(&(mutex->parent.parent));
@@ -580,7 +581,7 @@ rt_err_t rt_mutex_delete (rt_mutex_t mutex)
 	RT_ASSERT(mutex != RT_NULL);
 
 	/* wakeup all suspend threads */
-	rt_ipc_object_resume_all(&(mutex->parent));
+	rt_ipc_list_resume_all(&(mutex->parent.suspend_thread));
 
 	/* delete semaphore object */
 	rt_object_delete(&(mutex->parent.parent));
@@ -671,7 +672,8 @@ rt_err_t rt_mutex_take (rt_mutex_t mutex, rt_int32_t time)
 				}
 
 				/* suspend current thread */
-				rt_ipc_object_suspend(&(mutex->parent), thread);
+				rt_ipc_list_suspend(&(mutex->parent.suspend_thread),
+				thread, mutex->parent.parent.flag);
 
 				/* has waiting time, start thread timer */
 				if (time > 0)
@@ -784,7 +786,7 @@ rt_err_t rt_mutex_release(rt_mutex_t mutex)
 			mutex->hold ++;
 
 			/* resume thread */
-			rt_ipc_object_resume(&(mutex->parent));
+			rt_ipc_list_resume(&(mutex->parent.suspend_thread));
 
 			need_schedule = RT_TRUE;
 		}
@@ -868,7 +870,7 @@ rt_err_t rt_event_detach(rt_event_t event)
 	RT_ASSERT(event != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all(&(event->parent));
+	rt_ipc_list_resume_all(&(event->parent.suspend_thread));
 
 	/* detach event object */
 	rt_object_detach(&(event->parent.parent));
@@ -918,7 +920,7 @@ rt_err_t rt_event_delete (rt_event_t event)
 	RT_ASSERT(event != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all(&(event->parent));
+	rt_ipc_list_resume_all(&(event->parent.suspend_thread));
 
 	/* delete event object */
 	rt_object_delete(&(event->parent.parent));
@@ -1085,7 +1087,8 @@ rt_err_t rt_event_recv(rt_event_t event, rt_uint32_t set, rt_uint8_t option, rt_
 		thread->event_info = option;
 
 		/* put thread to suspended thread list */
-		rt_ipc_object_suspend(&(event->parent), thread);
+		rt_ipc_list_suspend(&(event->parent.suspend_thread),
+				thread, event->parent.parent.flag);
 
 		/* if there is a waiting timeout, active thread timer */
 		if (timeout > 0)
@@ -1144,7 +1147,7 @@ rt_err_t rt_event_control (rt_event_t event, rt_uint8_t cmd, void* arg)
 		level = rt_hw_interrupt_disable();
 
 		/* resume all waiting thread */
-		rt_ipc_object_resume_all(&event->parent);
+		rt_ipc_list_resume_all(&event->parent.suspend_thread);
 
 		/* init event set */
 		event->set = 0;
@@ -1194,6 +1197,8 @@ rt_err_t rt_mb_init(rt_mailbox_t mb, const char* name, void* msgpool, rt_size_t
 	mb->in_offset 	= 0;
 	mb->out_offset 	= 0;
 
+	rt_list_init(&(mb->suspend_sender_thread));
+
 	return RT_EOK;
 }
 
@@ -1210,7 +1215,9 @@ rt_err_t rt_mb_detach(rt_mailbox_t mb)
 	RT_ASSERT(mb != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all(&(mb->parent));
+	rt_ipc_list_resume_all(&(mb->parent.suspend_thread));
+	/* also resume all mailbox private suspended thread */
+	rt_ipc_list_resume_all(&(mb->suspend_sender_thread));
 
 	/* detach mailbox object */
 	rt_object_detach(&(mb->parent.parent));
@@ -1272,11 +1279,13 @@ rt_err_t rt_mb_delete (rt_mailbox_t mb)
 	RT_ASSERT(mb != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all(&(mb->parent));
+	rt_ipc_list_resume_all(&(mb->parent.suspend_thread));
+	/* also resume all mailbox private suspended thread */
+	rt_ipc_list_resume_all(&(mb->suspend_sender_thread));
 
 #ifdef RT_USING_MODULE
 	/* the mb object belongs to an application module */
-	if(mb->parent.parent.flag & RT_OBJECT_FLAG_MODULE) 
+	if(mb->parent.parent.flag & RT_OBJECT_FLAG_MODULE)
 		rt_module_free(mb->parent.parent.module_id, mb->msg_pool);
 	else
 #endif
@@ -1292,17 +1301,20 @@ rt_err_t rt_mb_delete (rt_mailbox_t mb)
 #endif
 
 /**
- * This function will send a mail to mailbox object, if there are threads suspended
- * on mailbox object, it will be waked up.
+ * This function will send a mail to mailbox object. If the mailbox is full,
+ * current thread will be suspended until timeout.
  *
  * @param mb the mailbox object
  * @param value the mail
+ * @param timeout the waiting time
  *
  * @return the error code
  */
-rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value)
+rt_err_t rt_mb_send_wait (rt_mailbox_t mb, rt_uint32_t value, rt_int32_t timeout)
 {
+	struct rt_thread *thread;
 	register rt_ubase_t temp;
+	rt_uint32_t tick_delta;
 
 	/* parameter check */
 	RT_ASSERT(mb != RT_NULL);
@@ -1314,15 +1326,69 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value)
 	/* disable interrupt */
 	temp = rt_hw_interrupt_disable();
 
+	/* get current thread */
+	thread = rt_thread_self();
+
 	/* mailbox is full */
-	if (mb->entry == mb->size)
+	while (mb->entry == mb->size)
 	{
+		/* reset error number in thread */
+		thread->error = RT_EOK;
+
+		/* no waiting, return timeout */
+		if (timeout == 0)
+		{
+            /* enable interrupt */
+            rt_hw_interrupt_enable(temp);
+
+            thread->error = -RT_EFULL;
+            return -RT_EFULL;
+        }
+
+		/* suspend current thread */
+		rt_ipc_list_suspend(&(mb->suspend_sender_thread),
+				thread, mb->parent.parent.flag);
+
+		/* has waiting time, start thread timer */
+		if (timeout > 0)
+		{
+			/* get the start tick of timer */
+			tick_delta = rt_tick_get();
+
+#ifdef RT_IPC_DEBUG
+			rt_kprintf("mb_send_wait: start timer of thread:%s\n", thread->name);
+#endif
+			/* reset the timeout of thread timer and start it */
+			rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
+			rt_timer_start(&(thread->thread_timer));
+		}
+
 		/* enable interrupt */
 		rt_hw_interrupt_enable(temp);
 
-		return -RT_EFULL;
+		/* re-schedule */
+		rt_schedule();
+
+		/* resume from suspend state */
+		if (thread->error != RT_EOK)
+		{
+		    /* return error */
+			return thread->error;
+		}
+
+		/* disable interrupt */
+		temp = rt_hw_interrupt_disable();
+
+		/* re-calculate timeout tick */
+		if (timeout > 0)
+		{
+			tick_delta = rt_tick_get() - tick_delta;
+			timeout -= tick_delta;
+			if (timeout < 0) timeout = 0;
+		}
 	}
 
+
 	/* set ptr */
 	mb->msg_pool[mb->in_offset] = value;
 	/* increase input offset */
@@ -1334,7 +1400,7 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value)
 	/* resume suspended thread */
 	if( !rt_list_isempty(&mb->parent.suspend_thread) )
 	{
-		rt_ipc_object_resume(&(mb->parent));
+		rt_ipc_list_resume(&(mb->parent.suspend_thread));
 
 		/* enable interrupt */
 		rt_hw_interrupt_enable(temp);
@@ -1349,6 +1415,21 @@ rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value)
 	return RT_EOK;
 }
 
+/**
+ * This function will send a mail to mailbox object, if there are threads suspended
+ * on mailbox object, it will be waked up. This function will return immediately, if
+ * you want blocking send, use rt_mb_send_wait instead.
+ *
+ * @param mb the mailbox object
+ * @param value the mail
+ *
+ * @return the error code
+ */
+rt_err_t rt_mb_send (rt_mailbox_t mb, rt_uint32_t value)
+{
+	return rt_mb_send_wait(mb,value,0);
+}
+
 /**
  * This function will receive a mail from mailbox object, if there is no mail in
  * mailbox object, the thread shall wait for a specified time.
@@ -1396,7 +1477,8 @@ rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout)
 		}
 
 		/* suspend current thread */
-		rt_ipc_object_suspend(&(mb->parent), thread);
+		rt_ipc_list_suspend(&(mb->parent.suspend_thread),
+				thread, mb->parent.parent.flag);
 
 		/* has waiting time, start thread timer */
 		if (timeout > 0)
@@ -1446,6 +1528,22 @@ rt_err_t rt_mb_recv (rt_mailbox_t mb, rt_uint32_t* value, rt_int32_t timeout)
 	/* decrease message entry */
 	mb->entry --;
 
+	/* resume suspended thread */
+	if( !rt_list_isempty(&(mb->suspend_sender_thread)) )
+	{
+		rt_ipc_list_resume(&(mb->suspend_sender_thread));
+
+		/* enable interrupt */
+		rt_hw_interrupt_enable(temp);
+
+#ifdef RT_USING_HOOK
+	if (rt_object_take_hook != RT_NULL) rt_object_take_hook(&(mb->parent.parent));
+#endif
+		rt_schedule();
+
+		return RT_EOK;
+	}
+
 	/* enable interrupt */
 	rt_hw_interrupt_enable(temp);
 
@@ -1476,13 +1574,13 @@ rt_err_t rt_mb_control(rt_mailbox_t mb, rt_uint8_t cmd, void* arg)
 		level = rt_hw_interrupt_disable();
 
 		/* resume all waiting thread */
-		rt_ipc_object_resume_all(&mb->parent);
+		rt_ipc_list_resume_all(&(mb->parent.suspend_thread));
 
 		/* re-init mailbox */
 		mb->entry 	 	= 0;
 		mb->in_offset 	= 0;
 		mb->out_offset 	= 0;
-	
+
 		/* enable interrupt */
 		rt_hw_interrupt_enable(level);
 
@@ -1571,7 +1669,7 @@ rt_err_t rt_mq_detach(rt_mq_t mq)
 	RT_ASSERT(mq != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all((struct rt_ipc_object*)mq);
+	rt_ipc_list_resume_all(&mq->parent.suspend_thread);
 
 	/* detach message queue object */
 	rt_object_detach(&(mq->parent.parent));
@@ -1653,11 +1751,11 @@ rt_err_t rt_mq_delete (rt_mq_t mq)
 	RT_ASSERT(mq != RT_NULL);
 
 	/* resume all suspended thread */
-	rt_ipc_object_resume_all(&(mq->parent));
+	rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
 
 #ifdef RT_USING_MODULE
 	/* the mq object belongs to an application module */
-	if(mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE) 
+	if(mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE)
 		rt_module_free(mq->parent.parent.module_id, mq->msg_pool);
 	else
 #endif
@@ -1727,6 +1825,7 @@ rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size)
 		((struct rt_mq_message*)mq->msg_queue_tail)->next = msg;
 	}
 
+
 	/* set new tail */
 	mq->msg_queue_tail = msg;
 	/* if the head is empty, set head */
@@ -1738,7 +1837,7 @@ rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size)
 	/* resume suspended thread */
 	if( !rt_list_isempty(&mq->parent.suspend_thread) )
 	{
-		rt_ipc_object_resume(&(mq->parent));
+		rt_ipc_list_resume(&(mq->parent.suspend_thread));
 
 		/* enable interrupt */
 		rt_hw_interrupt_enable(temp);
@@ -1814,7 +1913,7 @@ rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size)
 	/* resume suspended thread */
 	if( !rt_list_isempty(&mq->parent.suspend_thread) )
 	{
-		rt_ipc_object_resume(&(mq->parent));
+		rt_ipc_list_resume(&(mq->parent.suspend_thread));
 
 		/* enable interrupt */
 		rt_hw_interrupt_enable(temp);
@@ -1875,7 +1974,8 @@ rt_err_t rt_mq_recv (rt_mq_t mq, void* buffer, rt_size_t size, rt_int32_t timeou
 		}
 
 		/* suspend current thread */
-		rt_ipc_object_suspend(&(mq->parent), thread);
+		rt_ipc_list_suspend(&(mq->parent.suspend_thread),
+				thread, mq->parent.parent.flag);
 
 		/* has waiting time, start thread timer */
 		if (timeout > 0)
@@ -1971,7 +2071,7 @@ rt_err_t rt_mq_control(rt_mq_t mq, rt_uint8_t cmd, void* arg)
 		level = rt_hw_interrupt_disable();
 
 		/* resume all waiting thread */
-		rt_ipc_object_resume_all(&mq->parent);
+		rt_ipc_list_resume_all(&mq->parent.suspend_thread);
 
 		/* release all message in the queue */
 		while (mq->msg_queue_head != RT_NULL)
@@ -2002,5 +2102,4 @@ rt_err_t rt_mq_control(rt_mq_t mq, rt_uint8_t cmd, void* arg)
 }
 
 #endif /* end of RT_USING_MESSAGEQUEUE */
-
 /*@}*/