Răsfoiți Sursa

Add pipe, data queue implementation; Fix the issue which leaks one item in the available data of ring buffer.

git-svn-id: https://rt-thread.googlecode.com/svn/trunk@2313 bbd45198-f89e-11dd-88c7-29a3b14d5316
bernard.xiong@gmail.com 12 ani în urmă
părinte
comite
4f05dd5426

+ 70 - 6
components/drivers/include/rtdevice.h

@@ -14,6 +14,8 @@ struct rt_completion
     rt_list_t suspended_list;
 };
 
+#define RT_RINGBUFFER_SIZE(rb)		((rb)->write_index - (rb)->read_index)
+#define RT_RINGBUFFER_EMPTY(rb) 	((rb)->buffer_size - RT_RINGBUFFER_SIZE(rb))
 /* ring buffer */
 struct rt_ringbuffer
 {
@@ -22,6 +24,47 @@ struct rt_ringbuffer
     rt_uint16_t buffer_size;
 };
 
+/* pipe device */
+#define PIPE_DEVICE(device)	((struct rt_pipe_device*)(device))
+struct rt_pipe_device
+{
+	struct rt_device parent;
+
+	/* ring buffer in pipe device */
+	struct rt_ringbuffer ringbuffer;
+
+	/* suspended list */
+	rt_list_t suspended_read_list;
+	rt_list_t suspended_write_list;
+};
+
+#define RT_DATAQUEUE_EVENT_UNKNOWN   0x00
+#define RT_DATAQUEUE_EVENT_POP       0x01
+#define RT_DATAQUEUE_EVENT_PUSH      0x02
+#define RT_DATAQUEUE_EVENT_LWM       0x03
+
+struct rt_data_item;
+#define RT_DATAQUEUE_SIZE(dq)		((dq)->put_index - (dq)->get_index)
+#define RT_DATAQUEUE_EMPTY(dq)		((dq)->size - RT_DATAQUEUE_SIZE(dq))
+/* data queue implementation */
+struct rt_data_queue
+{
+    rt_uint16_t size;
+    rt_uint16_t lwm;
+	rt_bool_t   waiting_lwm;
+
+    rt_uint16_t get_index;
+    rt_uint16_t put_index;
+
+    struct rt_data_item *queue;
+
+    rt_list_t suspended_push_list;
+    rt_list_t suspended_pop_list;
+
+    /* event notify */
+	void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event);
+};
+
 /**
  * Completion
  */
@@ -30,12 +73,11 @@ rt_err_t rt_completion_wait(struct rt_completion* completion,
                             rt_int32_t timeout);
 void rt_completion_done(struct rt_completion* completion);
 
-/**
- * DataLink for DeviceDriver
- */
-
 /**
  * RingBuffer for DeviceDriver
+ *
+ * Please note that the ring buffer implementation of RT-Thread 
+ * has no thread wait or resume feature.
  */
 void rt_ringbuffer_init(struct rt_ringbuffer* rb,
                         rt_uint8_t *pool,
@@ -49,8 +91,29 @@ rt_size_t rt_ringbuffer_get(struct rt_ringbuffer* rb,
                             rt_uint8_t *ptr,
                             rt_uint16_t length);
 rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch);
-rt_size_t rt_ringbuffer_available_size(struct rt_ringbuffer* rb);
-rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb);
+rt_inline rt_uint16_t rt_ringbuffer_get_size(struct rt_ringbuffer* rb)
+{
+	RT_ASSERT(rb != RT_NULL);
+	return rb->buffer_size;
+}
+
+/**
+ * Pipe Device
+ */
+rt_err_t rt_pipe_create(const char* name, rt_size_t size);
+void rt_pipe_destroy(struct rt_pipe_device* pipe);
+
+/**
+ * DataQueue for DeviceDriver
+ */
+rt_err_t rt_data_queue_init(struct rt_data_queue* queue, rt_uint16_t size, rt_uint16_t lwm,
+	void (*evt_notify)(struct rt_data_queue* queue, rt_uint32_t event));
+rt_err_t rt_data_queue_push(struct rt_data_queue* queue, void* data_ptr, rt_size_t data_size, 
+	rt_int32_t timeout);
+rt_err_t rt_data_queue_pop(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size, 
+	rt_int32_t timeout);
+rt_err_t rt_data_queue_peak(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size);
+void rt_data_queue_reset(struct rt_data_queue* queue);
 
 #ifdef RT_USING_SPI
 #include "drivers/spi.h"
@@ -93,3 +156,4 @@ rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb);
 #endif
 
 #endif /* __RT_DEVICE_H__ */
+

+ 120 - 109
components/drivers/src/completion.c

@@ -1,110 +1,121 @@
-/**
- * Complete implementation in Device Drivers
- */
-#include <rthw.h>
-#include <rtthread.h>
-#include <rtdevice.h>
-
-#define RT_COMPLETED	1
-#define RT_UNCOMPLETED	0
-
-void rt_completion_init(struct rt_completion* completion)
-{
-	rt_base_t level;
-	RT_ASSERT(completion != RT_NULL);
-
-	level = rt_hw_interrupt_disable();
-	completion->flag = RT_UNCOMPLETED;
-	rt_list_init(&completion->suspended_list);
-	rt_hw_interrupt_enable(level);
-}
-
-rt_err_t rt_completion_wait(struct rt_completion* completion, rt_int32_t timeout)
-{
-	rt_err_t result;
-	rt_base_t level;
-	rt_thread_t thread;
-	RT_ASSERT(completion != RT_NULL);
-
-	result = RT_EOK;
-	thread = rt_thread_self();
-
-	level = rt_hw_interrupt_disable();
-	if (completion->flag != RT_COMPLETED)
-	{
-		/* only one thread can suspend on complete */
-		RT_ASSERT(rt_list_isempty(&(completion->suspended_list)));
-
-		if (timeout == 0)
-		{
-			result = -RT_ETIMEOUT;
-			goto __exit;
-		}
-		else
-		{
+/*
+ * File      : completion.c
+ * This file is part of RT-Thread RTOS
+ * COPYRIGHT (C) 2012, RT-Thread Development Team
+ *
+ * The license and distribution terms for this file may be
+ * found in the file LICENSE in this distribution or at
+ * http://www.rt-thread.org/license/LICENSE
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2012-09-30     Bernard      first version.
+ */
+
+#include <rthw.h>
+#include <rtthread.h>
+#include <rtdevice.h>
+
+#define RT_COMPLETED	1
+#define RT_UNCOMPLETED	0
+
+void rt_completion_init(struct rt_completion* completion)
+{
+	rt_base_t level;
+	RT_ASSERT(completion != RT_NULL);
+
+	level = rt_hw_interrupt_disable();
+	completion->flag = RT_UNCOMPLETED;
+	rt_list_init(&completion->suspended_list);
+	rt_hw_interrupt_enable(level);
+}
+
+rt_err_t rt_completion_wait(struct rt_completion* completion, rt_int32_t timeout)
+{
+	rt_err_t result;
+	rt_base_t level;
+	rt_thread_t thread;
+	RT_ASSERT(completion != RT_NULL);
+
+	result = RT_EOK;
+	thread = rt_thread_self();
+
+	level = rt_hw_interrupt_disable();
+	if (completion->flag != RT_COMPLETED)
+	{
+		/* only one thread can suspend on complete */
+		RT_ASSERT(rt_list_isempty(&(completion->suspended_list)));
+
+		if (timeout == 0)
+		{
+			result = -RT_ETIMEOUT;
+			goto __exit;
+		}
+		else
+		{
 			/* reset thread error number */
-			thread->error = RT_EOK;
-			
-			/* suspend thread */
-			rt_thread_suspend(thread);
-			/* add to suspended list */
-			rt_list_insert_before(&(completion->suspended_list), &(thread->tlist));
-
-			/* current context checking */
-			RT_DEBUG_NOT_IN_INTERRUPT;
-
-			/* start timer */
-			if (timeout > 0)
-			{
-				/* reset the timeout of thread timer and start it */
-				rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
-				rt_timer_start(&(thread->thread_timer));
-			}
-			/* enable interrupt */
-			rt_hw_interrupt_enable(level);
-
-			/* do schedule */
-			rt_schedule();
-
-			/* thread is waked up */
-			result = thread->error;
-
-			level = rt_hw_interrupt_disable();
-			/* clean completed flag */
-			completion->flag = RT_UNCOMPLETED;
-		}
-	}
-
-__exit:
-	rt_hw_interrupt_enable(level);
-	return result;
-}
-
-void rt_completion_done(struct rt_completion* completion)
-{
-	rt_base_t level;
-	RT_ASSERT(completion != RT_NULL);
-
-	level = rt_hw_interrupt_disable();
-	completion->flag = RT_COMPLETED;
-
-	if (!rt_list_isempty(&(completion->suspended_list)))
-	{
-		/* there is one thread in suspended list */
-		struct rt_thread *thread;
-
-		/* get thread entry */
-		thread = rt_list_entry(completion->suspended_list.next, struct rt_thread, tlist);
-		
-		/* resume it */
-		rt_thread_resume(thread);
-		rt_hw_interrupt_enable(level);
-
-		/* perform a schedule */
-		rt_schedule();
-	}
-	else
-	{
-		rt_hw_interrupt_enable(level);
-	}
-}
+			thread->error = RT_EOK;
+			
+			/* suspend thread */
+			rt_thread_suspend(thread);
+			/* add to suspended list */
+			rt_list_insert_before(&(completion->suspended_list), &(thread->tlist));
+
+			/* current context checking */
+			RT_DEBUG_NOT_IN_INTERRUPT;
+
+			/* start timer */
+			if (timeout > 0)
+			{
+				/* reset the timeout of thread timer and start it */
+				rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
+				rt_timer_start(&(thread->thread_timer));
+			}
+			/* enable interrupt */
+			rt_hw_interrupt_enable(level);
+
+			/* do schedule */
+			rt_schedule();
+
+			/* thread is waked up */
+			result = thread->error;
+
+			level = rt_hw_interrupt_disable();
+			/* clean completed flag */
+			completion->flag = RT_UNCOMPLETED;
+		}
+	}
+
+__exit:
+	rt_hw_interrupt_enable(level);
+	return result;
+}
+
+void rt_completion_done(struct rt_completion* completion)
+{
+	rt_base_t level;
+	RT_ASSERT(completion != RT_NULL);
+
+	level = rt_hw_interrupt_disable();
+	completion->flag = RT_COMPLETED;
+
+	if (!rt_list_isempty(&(completion->suspended_list)))
+	{
+		/* there is one thread in suspended list */
+		struct rt_thread *thread;
+
+		/* get thread entry */
+		thread = rt_list_entry(completion->suspended_list.next, struct rt_thread, tlist);
+		
+		/* resume it */
+		rt_thread_resume(thread);
+		rt_hw_interrupt_enable(level);
+
+		/* perform a schedule */
+		rt_schedule();
+	}
+	else
+	{
+		rt_hw_interrupt_enable(level);
+	}
+}

+ 315 - 0
components/drivers/src/dataqueue.c

@@ -0,0 +1,315 @@
+/*
+ * File      : dataqueue.c
+ * This file is part of RT-Thread RTOS
+ * COPYRIGHT (C) 2012, RT-Thread Development Team
+ *
+ * The license and distribution terms for this file may be
+ * found in the file LICENSE in this distribution or at
+ * http://www.rt-thread.org/license/LICENSE
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2012-09-30     Bernard      first version.
+ */
+#include <rtthread.h>
+#include <rtdevice.h>
+#include <rthw.h>
+
+struct rt_data_item
+{
+	void* data_ptr;
+	rt_size_t data_size;
+};
+
+rt_err_t rt_data_queue_init(struct rt_data_queue* queue, rt_uint16_t size, rt_uint16_t lwm,
+	void (*evt_notify)(struct rt_data_queue* queue, rt_uint32_t event))
+{
+    RT_ASSERT(queue != RT_NULL);
+
+    queue->evt_notify = evt_notify;
+
+    queue->size = size;
+    queue->lwm = lwm;
+	queue->waiting_lwm = RT_FALSE;
+
+    queue->get_index = 0;
+    queue->put_index = 0;
+
+    rt_list_init(&(queue->suspended_push_list));
+    rt_list_init(&(queue->suspended_pop_list));
+
+    queue->queue = (struct rt_data_item*) rt_malloc(sizeof(struct rt_data_item) * size);
+    if (queue->queue == RT_NULL)
+    {
+        return -RT_ENOMEM;
+    }
+
+    return RT_EOK;
+}
+RTM_EXPORT(rt_data_queue_init);
+
+rt_err_t rt_data_queue_push(struct rt_data_queue* queue, void* data_ptr, rt_size_t data_size, rt_int32_t timeout)
+{
+    rt_uint16_t mask;
+    rt_ubase_t  level;
+    rt_thread_t thread;
+	rt_err_t    result;
+	
+	RT_ASSERT(queue != RT_NULL);
+
+	result = RT_EOK;
+    thread = rt_thread_self();
+	mask = queue->size - 1;
+
+    level = rt_hw_interrupt_disable();
+    while (queue->put_index - queue->get_index == queue->size)
+    {
+		queue->waiting_lwm = RT_TRUE;
+
+        /* queue is full */
+        if (timeout == 0)
+        {
+        	result = -RT_ETIMEOUT;
+
+			goto __exit;
+        }
+		
+		/* current context checking */
+		RT_DEBUG_NOT_IN_INTERRUPT;
+
+		/* reset thread error number */
+		thread->error = RT_EOK;
+		
+		/* suspend thread on the push list */
+		rt_thread_suspend(thread);
+		rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
+		/* start timer */
+		if (timeout > 0)
+		{
+			/* reset the timeout of thread timer and start it */
+			rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
+			rt_timer_start(&(thread->thread_timer));
+		}
+
+		/* enable interrupt */
+		rt_hw_interrupt_enable(level);
+
+		/* do schedule */
+		rt_schedule();
+		
+		/* thread is waked up */
+		result = thread->error;
+		level = rt_hw_interrupt_disable();
+		if (result != RT_EOK) goto __exit;
+    }
+
+	queue->queue[queue->put_index & mask].data_ptr  = data_ptr;
+	queue->queue[queue->put_index & mask].data_size = data_size;
+	queue->put_index += 1;
+
+	if (!rt_list_isempty(&(queue->suspended_pop_list)))
+	{
+		/* there is at least one thread in suspended list */
+
+		/* get thread entry */
+		thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist);
+		
+		/* resume it */
+		rt_thread_resume(thread);
+		rt_hw_interrupt_enable(level);
+
+		/* perform a schedule */
+		rt_schedule();
+
+		return result;
+	}
+
+__exit:
+    rt_hw_interrupt_enable(level);
+	if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
+	{
+		queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
+	}
+
+    return result;
+}
+RTM_EXPORT(rt_data_queue_push);
+
+rt_err_t rt_data_queue_pop(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size, 
+	rt_int32_t timeout)
+{
+    rt_ubase_t  level;
+    rt_thread_t thread;
+	rt_err_t    result;
+	rt_uint16_t mask;
+	
+	RT_ASSERT(queue != RT_NULL);
+	RT_ASSERT(data_ptr != RT_NULL);
+	RT_ASSERT(size != RT_NULL);
+
+	result = RT_EOK;
+    thread = rt_thread_self();
+	mask = queue->size - 1;
+
+    level = rt_hw_interrupt_disable();
+	while (queue->get_index == queue->put_index)
+	{
+		/* queue is empty */
+		if (timeout == 0)
+		{
+			result = -RT_ETIMEOUT;
+			goto __exit;
+		}
+
+		/* current context checking */
+		RT_DEBUG_NOT_IN_INTERRUPT;
+
+		/* reset thread error number */
+		thread->error = RT_EOK;
+		
+		/* suspend thread on the pop list */
+		rt_thread_suspend(thread);
+		rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
+		/* start timer */
+		if (timeout > 0)
+		{
+			/* reset the timeout of thread timer and start it */
+			rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout);
+			rt_timer_start(&(thread->thread_timer));
+		}
+
+		/* enable interrupt */
+		rt_hw_interrupt_enable(level);
+
+		/* do schedule */
+		rt_schedule();
+
+		/* thread is waked up */
+		result = thread->error;
+		level = rt_hw_interrupt_disable();
+		if (result != RT_EOK) goto __exit;
+	}
+
+	*data_ptr = queue->queue[queue->get_index & mask].data_ptr;
+	*size     = queue->queue[queue->get_index & mask].data_size;
+
+	queue->get_index += 1;
+
+	if ((queue->waiting_lwm == RT_TRUE) && 
+		(queue->put_index - queue->get_index) <= queue->lwm)
+	{
+		queue->waiting_lwm = RT_FALSE;
+
+		/* there is at least one thread in suspended list and less than low water mark */
+		if (!rt_list_isempty(&(queue->suspended_push_list)))
+		{
+			/* get thread entry */
+			thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist);
+
+			/* resume it */
+			rt_thread_resume(thread);
+			rt_hw_interrupt_enable(level);
+
+			/* perform a schedule */
+			rt_schedule();
+		}
+
+		if (queue->evt_notify != RT_NULL)
+			queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
+
+		return result;
+	}
+
+__exit:
+    rt_hw_interrupt_enable(level);
+	if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
+	{
+		queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
+	}
+
+	return result;
+}
+RTM_EXPORT(rt_data_queue_pop);
+
+rt_err_t rt_data_queue_peak(struct rt_data_queue* queue, void** data_ptr, rt_size_t *size)
+{
+    rt_ubase_t  level;
+	rt_uint16_t mask;
+
+	RT_ASSERT(queue != RT_NULL);
+
+	mask = queue->size - 1;
+
+    level = rt_hw_interrupt_disable();
+
+	if (queue->get_index == queue->put_index) 
+	{
+	    rt_hw_interrupt_enable(level);
+		return -RT_EEMPTY;
+	}
+	
+	*data_ptr = queue->queue[queue->get_index & mask].data_ptr;
+	*size	  = queue->queue[queue->get_index & mask].data_size;
+
+    rt_hw_interrupt_enable(level);
+	return RT_EOK;
+}
+RTM_EXPORT(rt_data_queue_peak);
+
+void rt_data_queue_reset(struct rt_data_queue* queue)
+{
+	struct rt_thread *thread;
+	register rt_ubase_t temp;
+
+	rt_enter_critical();
+	/* wakeup all suspend threads */
+
+	/* resume on pop list */
+	while (!rt_list_isempty(&(queue->suspended_pop_list)))
+	{
+		/* disable interrupt */
+		temp = rt_hw_interrupt_disable();
+
+		/* get next suspend thread */
+		thread = rt_list_entry(queue->suspended_pop_list.next, struct rt_thread, tlist);
+		/* set error code to RT_ERROR */
+		thread->error = -RT_ERROR;
+
+		/*
+		 * resume thread
+		 * In rt_thread_resume function, it will remove current thread from
+		 * suspend list
+		 */
+		rt_thread_resume(thread);
+
+		/* enable interrupt */
+		rt_hw_interrupt_enable(temp);
+	}
+
+	/* resume on push list */
+	while (!rt_list_isempty(&(queue->suspended_push_list)))
+	{
+		/* disable interrupt */
+		temp = rt_hw_interrupt_disable();
+
+		/* get next suspend thread */
+		thread = rt_list_entry(queue->suspended_push_list.next, struct rt_thread, tlist);
+		/* set error code to RT_ERROR */
+		thread->error = -RT_ERROR;
+
+		/*
+		 * resume thread
+		 * In rt_thread_resume function, it will remove current thread from
+		 * suspend list
+		 */
+		rt_thread_resume(thread);
+
+		/* enable interrupt */
+		rt_hw_interrupt_enable(temp);
+	}
+	rt_exit_critical();
+
+	rt_schedule();
+}
+RTM_EXPORT(rt_data_queue_reset);
+

+ 196 - 0
components/drivers/src/pipe.c

@@ -0,0 +1,196 @@
+/*
+ * File      : pipe.c
+ * This file is part of RT-Thread RTOS
+ * COPYRIGHT (C) 2012, RT-Thread Development Team
+ *
+ * The license and distribution terms for this file may be
+ * found in the file LICENSE in this distribution or at
+ * http://www.rt-thread.org/license/LICENSE
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2012-09-30     Bernard      first version.
+ */
+#include <rthw.h>
+#include <rtthread.h>
+#include <rtdevice.h>
+
+static rt_size_t rt_pipe_read(rt_device_t dev, rt_off_t pos, void *buffer, rt_size_t size)
+{
+	rt_uint32_t level;
+	rt_thread_t thread;
+	struct rt_pipe_device *pipe;
+	rt_size_t read_nbytes;
+
+	pipe = PIPE_DEVICE(dev);
+	RT_ASSERT(pipe != RT_NULL);
+
+	thread = rt_thread_self();
+
+	/* current context checking */
+	RT_DEBUG_NOT_IN_INTERRUPT;
+
+	do 
+	{
+		level = rt_hw_interrupt_disable();
+		read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
+		if (read_nbytes == 0)
+		{
+			rt_thread_suspend(thread);
+			/* waiting on suspended read list */
+			rt_list_insert_before(&(pipe->suspended_read_list), &(thread->tlist));
+			rt_hw_interrupt_enable(level);
+
+			rt_schedule();
+		}
+		else
+		{
+			if (!rt_list_isempty(&pipe->suspended_write_list))
+			{
+				/* get suspended thread */
+				thread = rt_list_entry(pipe->suspended_write_list.next, 
+					struct rt_thread, tlist);
+
+				/* resume the write thread */
+				rt_thread_resume(thread);
+				rt_hw_interrupt_enable(level);
+
+				rt_schedule();
+			}
+			else
+			{
+				rt_hw_interrupt_enable(level);
+			}
+			break;
+		}
+	} while (read_nbytes == 0);
+
+	return read_nbytes;
+}
+
+struct rt_pipe_device *_pipe = RT_NULL;
+static rt_size_t rt_pipe_write(rt_device_t dev, rt_off_t pos, const void *buffer, rt_size_t size)
+{
+	rt_uint32_t level;
+	rt_thread_t thread;
+	struct rt_pipe_device *pipe;
+	rt_size_t write_nbytes;
+
+	pipe = PIPE_DEVICE(dev);
+	RT_ASSERT(pipe != RT_NULL);
+	if (_pipe == RT_NULL)
+		_pipe = pipe;
+	
+	thread = rt_thread_self();
+
+	/* current context checking */
+	RT_DEBUG_NOT_IN_INTERRUPT;
+
+	do 
+	{
+		level = rt_hw_interrupt_disable();
+		write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size);
+		if (write_nbytes == 0)
+		{
+			/* pipe full, waiting on suspended write list */
+			rt_thread_suspend(thread);
+			/* waiting on suspended read list */
+			rt_list_insert_before(&(pipe->suspended_write_list), &(thread->tlist));
+			rt_hw_interrupt_enable(level);
+
+			rt_schedule();
+		}
+		else
+		{
+			if (!rt_list_isempty(&pipe->suspended_read_list))
+			{
+				/* get suspended thread */
+				thread = rt_list_entry(pipe->suspended_read_list.next, 
+					struct rt_thread, tlist);
+
+				/* resume the read thread */
+				rt_thread_resume(thread);
+				rt_hw_interrupt_enable(level);
+
+				rt_schedule();
+			}
+			else
+			{
+				rt_hw_interrupt_enable(level);
+			}
+			break;
+		}
+	}while (write_nbytes == 0);
+
+	return write_nbytes;
+}
+
+static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args)
+{
+	return RT_EOK;
+}
+
+rt_err_t rt_pipe_create(const char* name, rt_size_t size)
+{
+	rt_err_t result = RT_EOK;
+	rt_uint8_t* rb_memptr = RT_NULL;
+	struct rt_pipe_device *pipe = RT_NULL;
+
+	/* get aligned size */
+	size = RT_ALIGN(size, RT_ALIGN_SIZE);
+	pipe = (struct rt_pipe_device*) rt_calloc (1, sizeof(struct rt_pipe_device));
+	if (pipe != RT_NULL)
+	{
+		/* create ring buffer of pipe */
+		rb_memptr = rt_malloc(size);
+		if (rb_memptr == RT_NULL)
+		{
+			result = -RT_ENOMEM;
+			goto __exit;
+		}
+		/* initialize suspended list */
+		rt_list_init(&pipe->suspended_read_list);
+		rt_list_init(&pipe->suspended_write_list);
+		
+		/* initialize ring buffer */
+		rt_ringbuffer_init(&pipe->ringbuffer, rb_memptr, size);
+
+		/* create device */
+		pipe->parent.type 	= RT_Device_Class_Char;
+		pipe->parent.init 	= RT_NULL;
+		pipe->parent.open 	= RT_NULL;
+		pipe->parent.close 	= RT_NULL;
+		pipe->parent.read 	= rt_pipe_read;
+		pipe->parent.write 	= rt_pipe_write;
+		pipe->parent.control = rt_pipe_control;
+
+		return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
+	}
+	else
+	{
+		result = -RT_ENOMEM;
+	}
+
+__exit:
+	if (pipe != RT_NULL) rt_free(pipe);
+	if (rb_memptr != RT_NULL) rt_free(rb_memptr);
+
+	return result;
+}
+RTM_EXPORT(rt_pipe_create);
+
+void rt_pipe_destroy(struct rt_pipe_device* pipe)
+{
+	if (pipe == RT_NULL) return;
+
+	/* un-register pipe device */
+	rt_device_unregister(&(pipe->parent));
+
+	/* release memory */
+	rt_free(pipe->ringbuffer.buffer_ptr);
+	rt_free(pipe);
+
+	return;
+}
+RTM_EXPORT(rt_pipe_destroy);
+

+ 53 - 66
components/drivers/src/ringbuffer.c

@@ -1,3 +1,17 @@
+/*
+ * File      : ringbuffer.c
+ * This file is part of RT-Thread RTOS
+ * COPYRIGHT (C) 2012, RT-Thread Development Team
+ *
+ * The license and distribution terms for this file may be
+ * found in the file LICENSE in this distribution or at
+ * http://www.rt-thread.org/license/LICENSE
+ *
+ * Change Logs:
+ * Date           Author       Notes
+ * 2012-09-30     Bernard      first version.
+ */
+
 #include <rtthread.h>
 #include <rtdevice.h>
 #include <string.h>
@@ -13,71 +27,64 @@ void rt_ringbuffer_init(struct rt_ringbuffer* rb, rt_uint8_t *pool, rt_uint16_t
 	rb->buffer_ptr = pool;
 	rb->buffer_size = RT_ALIGN_DOWN(size, RT_ALIGN_SIZE);
 }
+RTM_EXPORT(rt_ringbuffer_init);
 
 rt_size_t rt_ringbuffer_put(struct rt_ringbuffer* rb, const rt_uint8_t *ptr, rt_uint16_t length)
 {
 	rt_uint16_t size;
 	rt_uint16_t mask;
-
+	rt_uint16_t write_position;
+	
 	RT_ASSERT(rb != RT_NULL);
 
 	mask = rb->buffer_size - 1;
 	/* whether has enough space */
-	size = rb->buffer_size - ((rb->write_index - rb->read_index) & mask);
+	size = rb->buffer_size - (rb->write_index - rb->read_index);
 
 	/* no space */
 	if (size == 0) return 0;
 	/* drop some data */
 	if (size < length) length = size;
 
-	if (rb->read_index > rb->write_index)
+	write_position = (rb->write_index & mask);
+	if (rb->buffer_size - write_position> length)
 	{
 		/* read_index - write_index = empty space */
-		memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
-		rb->write_index += length;
+		memcpy(&rb->buffer_ptr[write_position], ptr, length);
 	}
 	else
 	{
-		if (rb->buffer_size - rb->write_index >= length)
-		{
-			/* there is enough space after write_index */
-			memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
-			rb->write_index = (rb->write_index + length) & mask;
-		}
-		else
-		{
-			memcpy(&rb->buffer_ptr[rb->write_index], ptr,
-					rb->buffer_size - rb->write_index);
-			memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index],
-					length - (rb->buffer_size - rb->write_index));
-			rb->write_index = length - (rb->buffer_size - rb->write_index);
-		}
+		memcpy(&rb->buffer_ptr[write_position], ptr, rb->buffer_size - write_position);
+		memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - write_position],
+				length - (rb->buffer_size - write_position));
 	}
+	rb->write_index += length;
 
 	return length;
 }
+RTM_EXPORT(rt_ringbuffer_put);
 
 /**
  * put a character into ring buffer
  */
 rt_size_t rt_ringbuffer_putchar(struct rt_ringbuffer* rb, const rt_uint8_t ch)
 {
-	rt_uint16_t next;
 	rt_uint16_t mask;
 
 	RT_ASSERT(rb != RT_NULL);
 	/* whether has enough space */
 	mask = rb->buffer_size - 1;
-	next = (rb->write_index + 1) & mask;
-
-	if (next == rb->read_index) return 0;
+	
+	/* whether has enough space */
+	if (rb->write_index - rb->read_index == rb->buffer_size) return 0;
 
 	/* put character */
-	rb->buffer_ptr[rb->write_index] = ch;
-	rb->write_index = next;
+	rb->buffer_ptr[rb->write_index & mask] = ch;
+	rb->write_index += 1;
 
 	return 1;
 }
+RTM_EXPORT(rt_ringbuffer_putchar);
 
 /**
  *  get data from ring buffer
@@ -86,47 +93,42 @@ rt_size_t rt_ringbuffer_get(struct rt_ringbuffer* rb, rt_uint8_t *ptr, rt_uint16
 {
 	rt_size_t size;
 	rt_uint16_t mask;
+	rt_uint16_t read_position;
 
 	RT_ASSERT(rb != RT_NULL);
 	/* whether has enough data  */
 	mask = rb->buffer_size - 1;
-	size = (rb->write_index - rb->read_index) & mask;
+	size = rb->write_index - rb->read_index;
 
 	/* no data */
 	if (size == 0) return 0;
 	/* less data */
 	if (size < length) length = size;
 
-	if (rb->read_index > rb->write_index)
+	read_position = rb->read_index & mask;
+	if (rb->buffer_size - read_position >= length)
 	{
-		if (rb->buffer_size - rb->read_index >= length)
-		{
-			/* copy directly */
-			memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
-			rb->read_index = (rb->read_index + length) & mask;
-		}
-		else
-		{
-			/* copy first and second */
-			memcpy(ptr, &rb->buffer_ptr[rb->read_index],
-				   rb->buffer_size - rb->read_index);
-			memcpy(&ptr[rb->buffer_size - rb->read_index], &rb->buffer_ptr[0],
-				   length - rb->buffer_size + rb->read_index);
-			rb->read_index = length - rb->buffer_size + rb->read_index;
-		}
+		/* copy all of data */
+		memcpy(ptr, &rb->buffer_ptr[read_position], length);
 	}
 	else
 	{
-		memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
-		rb->read_index += length;
+		/* copy first and second */
+		memcpy(ptr, &rb->buffer_ptr[read_position], rb->buffer_size - read_position);
+		memcpy(&ptr[rb->buffer_size - read_position], &rb->buffer_ptr[0],
+			   length - rb->buffer_size + read_position);
 	}
+	rb->read_index += length;
 
 	return length;
 }
+RTM_EXPORT(rt_ringbuffer_get);
 
+/**
+ * get a character from a ringbuffer
+ */
 rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch)
 {
-	rt_uint16_t next;
 	rt_uint16_t mask;
 
 	RT_ASSERT(rb != RT_NULL);
@@ -134,40 +136,25 @@ rt_size_t rt_ringbuffer_getchar(struct rt_ringbuffer* rb, rt_uint8_t *ch)
 	/* ringbuffer is empty */
 	if (rb->read_index == rb->write_index) return 0;
 
-	/* whether has data */
 	mask = rb->buffer_size - 1;
-	next = (rb->read_index + 1) & mask;
 
 	/* put character */
-	*ch = rb->buffer_ptr[rb->read_index];
-	rb->read_index = next;
+	*ch = rb->buffer_ptr[rb->read_index & mask];
+	rb->read_index += 1;
 
 	return 1;
 }
+RTM_EXPORT(rt_ringbuffer_getchar);
 
 /**
  * get available data size
  */
-rt_size_t rt_ringbuffer_available_size(struct rt_ringbuffer* rb)
+rt_size_t rt_ringbuffer_get_datasize(struct rt_ringbuffer* rb)
 {
-	rt_size_t size;
-	rt_uint16_t mask;
-
 	RT_ASSERT(rb != RT_NULL);
 
-	mask = rb->buffer_size - 1;
-	size = (rb->write_index - rb->read_index) & mask;
-
 	/* return the available size */
-	return size;
+	return rb->write_index - rb->read_index;
 }
+RTM_EXPORT(rt_data_queue_reset);
 
-/**
- * get empty space size
- */
-rt_size_t rt_ringbuffer_emptry_size(struct rt_ringbuffer* rb)
-{
-	RT_ASSERT(rb != RT_NULL);
-
-	return rb->buffer_size - rt_ringbuffer_available_size(rb);
-}