mqueue.c 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. #include "mqueue.h"
  2. #include "pthread_internal.h"
  3. #include <stdarg.h>
  4. #include <errno.h>
  5. #ifdef __GNUC__
  6. #include <sys/fcntl.h>
  7. #endif
  8. static mqd_t posix_mq_list = RT_NULL;
  9. static struct rt_semaphore posix_mq_lock;
  10. void posix_mq_system_init()
  11. {
  12. rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO);
  13. }
  14. rt_inline void posix_mq_insert(mqd_t pmq)
  15. {
  16. pmq->next = posix_mq_list;
  17. posix_mq_list = pmq;
  18. }
  19. static void posix_mq_delete(mqd_t pmq)
  20. {
  21. mqd_t iter;
  22. if (posix_mq_list == pmq)
  23. {
  24. posix_mq_list = pmq->next;
  25. rt_mq_delete(pmq->mq);
  26. rt_free(pmq);
  27. return;
  28. }
  29. for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next)
  30. {
  31. if (iter->next == pmq)
  32. {
  33. /* delete this mq */
  34. if (pmq->next != RT_NULL)
  35. iter->next = pmq->next;
  36. else
  37. iter->next = RT_NULL;
  38. /* delete RT-Thread mqueue */
  39. rt_mq_delete(pmq->mq);
  40. rt_free(pmq);
  41. return ;
  42. }
  43. }
  44. }
  45. static mqd_t posix_mq_find(const char* name)
  46. {
  47. mqd_t iter;
  48. rt_object_t object;
  49. for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next)
  50. {
  51. object = (rt_object_t)(iter->mq);
  52. if (strncmp(object->name, name, RT_NAME_MAX) == 0)
  53. {
  54. return iter;
  55. }
  56. }
  57. return RT_NULL;
  58. }
  59. int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
  60. struct mq_attr *omqstat)
  61. {
  62. rt_set_errno(-RT_ERROR);
  63. return -1;
  64. }
  65. int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
  66. {
  67. if ((mqdes == RT_NULL) || mqstat == RT_NULL)
  68. {
  69. rt_set_errno(EBADF);
  70. return -1;
  71. }
  72. mqstat->mq_maxmsg = mqdes->mq->max_msgs;
  73. mqstat->mq_msgsize = mqdes->mq->msg_size;
  74. mqstat->mq_curmsgs = 0;
  75. mqstat->mq_flags = 0;
  76. return 0;
  77. }
  78. mqd_t mq_open(const char *name, int oflag, ...)
  79. {
  80. mqd_t mqdes;
  81. va_list arg;
  82. mode_t mode;
  83. struct mq_attr *attr = RT_NULL;
  84. /* lock posix mqueue list */
  85. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  86. mqdes = RT_NULL;
  87. if (oflag & O_CREAT)
  88. {
  89. va_start(arg, oflag);
  90. mode = (mode_t) va_arg(arg, unsigned int);
  91. attr = (struct mq_attr *) va_arg(arg, struct mq_attr *);
  92. va_end(arg);
  93. if (oflag & O_EXCL)
  94. {
  95. if (posix_mq_find(name) != RT_NULL)
  96. {
  97. rt_set_errno(EEXIST);
  98. goto __return;
  99. }
  100. }
  101. mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
  102. if (mqdes == RT_NULL)
  103. {
  104. rt_set_errno(ENFILE);
  105. goto __return;
  106. }
  107. /* create RT-Thread message queue */
  108. mqdes->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO);
  109. if (mqdes->mq == RT_NULL) /* create failed */
  110. {
  111. rt_set_errno(ENFILE);
  112. goto __return;
  113. }
  114. /* initialize reference count */
  115. mqdes->refcount = 1;
  116. mqdes->unlinked = 0;
  117. /* insert mq to posix mq list */
  118. posix_mq_insert(mqdes);
  119. }
  120. else
  121. {
  122. /* find mqueue */
  123. mqdes = posix_mq_find(name);
  124. if (mqdes != RT_NULL)
  125. {
  126. mqdes->refcount ++; /* increase reference count */
  127. }
  128. else
  129. {
  130. rt_set_errno(ENOENT);
  131. goto __return;
  132. }
  133. }
  134. rt_sem_release(&posix_mq_lock);
  135. return mqdes;
  136. __return:
  137. /* release lock */
  138. rt_sem_release(&posix_mq_lock);
  139. /* release allocated memory */
  140. if (mqdes != RT_NULL)
  141. {
  142. if (mqdes->mq != RT_NULL)
  143. {
  144. /* delete RT-Thread message queue */
  145. rt_mq_delete(mqdes->mq);
  146. }
  147. rt_free(mqdes);
  148. }
  149. return RT_NULL;
  150. }
  151. ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
  152. {
  153. rt_err_t result;
  154. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  155. {
  156. rt_set_errno(EINVAL);
  157. return -1;
  158. }
  159. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
  160. if (result == RT_EOK)
  161. return msg_len;
  162. rt_set_errno(EBADF);
  163. return -1;
  164. }
  165. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
  166. {
  167. rt_err_t result;
  168. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  169. {
  170. rt_set_errno(EINVAL);
  171. return -1;
  172. }
  173. result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len);
  174. if (result == RT_EOK)
  175. return 0;
  176. rt_set_errno(EBADF);
  177. return -1;
  178. }
  179. ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  180. unsigned *msg_prio, const struct timespec *abs_timeout)
  181. {
  182. int tick;
  183. rt_err_t result;
  184. /* parameters check */
  185. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  186. {
  187. rt_set_errno(EINVAL);
  188. return -1;
  189. }
  190. tick = libc_time_to_tick(abs_timeout);
  191. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
  192. if (result == RT_EOK) return msg_len;
  193. if (result == -RT_ETIMEOUT)
  194. rt_set_errno(ETIMEDOUT);
  195. else
  196. rt_set_errno(EBADMSG);
  197. return -1;
  198. }
  199. int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio,
  200. const struct timespec *abs_timeout)
  201. {
  202. /* RT-Thread does not support timed send */
  203. return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
  204. }
  205. int mq_notify(mqd_t mqdes, const struct sigevent *notification)
  206. {
  207. rt_set_errno(-RT_ERROR);
  208. return -1;
  209. }
  210. int mq_close(mqd_t mqdes)
  211. {
  212. if (mqdes == RT_NULL)
  213. {
  214. rt_set_errno(EINVAL);
  215. return -1;
  216. }
  217. /* lock posix mqueue list */
  218. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  219. mqdes->refcount --;
  220. if (mqdes->refcount == 0)
  221. {
  222. /* delete from posix mqueue list */
  223. if (mqdes->unlinked)
  224. posix_mq_delete(mqdes);
  225. }
  226. rt_sem_release(&posix_mq_lock);
  227. return 0;
  228. }
  229. int mq_unlink(const char *name)
  230. {
  231. mqd_t pmq;
  232. /* lock posix mqueue list */
  233. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  234. pmq = posix_mq_find(name);
  235. if (pmq != RT_NULL)
  236. {
  237. pmq->unlinked = 1;
  238. if (pmq->refcount == 0)
  239. {
  240. /* remove this mqueue */
  241. posix_mq_delete(pmq);
  242. }
  243. rt_sem_release(&posix_mq_lock);
  244. return 0;
  245. }
  246. rt_sem_release(&posix_mq_lock);
  247. /* no this entry */
  248. rt_set_errno(ENOENT);
  249. return -1;
  250. }