pipe.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. /*
  2. * Copyright (c) 2006-2018, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2012-09-30 Bernard first version.
  9. * 2017-11-08 JasonJiaJie fix memory leak issue when close a pipe.
  10. */
  11. #include <rthw.h>
  12. #include <rtdevice.h>
  13. #include <stdint.h>
  14. #if defined(RT_USING_POSIX)
  15. #include <dfs_file.h>
  16. #include <dfs_posix.h>
  17. #include <dfs_poll.h>
  18. static int pipe_fops_open(struct dfs_fd *fd)
  19. {
  20. int rc = 0;
  21. rt_pipe_t *pipe;
  22. pipe = (rt_pipe_t *)fd->fnode->data;
  23. if (!pipe)
  24. {
  25. return -1;
  26. }
  27. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  28. if (fd->fnode->ref_count == 1)
  29. {
  30. pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
  31. if (pipe->fifo == RT_NULL)
  32. {
  33. rc = -RT_ENOMEM;
  34. goto __exit;
  35. }
  36. }
  37. __exit:
  38. rt_mutex_release(&pipe->lock);
  39. return rc;
  40. }
  41. static int pipe_fops_close(struct dfs_fd *fd)
  42. {
  43. rt_device_t device;
  44. rt_pipe_t *pipe;
  45. pipe = (rt_pipe_t *)fd->fnode->data;
  46. if (!pipe)
  47. {
  48. return -1;
  49. }
  50. device = &pipe->parent;
  51. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  52. if (fd->fnode->ref_count == 1)
  53. {
  54. if (pipe->fifo != RT_NULL)
  55. {
  56. rt_ringbuffer_destroy(pipe->fifo);
  57. }
  58. pipe->fifo = RT_NULL;
  59. }
  60. rt_mutex_release(&pipe->lock);
  61. if (fd->fnode->ref_count == 1 && pipe->is_named == RT_FALSE)
  62. {
  63. /* delete the unamed pipe */
  64. rt_pipe_delete(device->parent.name);
  65. }
  66. return 0;
  67. }
  68. static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
  69. {
  70. rt_pipe_t *pipe;
  71. int ret = 0;
  72. pipe = (rt_pipe_t *)fd->fnode->data;
  73. switch (cmd)
  74. {
  75. case FIONREAD:
  76. *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
  77. break;
  78. case FIONWRITE:
  79. *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
  80. break;
  81. default:
  82. ret = -EINVAL;
  83. break;
  84. }
  85. return ret;
  86. }
  87. static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
  88. {
  89. int len = 0;
  90. rt_pipe_t *pipe;
  91. pipe = (rt_pipe_t *)fd->fnode->data;
  92. /* no process has the pipe open for writing, return end-of-file */
  93. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  94. while (1)
  95. {
  96. len = rt_ringbuffer_get(pipe->fifo, buf, count);
  97. if (len > 0)
  98. {
  99. break;
  100. }
  101. else
  102. {
  103. if (fd->flags & O_NONBLOCK)
  104. {
  105. len = -EAGAIN;
  106. goto out;
  107. }
  108. rt_mutex_release(&pipe->lock);
  109. rt_wqueue_wakeup(&pipe->writer_queue, (void*)POLLOUT);
  110. rt_wqueue_wait(&pipe->reader_queue, 0, -1);
  111. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  112. }
  113. }
  114. /* wakeup writer */
  115. rt_wqueue_wakeup(&pipe->writer_queue, (void*)POLLOUT);
  116. out:
  117. rt_mutex_release(&pipe->lock);
  118. return len;
  119. }
  120. static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
  121. {
  122. int len;
  123. rt_pipe_t *pipe;
  124. int wakeup = 0;
  125. int ret = 0;
  126. uint8_t *pbuf;
  127. pipe = (rt_pipe_t *)fd->fnode->data;
  128. if (count == 0)
  129. {
  130. return 0;
  131. }
  132. pbuf = (uint8_t*)buf;
  133. rt_mutex_take(&pipe->lock, -1);
  134. while (1)
  135. {
  136. len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
  137. ret += len;
  138. pbuf += len;
  139. wakeup = 1;
  140. if (ret == count)
  141. {
  142. break;
  143. }
  144. else
  145. {
  146. if (fd->flags & O_NONBLOCK)
  147. {
  148. if (ret == 0)
  149. {
  150. ret = -EAGAIN;
  151. }
  152. break;
  153. }
  154. }
  155. rt_mutex_release(&pipe->lock);
  156. rt_wqueue_wakeup(&pipe->reader_queue, (void*)POLLIN);
  157. /* pipe full, waiting on suspended write list */
  158. rt_wqueue_wait(&pipe->writer_queue, 0, -1);
  159. rt_mutex_take(&pipe->lock, -1);
  160. }
  161. rt_mutex_release(&pipe->lock);
  162. if (wakeup)
  163. {
  164. rt_wqueue_wakeup(&pipe->reader_queue, (void*)POLLIN);
  165. }
  166. return ret;
  167. }
  168. static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
  169. {
  170. int mask = 0;
  171. rt_pipe_t *pipe;
  172. int mode = 0;
  173. pipe = (rt_pipe_t *)fd->fnode->data;
  174. rt_poll_add(&pipe->reader_queue, req);
  175. rt_poll_add(&pipe->writer_queue, req);
  176. switch (fd->flags & O_ACCMODE)
  177. {
  178. case O_RDONLY:
  179. mode = 1;
  180. break;
  181. case O_WRONLY:
  182. mode = 2;
  183. break;
  184. case O_RDWR:
  185. mode = 3;
  186. break;
  187. }
  188. if (mode & 1)
  189. {
  190. if (rt_ringbuffer_data_len(pipe->fifo) != 0)
  191. {
  192. mask |= POLLIN;
  193. }
  194. }
  195. if (mode & 2)
  196. {
  197. if (rt_ringbuffer_space_len(pipe->fifo) != 0)
  198. {
  199. mask |= POLLOUT;
  200. }
  201. }
  202. return mask;
  203. }
  204. static const struct dfs_file_ops pipe_fops =
  205. {
  206. pipe_fops_open,
  207. pipe_fops_close,
  208. pipe_fops_ioctl,
  209. pipe_fops_read,
  210. pipe_fops_write,
  211. RT_NULL,
  212. RT_NULL,
  213. RT_NULL,
  214. pipe_fops_poll,
  215. };
  216. #endif /* end of RT_USING_POSIX */
  217. rt_err_t rt_pipe_open (rt_device_t device, rt_uint16_t oflag)
  218. {
  219. rt_pipe_t *pipe = (rt_pipe_t *)device;
  220. rt_err_t ret = RT_EOK;
  221. if (device == RT_NULL)
  222. {
  223. ret = -RT_EINVAL;
  224. goto __exit;
  225. }
  226. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  227. if (pipe->fifo == RT_NULL)
  228. {
  229. pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
  230. if (pipe->fifo == RT_NULL)
  231. {
  232. ret = -RT_ENOMEM;
  233. }
  234. }
  235. rt_mutex_release(&pipe->lock);
  236. __exit:
  237. return ret;
  238. }
  239. rt_err_t rt_pipe_close (rt_device_t device)
  240. {
  241. rt_pipe_t *pipe = (rt_pipe_t *)device;
  242. if (device == RT_NULL)
  243. {
  244. return -RT_EINVAL;
  245. }
  246. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  247. rt_ringbuffer_destroy(pipe->fifo);
  248. pipe->fifo = RT_NULL;
  249. rt_mutex_release(&pipe->lock);
  250. return RT_EOK;
  251. }
  252. rt_size_t rt_pipe_read (rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count)
  253. {
  254. uint8_t *pbuf;
  255. rt_size_t read_bytes = 0;
  256. rt_pipe_t *pipe = (rt_pipe_t *)device;
  257. if (device == RT_NULL)
  258. {
  259. rt_set_errno(-EINVAL);
  260. return 0;
  261. }
  262. if (count == 0)
  263. {
  264. return 0;
  265. }
  266. pbuf = (uint8_t*)buffer;
  267. rt_mutex_take(&pipe->lock, RT_WAITING_FOREVER);
  268. while (read_bytes < count)
  269. {
  270. int len = rt_ringbuffer_get(pipe->fifo, &pbuf[read_bytes], count - read_bytes);
  271. if (len <= 0)
  272. {
  273. break;
  274. }
  275. read_bytes += len;
  276. }
  277. rt_mutex_release(&pipe->lock);
  278. return read_bytes;
  279. }
  280. rt_size_t rt_pipe_write (rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count)
  281. {
  282. uint8_t *pbuf;
  283. rt_size_t write_bytes = 0;
  284. rt_pipe_t *pipe = (rt_pipe_t *)device;
  285. if (device == RT_NULL)
  286. {
  287. rt_set_errno(-EINVAL);
  288. return 0;
  289. }
  290. if (count == 0)
  291. {
  292. return 0;
  293. }
  294. pbuf = (uint8_t*)buffer;
  295. rt_mutex_take(&pipe->lock, -1);
  296. while (write_bytes < count)
  297. {
  298. int len = rt_ringbuffer_put(pipe->fifo, &pbuf[write_bytes], count - write_bytes);
  299. if (len <= 0)
  300. {
  301. break;
  302. }
  303. write_bytes += len;
  304. }
  305. rt_mutex_release(&pipe->lock);
  306. return write_bytes;
  307. }
  308. rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args)
  309. {
  310. return RT_EOK;
  311. }
  312. #ifdef RT_USING_DEVICE_OPS
  313. const static struct rt_device_ops pipe_ops =
  314. {
  315. RT_NULL,
  316. rt_pipe_open,
  317. rt_pipe_close,
  318. rt_pipe_read,
  319. rt_pipe_write,
  320. rt_pipe_control,
  321. };
  322. #endif
  323. rt_pipe_t *rt_pipe_create(const char *name, int bufsz)
  324. {
  325. rt_pipe_t *pipe;
  326. rt_device_t dev;
  327. pipe = (rt_pipe_t *)rt_malloc(sizeof(rt_pipe_t));
  328. if (pipe == RT_NULL) return RT_NULL;
  329. rt_memset(pipe, 0, sizeof(rt_pipe_t));
  330. pipe->is_named = RT_TRUE; /* initialize as a named pipe */
  331. rt_mutex_init(&pipe->lock, name, RT_IPC_FLAG_FIFO);
  332. rt_wqueue_init(&pipe->reader_queue);
  333. rt_wqueue_init(&pipe->writer_queue);
  334. RT_ASSERT(bufsz < 0xFFFF);
  335. pipe->bufsz = bufsz;
  336. dev = &pipe->parent;
  337. dev->type = RT_Device_Class_Pipe;
  338. #ifdef RT_USING_DEVICE_OPS
  339. dev->ops = &pipe_ops;
  340. #else
  341. dev->init = RT_NULL;
  342. dev->open = rt_pipe_open;
  343. dev->read = rt_pipe_read;
  344. dev->write = rt_pipe_write;
  345. dev->close = rt_pipe_close;
  346. dev->control = rt_pipe_control;
  347. #endif
  348. dev->rx_indicate = RT_NULL;
  349. dev->tx_complete = RT_NULL;
  350. if (rt_device_register(&pipe->parent, name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
  351. {
  352. rt_free(pipe);
  353. return RT_NULL;
  354. }
  355. #ifdef RT_USING_POSIX
  356. dev->fops = (void*)&pipe_fops;
  357. #endif
  358. return pipe;
  359. }
  360. int rt_pipe_delete(const char *name)
  361. {
  362. int result = 0;
  363. rt_device_t device;
  364. device = rt_device_find(name);
  365. if (device)
  366. {
  367. if (device->type == RT_Device_Class_Pipe)
  368. {
  369. rt_pipe_t *pipe;
  370. pipe = (rt_pipe_t *)device;
  371. rt_mutex_detach(&pipe->lock);
  372. rt_device_unregister(device);
  373. /* close fifo ringbuffer */
  374. if (pipe->fifo)
  375. {
  376. rt_ringbuffer_destroy(pipe->fifo);
  377. pipe->fifo = RT_NULL;
  378. }
  379. rt_free(pipe);
  380. }
  381. else
  382. {
  383. result = -ENODEV;
  384. }
  385. }
  386. else
  387. {
  388. result = -ENODEV;
  389. }
  390. return result;
  391. }
  392. #ifdef RT_USING_POSIX
  393. int pipe(int fildes[2])
  394. {
  395. rt_pipe_t *pipe;
  396. char dname[8];
  397. char dev_name[32];
  398. static int pipeno = 0;
  399. rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);
  400. pipe = rt_pipe_create(dname, PIPE_BUFSZ);
  401. if (pipe == RT_NULL)
  402. {
  403. return -1;
  404. }
  405. pipe->is_named = RT_FALSE; /* unamed pipe */
  406. rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
  407. fildes[0] = open(dev_name, O_RDONLY, 0);
  408. if (fildes[0] < 0)
  409. {
  410. return -1;
  411. }
  412. fildes[1] = open(dev_name, O_WRONLY, 0);
  413. if (fildes[1] < 0)
  414. {
  415. close(fildes[0]);
  416. return -1;
  417. }
  418. return 0;
  419. }
  420. int mkfifo(const char *path, mode_t mode)
  421. {
  422. rt_pipe_t *pipe;
  423. pipe = rt_pipe_create(path, PIPE_BUFSZ);
  424. if (pipe == RT_NULL)
  425. {
  426. return -1;
  427. }
  428. return 0;
  429. }
  430. #endif