1
0

mqueue.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. #include "mqueue.h"
  2. #include "pthread_internal.h"
  3. static mqd_t posix_mq_list = RT_NULL;
  4. static struct rt_semaphore posix_mq_lock;
  5. void posix_mq_system_init()
  6. {
  7. rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO);
  8. }
  9. rt_inline void posix_mq_insert(mqd_t pmq)
  10. {
  11. pmq->next = posix_mq_list;
  12. posix_mq_list = pmq;
  13. }
  14. static void posix_mq_delete(mqd_t pmq)
  15. {
  16. mqd_t iter;
  17. if (posix_mq_list == pmq)
  18. {
  19. posix_mq_list = pmq->next;
  20. rt_mq_delete(pmq->mq);
  21. rt_free(pmq);
  22. return;
  23. }
  24. for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next)
  25. {
  26. if (iter->next == pmq)
  27. {
  28. /* delete this mq */
  29. if (pmq->next != RT_NULL)
  30. iter->next = pmq->next;
  31. else
  32. iter->next = RT_NULL;
  33. /* delete RT-Thread mqueue */
  34. rt_mq_delete(pmq->mq);
  35. rt_free(pmq);
  36. return ;
  37. }
  38. }
  39. }
  40. static mqd_t posix_mq_find(const char* name)
  41. {
  42. mqd_t iter;
  43. rt_object_t object;
  44. for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next)
  45. {
  46. object = (rt_object_t)(iter->mq);
  47. if (strncmp(object->name, name, RT_NAME_MAX) == 0)
  48. {
  49. return iter;
  50. }
  51. }
  52. return RT_NULL;
  53. }
  54. int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
  55. struct mq_attr *omqstat)
  56. {
  57. rt_set_errno(-RT_ERROR);
  58. return -1;
  59. }
  60. RTM_EXPORT(mq_setattr);
  61. int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
  62. {
  63. if ((mqdes == RT_NULL) || mqstat == RT_NULL)
  64. {
  65. rt_set_errno(EBADF);
  66. return -1;
  67. }
  68. mqstat->mq_maxmsg = mqdes->mq->max_msgs;
  69. mqstat->mq_msgsize = mqdes->mq->msg_size;
  70. mqstat->mq_curmsgs = 0;
  71. mqstat->mq_flags = 0;
  72. return 0;
  73. }
  74. RTM_EXPORTO(mq_getattr);
  75. mqd_t mq_open(const char *name, int oflag, ...)
  76. {
  77. mqd_t mqdes;
  78. va_list arg;
  79. mode_t mode;
  80. struct mq_attr *attr = RT_NULL;
  81. /* lock posix mqueue list */
  82. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  83. mqdes = RT_NULL;
  84. if (oflag & O_CREAT)
  85. {
  86. va_start(arg, oflag);
  87. mode = (mode_t) va_arg(arg, unsigned int); mode = mode;
  88. attr = (struct mq_attr *) va_arg(arg, struct mq_attr *);
  89. va_end(arg);
  90. if (oflag & O_EXCL)
  91. {
  92. if (posix_mq_find(name) != RT_NULL)
  93. {
  94. rt_set_errno(EEXIST);
  95. goto __return;
  96. }
  97. }
  98. mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
  99. if (mqdes == RT_NULL)
  100. {
  101. rt_set_errno(ENFILE);
  102. goto __return;
  103. }
  104. /* create RT-Thread message queue */
  105. mqdes->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO);
  106. if (mqdes->mq == RT_NULL) /* create failed */
  107. {
  108. rt_set_errno(ENFILE);
  109. goto __return;
  110. }
  111. /* initialize reference count */
  112. mqdes->refcount = 1;
  113. mqdes->unlinked = 0;
  114. /* insert mq to posix mq list */
  115. posix_mq_insert(mqdes);
  116. }
  117. else
  118. {
  119. /* find mqueue */
  120. mqdes = posix_mq_find(name);
  121. if (mqdes != RT_NULL)
  122. {
  123. mqdes->refcount ++; /* increase reference count */
  124. }
  125. else
  126. {
  127. rt_set_errno(ENOENT);
  128. goto __return;
  129. }
  130. }
  131. rt_sem_release(&posix_mq_lock);
  132. return mqdes;
  133. __return:
  134. /* release lock */
  135. rt_sem_release(&posix_mq_lock);
  136. /* release allocated memory */
  137. if (mqdes != RT_NULL)
  138. {
  139. if (mqdes->mq != RT_NULL)
  140. {
  141. /* delete RT-Thread message queue */
  142. rt_mq_delete(mqdes->mq);
  143. }
  144. rt_free(mqdes);
  145. }
  146. return RT_NULL;
  147. }
  148. RTM_EXPORT(mq_open);
  149. ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
  150. {
  151. rt_err_t result;
  152. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  153. {
  154. rt_set_errno(EINVAL);
  155. return -1;
  156. }
  157. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
  158. if (result == RT_EOK)
  159. return msg_len;
  160. rt_set_errno(EBADF);
  161. return -1;
  162. }
  163. RTM_EXPORT(mq_receive);
  164. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
  165. {
  166. rt_err_t result;
  167. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  168. {
  169. rt_set_errno(EINVAL);
  170. return -1;
  171. }
  172. result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len);
  173. if (result == RT_EOK)
  174. return 0;
  175. rt_set_errno(EBADF);
  176. return -1;
  177. }
  178. RTM_EXPORT(mq_send);
  179. ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  180. unsigned *msg_prio, const struct timespec *abs_timeout)
  181. {
  182. int tick;
  183. rt_err_t result;
  184. /* parameters check */
  185. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  186. {
  187. rt_set_errno(EINVAL);
  188. return -1;
  189. }
  190. tick = clock_time_to_tick(abs_timeout);
  191. result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
  192. if (result == RT_EOK) return msg_len;
  193. if (result == -RT_ETIMEOUT)
  194. rt_set_errno(ETIMEDOUT);
  195. else
  196. rt_set_errno(EBADMSG);
  197. return -1;
  198. }
  199. RTM_EXPORT(mq_timedreceive);
  200. int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio,
  201. const struct timespec *abs_timeout)
  202. {
  203. /* RT-Thread does not support timed send */
  204. return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
  205. }
  206. RTM_EXPORT(mq_timedsend);
  207. int mq_notify(mqd_t mqdes, const struct sigevent *notification)
  208. {
  209. rt_set_errno(-RT_ERROR);
  210. return -1;
  211. }
  212. RTM_EXPORT(mq_notify);
  213. int mq_close(mqd_t mqdes)
  214. {
  215. if (mqdes == RT_NULL)
  216. {
  217. rt_set_errno(EINVAL);
  218. return -1;
  219. }
  220. /* lock posix mqueue list */
  221. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  222. mqdes->refcount --;
  223. if (mqdes->refcount == 0)
  224. {
  225. /* delete from posix mqueue list */
  226. if (mqdes->unlinked)
  227. posix_mq_delete(mqdes);
  228. }
  229. rt_sem_release(&posix_mq_lock);
  230. return 0;
  231. }
  232. RTM_EXPORT(mq_close);
  233. int mq_unlink(const char *name)
  234. {
  235. mqd_t pmq;
  236. /* lock posix mqueue list */
  237. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  238. pmq = posix_mq_find(name);
  239. if (pmq != RT_NULL)
  240. {
  241. pmq->unlinked = 1;
  242. if (pmq->refcount == 0)
  243. {
  244. /* remove this mqueue */
  245. posix_mq_delete(pmq);
  246. }
  247. rt_sem_release(&posix_mq_lock);
  248. return 0;
  249. }
  250. rt_sem_release(&posix_mq_lock);
  251. /* no this entry */
  252. rt_set_errno(ENOENT);
  253. return -1;
  254. }
  255. RTM_EXPORT(mq_unlink);