1
0

mqueue.c 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. #include "mqueue.h"
  2. #include "pthread_internal.h"
  3. static mqd_t posix_mq_list = RT_NULL;
  4. static struct rt_semaphore posix_mq_lock;
  5. void posix_mq_system_init()
  6. {
  7. rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO);
  8. }
  9. rt_inline void posix_mq_insert(mqd_t pmq)
  10. {
  11. pmq->next = posix_mq_list;
  12. posix_mq_list = pmq;
  13. }
  14. static void posix_mq_delete(mqd_t pmq)
  15. {
  16. mqd_t iter;
  17. if (posix_mq_list == pmq)
  18. {
  19. posix_mq_list = pmq->next;
  20. rt_mq_delete(pmq->mq);
  21. rt_free(pmq);
  22. return;
  23. }
  24. for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next)
  25. {
  26. if (iter->next == pmq)
  27. {
  28. /* delete this mq */
  29. if (pmq->next != RT_NULL)
  30. iter->next = pmq->next;
  31. else
  32. iter->next = RT_NULL;
  33. /* delete RT-Thread mqueue */
  34. rt_mq_delete(pmq->mq);
  35. rt_free(pmq);
  36. return ;
  37. }
  38. }
  39. }
  40. static mqd_t posix_mq_find(const char* name)
  41. {
  42. mqd_t iter;
  43. rt_object_t object;
  44. for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next)
  45. {
  46. object = (rt_object_t)(iter->mq);
  47. if (strncmp(object->name, name, RT_NAME_MAX) == 0)
  48. {
  49. return iter;
  50. }
  51. }
  52. return RT_NULL;
  53. }
  54. int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
  55. struct mq_attr *omqstat)
  56. {
  57. rt_set_errno(-RT_ERROR);
  58. return -1;
  59. }
  60. int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
  61. {
  62. if ((mqdes == RT_NULL) || mqstat == RT_NULL)
  63. {
  64. rt_set_errno(EBADF);
  65. return -1;
  66. }
  67. mqstat->mq_maxmsg = mqdes->mq->max_msgs;
  68. mqstat->mq_msgsize = mqdes->mq->msg_size;
  69. mqstat->mq_curmsgs = 0;
  70. mqstat->mq_flags = 0;
  71. return 0;
  72. }
  73. mqd_t mq_open(const char *name, int oflag, ...)
  74. {
  75. mqd_t mqdes;
  76. va_list arg;
  77. mode_t mode;
  78. struct mq_attr *attr = RT_NULL;
  79. /* lock posix mqueue list */
  80. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  81. mqdes = RT_NULL;
  82. if (oflag & O_CREAT)
  83. {
  84. va_start(arg, oflag);
  85. mode = (mode_t) va_arg(arg, unsigned int); mode = mode;
  86. attr = (struct mq_attr *) va_arg(arg, struct mq_attr *);
  87. va_end(arg);
  88. if (oflag & O_EXCL)
  89. {
  90. if (posix_mq_find(name) != RT_NULL)
  91. {
  92. rt_set_errno(EEXIST);
  93. goto __return;
  94. }
  95. }
  96. mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
  97. if (mqdes == RT_NULL)
  98. {
  99. rt_set_errno(ENFILE);
  100. goto __return;
  101. }
  102. /* create RT-Thread message queue */
  103. mqdes->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO);
  104. if (mqdes->mq == RT_NULL) /* create failed */
  105. {
  106. rt_set_errno(ENFILE);
  107. goto __return;
  108. }
  109. /* initialize reference count */
  110. mqdes->refcount = 1;
  111. mqdes->unlinked = 0;
  112. /* insert mq to posix mq list */
  113. posix_mq_insert(mqdes);
  114. }
  115. else
  116. {
  117. /* find mqueue */
  118. mqdes = posix_mq_find(name);
  119. if (mqdes != RT_NULL)
  120. {
  121. mqdes->refcount ++; /* increase reference count */
  122. }
  123. else
  124. {
  125. rt_set_errno(ENOENT);
  126. goto __return;
  127. }
  128. }
  129. rt_sem_release(&posix_mq_lock);
  130. return mqdes;
  131. __return:
  132. /* release lock */
  133. rt_sem_release(&posix_mq_lock);
  134. /* release allocated memory */
  135. if (mqdes != RT_NULL)
  136. {
  137. if (mqdes->mq != RT_NULL)
  138. {
  139. /* delete RT-Thread message queue */
  140. rt_mq_delete(mqdes->mq);
  141. }
  142. rt_free(mqdes);
  143. }
  144. return RT_NULL;
  145. }
  146. ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
  147. {
  148. rt_err_t result;
  149. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  150. {
  151. rt_set_errno(EINVAL);
  152. return -1;
  153. }
  154. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
  155. if (result == RT_EOK)
  156. return msg_len;
  157. rt_set_errno(EBADF);
  158. return -1;
  159. }
  160. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
  161. {
  162. rt_err_t result;
  163. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  164. {
  165. rt_set_errno(EINVAL);
  166. return -1;
  167. }
  168. result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len);
  169. if (result == RT_EOK)
  170. return 0;
  171. rt_set_errno(EBADF);
  172. return -1;
  173. }
  174. ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  175. unsigned *msg_prio, const struct timespec *abs_timeout)
  176. {
  177. int tick;
  178. rt_err_t result;
  179. /* parameters check */
  180. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  181. {
  182. rt_set_errno(EINVAL);
  183. return -1;
  184. }
  185. tick = clock_time_to_tick(abs_timeout);
  186. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
  187. if (result == RT_EOK) return msg_len;
  188. if (result == -RT_ETIMEOUT)
  189. rt_set_errno(ETIMEDOUT);
  190. else
  191. rt_set_errno(EBADMSG);
  192. return -1;
  193. }
  194. int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio,
  195. const struct timespec *abs_timeout)
  196. {
  197. /* RT-Thread does not support timed send */
  198. return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
  199. }
  200. int mq_notify(mqd_t mqdes, const struct sigevent *notification)
  201. {
  202. rt_set_errno(-RT_ERROR);
  203. return -1;
  204. }
  205. int mq_close(mqd_t mqdes)
  206. {
  207. if (mqdes == RT_NULL)
  208. {
  209. rt_set_errno(EINVAL);
  210. return -1;
  211. }
  212. /* lock posix mqueue list */
  213. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  214. mqdes->refcount --;
  215. if (mqdes->refcount == 0)
  216. {
  217. /* delete from posix mqueue list */
  218. if (mqdes->unlinked)
  219. posix_mq_delete(mqdes);
  220. }
  221. rt_sem_release(&posix_mq_lock);
  222. return 0;
  223. }
  224. int mq_unlink(const char *name)
  225. {
  226. mqd_t pmq;
  227. /* lock posix mqueue list */
  228. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  229. pmq = posix_mq_find(name);
  230. if (pmq != RT_NULL)
  231. {
  232. pmq->unlinked = 1;
  233. if (pmq->refcount == 0)
  234. {
  235. /* remove this mqueue */
  236. posix_mq_delete(pmq);
  237. }
  238. rt_sem_release(&posix_mq_lock);
  239. return 0;
  240. }
  241. rt_sem_release(&posix_mq_lock);
  242. /* no this entry */
  243. rt_set_errno(ENOENT);
  244. return -1;
  245. }