mqueue.c 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. #include "mqueue.h"
  2. #include "pthread_internal.h"
  3. #include <stdarg.h>
  4. #include <errno.h>
  5. #include <sys/fcntl.h>
  6. static posix_mq_t* posix_mq_list = RT_NULL;
  7. static struct rt_semaphore posix_mq_lock;
  8. void posix_mq_system_init()
  9. {
  10. rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO);
  11. }
  12. rt_inline void posix_mq_insert(posix_mq_t *pmq)
  13. {
  14. pmq->next = posix_mq_list;
  15. posix_mq_list = pmq;
  16. }
  17. static void posix_mq_delete(posix_mq_t *pmq)
  18. {
  19. posix_mq_t *iter;
  20. if (posix_mq_list == pmq)
  21. {
  22. posix_mq_list = pmq->next;
  23. rt_mq_delete(pmq->mq);
  24. rt_free(pmq);
  25. return;
  26. }
  27. for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next)
  28. {
  29. if (iter->next == pmq)
  30. {
  31. /* delete this mq */
  32. if (pmq->next != RT_NULL)
  33. iter->next = pmq->next;
  34. else
  35. iter->next = RT_NULL;
  36. /* delete RT-Thread mqueue */
  37. rt_mq_delete(pmq->mq);
  38. rt_free(pmq);
  39. return ;
  40. }
  41. }
  42. }
  43. static posix_mq_t *posix_mq_find(const char* name)
  44. {
  45. posix_mq_t *iter;
  46. rt_object_t object;
  47. for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next)
  48. {
  49. object = (rt_object_t)&(iter->mq);
  50. if (strncmp(object->name, name, RT_NAME_MAX) == 0)
  51. {
  52. return iter;
  53. }
  54. }
  55. }
  56. int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
  57. struct mq_attr *omqstat)
  58. {
  59. rt_set_errno(-RT_ERROR);
  60. return -1;
  61. }
  62. int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
  63. {
  64. if ((mqdes == RT_NULL) || mqstat == RT_NULL)
  65. {
  66. rt_set_errno(EBADF);
  67. return -1;
  68. }
  69. mqstat->mq_maxmsg = mqdes->mq->mq->max_msgs;
  70. mqstat->mq_msgsize = mqdes->mq->mq->msg_size;
  71. mqstat->mq_curmsgs = 0;
  72. mqstat->mq_flags = 0;
  73. return 0;
  74. }
  75. mqd_t mq_open(const char *name, int oflag, ...)
  76. {
  77. mqd_t mqdes;
  78. va_list arg;
  79. mode_t mode;
  80. struct mq_attr *attr = RT_NULL;
  81. /* lock posix mqueue list */
  82. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  83. mqdes = RT_NULL;
  84. if (oflag & O_CREAT)
  85. {
  86. va_start(arg, oflag);
  87. mode = (mode_t) va_arg(arg, unsigned int);
  88. attr = (struct mq_attr *) va_arg(arg, struct mq_attr *);
  89. va_end(arg);
  90. if (oflag & O_EXCL)
  91. {
  92. if (posix_mq_find(name) != RT_NULL)
  93. {
  94. rt_set_errno(EEXIST);
  95. goto __return;
  96. }
  97. }
  98. mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
  99. if (mqdes == RT_NULL)
  100. {
  101. rt_set_errno(ENFILE);
  102. goto __return;
  103. }
  104. mqdes->flags = oflag;
  105. mqdes->mq = (posix_mq_t*) rt_malloc (sizeof(posix_mq_t));
  106. if (mqdes->mq == RT_NULL)
  107. {
  108. rt_set_errno(ENFILE);
  109. goto __return;
  110. }
  111. /* create RT-Thread message queue */
  112. mqdes->mq->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO);
  113. if (mqdes->mq->mq == RT_NULL) /* create failed */
  114. {
  115. rt_set_errno(ENFILE);
  116. goto __return;
  117. }
  118. /* initialize reference count */
  119. mqdes->mq->refcount = 1;
  120. mqdes->mq->unlinked = 0;
  121. /* insert mq to posix mq list */
  122. posix_mq_insert(mqdes->mq);
  123. }
  124. else
  125. {
  126. posix_mq_t *mq;
  127. /* find mqueue */
  128. mq = posix_mq_find(name);
  129. if (mq != RT_NULL)
  130. {
  131. mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes));
  132. mqdes->mq = mq;
  133. mqdes->flags = oflag;
  134. mq->refcount ++; /* increase reference count */
  135. }
  136. else
  137. {
  138. rt_set_errno(ENOENT);
  139. goto __return;
  140. }
  141. }
  142. rt_sem_release(&posix_mq_lock);
  143. return mqdes;
  144. __return:
  145. /* release lock */
  146. rt_sem_release(&posix_mq_lock);
  147. /* release allocated memory */
  148. if (mqdes != RT_NULL)
  149. {
  150. if (mqdes->mq != RT_NULL)
  151. {
  152. /* delete RT-Thread message queue */
  153. if (mqdes->mq->mq != RT_NULL)
  154. rt_mq_delete(mqdes->mq->mq);
  155. rt_free(mqdes->mq);
  156. }
  157. rt_free(mqdes);
  158. }
  159. return RT_NULL;
  160. }
  161. ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
  162. {
  163. rt_err_t result;
  164. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  165. {
  166. rt_set_errno(EINVAL);
  167. return -1;
  168. }
  169. /* permission check */
  170. if (!(mqdes->flags & O_RDONLY))
  171. {
  172. rt_set_errno(EBADF);
  173. return -1;
  174. }
  175. result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
  176. if (result == RT_EOK)
  177. return msg_len;
  178. rt_set_errno(EBADF);
  179. return -1;
  180. }
  181. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
  182. {
  183. rt_err_t result;
  184. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  185. {
  186. rt_set_errno(EINVAL);
  187. return -1;
  188. }
  189. /* permission check */
  190. if (!(mqdes->flags & O_WRONLY))
  191. {
  192. rt_set_errno(EBADF);
  193. return -1;
  194. }
  195. result = rt_mq_send(mqdes->mq->mq, (void*)msg_ptr, msg_len);
  196. if (result == RT_EOK)
  197. return 0;
  198. rt_set_errno(EBADF);
  199. return -1;
  200. }
  201. ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
  202. unsigned *msg_prio, const struct timespec *abs_timeout)
  203. {
  204. int tick;
  205. rt_err_t result;
  206. /* parameters check */
  207. if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL))
  208. {
  209. rt_set_errno(EINVAL);
  210. return -1;
  211. }
  212. /* permission check */
  213. if (!(mqdes->flags & O_RDONLY))
  214. {
  215. rt_set_errno(EBADF);
  216. return -1;
  217. }
  218. tick = libc_time_to_tick(abs_timeout);
  219. result = rt_mq_recv(mqdes->mq->mq, msg_ptr, msg_len, tick);
  220. if (result == RT_EOK) return msg_len;
  221. if (result == -RT_ETIMEOUT)
  222. rt_set_errno(ETIMEDOUT);
  223. else
  224. rt_set_errno(EBADMSG);
  225. return -1;
  226. }
  227. int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio,
  228. const struct timespec *abs_timeout)
  229. {
  230. /* RT-Thread does not support timed send */
  231. return mq_send(mqdes, msg_ptr, msg_len, msg_prio);
  232. }
  233. int mq_notify(mqd_t mqdes, const struct sigevent *notification)
  234. {
  235. rt_set_errno(-RT_ERROR);
  236. return -1;
  237. }
  238. int mq_close(mqd_t mqdes)
  239. {
  240. if (mqdes == RT_NULL)
  241. {
  242. rt_set_errno(EINVAL);
  243. return -1;
  244. }
  245. /* lock posix mqueue list */
  246. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  247. mqdes->mq->refcount --;
  248. if (mqdes->mq->refcount == 0)
  249. {
  250. /* delete from posix mqueue list */
  251. if (mqdes->mq->unlinked)
  252. posix_mq_delete(mqdes->mq);
  253. mqdes->mq = RT_NULL;
  254. }
  255. rt_sem_release(&posix_mq_lock);
  256. rt_free(mqdes);
  257. return 0;
  258. }
  259. int mq_unlink(const char *name)
  260. {
  261. posix_mq_t *pmq;
  262. /* lock posix mqueue list */
  263. rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER);
  264. pmq = posix_mq_find(name);
  265. if (pmq != RT_NULL)
  266. {
  267. pmq->unlinked = 1;
  268. if (pmq->refcount == 0)
  269. {
  270. /* remove this mqueue */
  271. posix_mq_delete(pmq);
  272. }
  273. rt_sem_release(&posix_mq_lock);
  274. return 0;
  275. }
  276. rt_sem_release(&posix_mq_lock);
  277. /* no this entry */
  278. rt_set_errno(ENOENT);
  279. return -1;
  280. }