lwp_ipc.c 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227
  1. /*
  2. * Copyright (c) 2006-2023, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2019-10-12 Jesven first version
  9. * 2023-09-16 zmq810150896 Increased versatility of some features on dfs v2
  10. */
  11. #include <rtthread.h>
  12. #include <rthw.h>
  13. #include <lwp.h>
  14. #include "lwp_ipc.h"
  15. #include "lwp_ipc_internal.h"
  16. #include <dfs_file.h>
  17. #include <poll.h>
  18. #ifdef RT_USING_DFS_V2
  19. #include <dfs_dentry.h>
  20. #endif
  21. /**
  22. * the IPC channel states
  23. */
  24. enum
  25. {
  26. RT_IPC_STAT_IDLE, /* no suspended threads */
  27. RT_IPC_STAT_WAIT, /* suspended receivers exist */
  28. RT_IPC_STAT_ACTIVE, /* suspended senders exist */
  29. };
  30. /**
  31. * IPC message structure.
  32. *
  33. * They are allocated and released in the similar way like 'rt_chfd'.
  34. */
  35. struct rt_ipc_msg
  36. {
  37. struct rt_channel_msg msg; /**< the payload of msg */
  38. rt_list_t mlist; /**< the msg list */
  39. rt_uint8_t need_reply; /**< whether msg wait reply*/
  40. };
  41. typedef struct rt_ipc_msg *rt_ipc_msg_t;
  42. static rt_ipc_msg_t _ipc_msg_free_list = (rt_ipc_msg_t)RT_NULL; /* released chain */
  43. static int rt_ipc_msg_used = 0; /* first unallocated entry */
  44. static struct rt_ipc_msg ipc_msg_pool[RT_CH_MSG_MAX_NR]; /* initial message array */
  45. /**
  46. * Allocate an IPC message from the statically-allocated array.
  47. */
  48. static rt_ipc_msg_t _ipc_msg_alloc(void)
  49. {
  50. rt_ipc_msg_t p = (rt_ipc_msg_t)RT_NULL;
  51. if (_ipc_msg_free_list) /* use the released chain first */
  52. {
  53. p = _ipc_msg_free_list;
  54. _ipc_msg_free_list = (rt_ipc_msg_t)p->msg.sender; /* emtry payload as a pointer */
  55. }
  56. else if (rt_ipc_msg_used < RT_CH_MSG_MAX_NR)
  57. {
  58. p = &ipc_msg_pool[rt_ipc_msg_used];
  59. rt_ipc_msg_used++;
  60. }
  61. return p;
  62. }
  63. /**
  64. * Put a released IPC message back to the released chain.
  65. */
  66. static void _ipc_msg_free(rt_ipc_msg_t p_msg)
  67. {
  68. p_msg->msg.sender = (void*)_ipc_msg_free_list;
  69. _ipc_msg_free_list = p_msg;
  70. }
  71. /**
  72. * Initialized the IPC message.
  73. */
  74. static void rt_ipc_msg_init(rt_ipc_msg_t msg, struct rt_channel_msg *data, rt_uint8_t need_reply)
  75. {
  76. RT_ASSERT(msg != RT_NULL);
  77. msg->need_reply = need_reply;
  78. msg->msg = *data;
  79. msg->msg.sender = (void*)rt_thread_self();
  80. rt_list_init(&msg->mlist);
  81. }
  82. /**
  83. * Initialized the list of the waiting receivers on the IPC channel.
  84. */
  85. rt_inline rt_err_t rt_channel_object_init(struct rt_ipc_object *ipc)
  86. {
  87. rt_list_init(&(ipc->suspend_thread)); /* receiver list */
  88. return RT_EOK;
  89. }
  90. /**
  91. * Wakeup the first suspened thread in the list.
  92. */
  93. rt_inline rt_err_t rt_channel_list_resume(rt_list_t *list)
  94. {
  95. struct rt_thread *thread;
  96. /* get the first thread entry waiting for sending */
  97. thread = rt_list_entry(list->next, struct rt_thread, tlist);
  98. rt_thread_resume(thread);
  99. return RT_EOK;
  100. }
  101. /**
  102. * Wakeup all the suspended threads in the list.
  103. */
  104. rt_inline rt_err_t rt_channel_list_resume_all(rt_list_t *list)
  105. {
  106. struct rt_thread *thread;
  107. register rt_ubase_t temp;
  108. /* wakeup all suspended threads for sending */
  109. while (!rt_list_isempty(list))
  110. {
  111. temp = rt_hw_interrupt_disable();
  112. thread = rt_list_entry(list->next, struct rt_thread, tlist);
  113. thread->error = -RT_ERROR;
  114. rt_thread_resume(thread);
  115. rt_hw_interrupt_enable(temp);
  116. }
  117. return RT_EOK;
  118. }
  119. /**
  120. * Suspend the thread and chain it into the end of the list.
  121. */
  122. rt_inline rt_err_t rt_channel_list_suspend(rt_list_t *list, struct rt_thread *thread)
  123. {
  124. /* suspend thread */
  125. rt_err_t ret = rt_thread_suspend_with_flag(thread, RT_INTERRUPTIBLE);
  126. if (ret == RT_EOK)
  127. {
  128. rt_list_insert_before(list, &(thread->tlist)); /* list end */
  129. }
  130. return ret;
  131. }
  132. static void _rt_channel_check_wq_wakup(rt_channel_t ch)
  133. {
  134. rt_base_t level;
  135. level = rt_hw_interrupt_disable();
  136. if (rt_list_isempty(&ch->wait_msg))
  137. {
  138. rt_hw_interrupt_enable(level);
  139. return;
  140. }
  141. rt_wqueue_wakeup(&ch->reader_queue, 0);
  142. rt_hw_interrupt_enable(level);
  143. }
  144. /**
  145. * Create a new or open an existing IPC channel.
  146. */
  147. rt_channel_t rt_raw_channel_open(const char *name, int flags)
  148. {
  149. register rt_ubase_t temp = 0;
  150. rt_channel_t ch = RT_NULL;
  151. struct rt_object *object;
  152. struct rt_list_node *node;
  153. struct rt_object_information *information;
  154. temp = rt_hw_interrupt_disable();
  155. information = rt_object_get_information(RT_Object_Class_Channel);
  156. RT_ASSERT(information != RT_NULL);
  157. /* retrieve the existing IPC channels */
  158. for (node = information->object_list.next;
  159. node != &(information->object_list);
  160. node = node->next)
  161. {
  162. object = rt_list_entry(node, struct rt_object, list);
  163. if (rt_strncmp(object->name, name, RT_NAME_MAX) == 0)
  164. {
  165. if ((flags & O_CREAT) && (flags & O_EXCL))
  166. {
  167. goto quit;
  168. }
  169. /* find the IPC channel with the specific name */
  170. ch = (rt_channel_t)object;
  171. ch->ref++; /* increase the reference count */
  172. break;
  173. }
  174. }
  175. if (!ch) /* create a new IPC channel */
  176. {
  177. if (flags & O_CREAT)
  178. {
  179. RT_DEBUG_NOT_IN_INTERRUPT;
  180. /* allocate a real IPC channel structure */
  181. ch = (rt_channel_t)rt_object_allocate(RT_Object_Class_Channel, name);
  182. }
  183. if (!ch)
  184. {
  185. goto quit;
  186. }
  187. rt_channel_object_init(&ch->parent); /* suspended receivers */
  188. rt_list_init(&ch->wait_msg); /* unhandled messages */
  189. rt_list_init(&ch->wait_thread); /* suspended senders */
  190. rt_wqueue_init(&ch->reader_queue); /* reader poll queue */
  191. ch->reply = RT_NULL;
  192. ch->stat = RT_IPC_STAT_IDLE; /* no suspended threads */
  193. ch->ref = 1;
  194. }
  195. quit:
  196. rt_hw_interrupt_enable(temp);
  197. return ch;
  198. }
  199. /**
  200. * Close an existiong IPC channel, release the resources.
  201. */
  202. rt_err_t rt_raw_channel_close(rt_channel_t ch)
  203. {
  204. register rt_ubase_t temp;
  205. RT_DEBUG_NOT_IN_INTERRUPT;
  206. if (ch == RT_NULL)
  207. {
  208. return -RT_EIO;
  209. }
  210. temp = rt_hw_interrupt_disable();
  211. if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
  212. {
  213. rt_hw_interrupt_enable(temp);
  214. return -RT_EIO;
  215. }
  216. if (rt_object_is_systemobject(&ch->parent.parent) != RT_FALSE)
  217. {
  218. rt_hw_interrupt_enable(temp);
  219. return -RT_EIO;
  220. }
  221. if (ch->ref == 0)
  222. {
  223. rt_hw_interrupt_enable(temp);
  224. return -RT_EIO;
  225. }
  226. ch->ref--;
  227. if (ch->ref == 0)
  228. {
  229. /* wakeup all the suspended receivers and senders */
  230. rt_channel_list_resume_all(&ch->parent.suspend_thread);
  231. rt_channel_list_resume_all(&ch->wait_thread);
  232. /* all ipc msg will lost */
  233. rt_list_init(&ch->wait_msg);
  234. rt_object_delete(&ch->parent.parent); /* release the IPC channel structure */
  235. }
  236. rt_hw_interrupt_enable(temp);
  237. return RT_EOK;
  238. }
  239. static rt_err_t wakeup_sender_wait_recv(void *object, struct rt_thread *thread)
  240. {
  241. rt_channel_t ch;
  242. ch = (rt_channel_t)object;
  243. if (ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread)
  244. {
  245. ch->stat = RT_IPC_STAT_IDLE;
  246. ch->reply = RT_NULL;
  247. }
  248. else
  249. {
  250. rt_ipc_msg_t msg;
  251. rt_list_t *l;
  252. l = ch->wait_msg.next;
  253. while (l != &ch->wait_msg)
  254. {
  255. msg = rt_list_entry(l, struct rt_ipc_msg, mlist);
  256. if (msg->need_reply && msg->msg.sender == thread)
  257. {
  258. rt_list_remove(&msg->mlist); /* remove the msg from the channel */
  259. _ipc_msg_free(msg);
  260. break;
  261. }
  262. l = l->next;
  263. }
  264. }
  265. thread->error = -RT_EINTR;
  266. return rt_thread_resume(thread); /* wake up the sender */
  267. }
  268. static rt_err_t wakeup_sender_wait_reply(void *object, struct rt_thread *thread)
  269. {
  270. rt_channel_t ch;
  271. ch = (rt_channel_t)object;
  272. RT_ASSERT(ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread);
  273. ch->stat = RT_IPC_STAT_IDLE;
  274. ch->reply = RT_NULL;
  275. thread->error = -RT_EINTR;
  276. return rt_thread_resume(thread); /* wake up the sender */
  277. }
  278. static void sender_timeout(void *parameter)
  279. {
  280. struct rt_thread *thread = (struct rt_thread*)parameter;
  281. rt_channel_t ch;
  282. ch = (rt_channel_t)(thread->wakeup.user_data);
  283. if (ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread)
  284. {
  285. ch->stat = RT_IPC_STAT_IDLE;
  286. ch->reply = RT_NULL;
  287. }
  288. else
  289. {
  290. rt_ipc_msg_t msg;
  291. rt_list_t *l;
  292. l = ch->wait_msg.next;
  293. while (l != &ch->wait_msg)
  294. {
  295. msg = rt_list_entry(l, struct rt_ipc_msg, mlist);
  296. if (msg->need_reply && msg->msg.sender == thread)
  297. {
  298. rt_list_remove(&msg->mlist); /* remove the msg from the channel */
  299. _ipc_msg_free(msg);
  300. break;
  301. }
  302. l = l->next;
  303. }
  304. }
  305. thread->error = -RT_ETIMEOUT;
  306. thread->wakeup.func = RT_NULL;
  307. rt_list_remove(&(thread->tlist));
  308. /* insert to schedule ready list */
  309. rt_schedule_insert_thread(thread);
  310. /* do schedule */
  311. rt_schedule();
  312. }
  313. /**
  314. * Get file vnode from fd.
  315. */
  316. static void *_ipc_msg_get_file(int fd)
  317. {
  318. struct dfs_file *d;
  319. d = fd_get(fd);
  320. if (d == RT_NULL)
  321. return RT_NULL;
  322. if (!d->vnode)
  323. return RT_NULL;
  324. return (void *)d;
  325. }
  326. /**
  327. * Get fd from file vnode.
  328. */
  329. static int _ipc_msg_fd_new(void *file)
  330. {
  331. int fd;
  332. struct dfs_file *d;
  333. struct dfs_file *df = RT_NULL;
  334. if (file == RT_NULL)
  335. {
  336. return -1;
  337. }
  338. df = (struct dfs_file *)file;
  339. fd = fd_new();
  340. if (fd < 0)
  341. {
  342. return -1;
  343. }
  344. d = fd_get(fd);
  345. if (!d)
  346. {
  347. fd_release(fd);
  348. return -1;
  349. }
  350. d->vnode = df->vnode;
  351. d->flags = df->flags;
  352. d->data = df->data;
  353. d->magic = df->magic;
  354. #ifdef RT_USING_DFS_V2
  355. d->fops = df->fops;
  356. d->mode = df->mode;
  357. d->dentry = df->dentry;
  358. if (d->dentry)
  359. rt_atomic_add(&(d->dentry->ref_count), 1);
  360. if (d->vnode)
  361. rt_atomic_add(&(d->vnode->ref_count), 1);
  362. #else
  363. if (d->vnode)
  364. d->vnode->ref_count ++;
  365. #endif
  366. return fd;
  367. }
  368. /**
  369. * Send data through an IPC channel, wait for the reply or not.
  370. */
  371. static rt_err_t _rt_raw_channel_send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, int need_reply, rt_channel_msg_t data_ret, rt_int32_t time)
  372. {
  373. rt_ipc_msg_t msg;
  374. struct rt_thread *thread_recv, *thread_send = 0;
  375. register rt_base_t temp;
  376. rt_err_t ret;
  377. void (*old_timeout_func)(void *) = 0;
  378. if (need_reply)
  379. {
  380. RT_DEBUG_NOT_IN_INTERRUPT;
  381. }
  382. if (ch == RT_NULL)
  383. {
  384. return -RT_EIO;
  385. }
  386. temp = rt_hw_interrupt_disable();
  387. if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
  388. {
  389. rt_hw_interrupt_enable(temp);
  390. return -RT_EIO;
  391. }
  392. if (need_reply && time == 0)
  393. {
  394. rt_hw_interrupt_enable(temp);
  395. return -RT_ETIMEOUT;
  396. }
  397. /* allocate an IPC message */
  398. msg = _ipc_msg_alloc();
  399. if (!msg)
  400. {
  401. rt_hw_interrupt_enable(temp);
  402. return -RT_ENOMEM;
  403. }
  404. /* IPC message : file descriptor */
  405. if (data->type == RT_CHANNEL_FD)
  406. {
  407. data->u.fd.file = _ipc_msg_get_file(data->u.fd.fd);
  408. }
  409. rt_ipc_msg_init(msg, data, need_reply);
  410. if (need_reply)
  411. {
  412. thread_send = rt_thread_self();
  413. thread_send->error = RT_EOK;
  414. }
  415. switch (ch->stat)
  416. {
  417. case RT_IPC_STAT_IDLE:
  418. case RT_IPC_STAT_ACTIVE:
  419. if (need_reply)
  420. {
  421. ret = rt_channel_list_suspend(&ch->wait_thread, thread_send);
  422. if (ret != RT_EOK)
  423. {
  424. _ipc_msg_free(msg);
  425. rt_hw_interrupt_enable(temp);
  426. return ret;
  427. }
  428. rt_thread_wakeup_set(thread_send, wakeup_sender_wait_recv, (void*)ch);
  429. if (time > 0)
  430. {
  431. rt_timer_control(&(thread_send->thread_timer),
  432. RT_TIMER_CTRL_GET_FUNC,
  433. &old_timeout_func);
  434. rt_timer_control(&(thread_send->thread_timer),
  435. RT_TIMER_CTRL_SET_FUNC,
  436. sender_timeout);
  437. /* reset the timeout of thread timer and start it */
  438. rt_timer_control(&(thread_send->thread_timer),
  439. RT_TIMER_CTRL_SET_TIME,
  440. &time);
  441. rt_timer_start(&(thread_send->thread_timer));
  442. }
  443. }
  444. /*
  445. * If there is no thread waiting for messages, chain the message
  446. * into the list.
  447. */
  448. rt_list_insert_before(&ch->wait_msg, &msg->mlist);
  449. break;
  450. case RT_IPC_STAT_WAIT:
  451. /*
  452. * If there are suspended receivers on the IPC channel, transfer the
  453. * pointer of the message to the first receiver directly and wake it
  454. * up.
  455. */
  456. RT_ASSERT(ch->parent.suspend_thread.next != &ch->parent.suspend_thread);
  457. if (need_reply)
  458. {
  459. ret = rt_channel_list_suspend(&ch->wait_thread, thread_send);
  460. if (ret != RT_EOK)
  461. {
  462. _ipc_msg_free(msg);
  463. rt_hw_interrupt_enable(temp);
  464. return ret;
  465. }
  466. ch->reply = thread_send; /* record the current waiting sender */
  467. ch->stat = RT_IPC_STAT_ACTIVE;
  468. rt_thread_wakeup_set(thread_send, wakeup_sender_wait_reply, (void*)ch);
  469. if (time > 0)
  470. {
  471. rt_timer_control(&(thread_send->thread_timer),
  472. RT_TIMER_CTRL_GET_FUNC,
  473. &old_timeout_func);
  474. rt_timer_control(&(thread_send->thread_timer),
  475. RT_TIMER_CTRL_SET_FUNC,
  476. sender_timeout);
  477. /* reset the timeout of thread timer and start it */
  478. rt_timer_control(&(thread_send->thread_timer),
  479. RT_TIMER_CTRL_SET_TIME,
  480. &time);
  481. rt_timer_start(&(thread_send->thread_timer));
  482. }
  483. }
  484. else
  485. {
  486. ch->stat = RT_IPC_STAT_IDLE;
  487. }
  488. thread_recv = rt_list_entry(ch->parent.suspend_thread.next, struct rt_thread, tlist);
  489. thread_recv->msg_ret = msg; /* to the first suspended receiver */
  490. thread_recv->error = RT_EOK;
  491. rt_channel_list_resume(&ch->parent.suspend_thread);
  492. break;
  493. default:
  494. break;
  495. }
  496. if ( ch->stat == RT_IPC_STAT_IDLE)
  497. {
  498. _rt_channel_check_wq_wakup(ch);
  499. }
  500. rt_hw_interrupt_enable(temp);
  501. /* reschedule in order to let the potential receivers run */
  502. rt_schedule();
  503. if (need_reply)
  504. {
  505. temp = rt_hw_interrupt_disable();
  506. if (old_timeout_func)
  507. {
  508. rt_timer_control(&(thread_send->thread_timer),
  509. RT_TIMER_CTRL_SET_FUNC,
  510. old_timeout_func);
  511. }
  512. ret = thread_send->error;
  513. rt_hw_interrupt_enable(temp);
  514. if (ret != RT_EOK)
  515. {
  516. return ret;
  517. }
  518. /* If the sender gets the chance to run, the requested reply must be valid. */
  519. RT_ASSERT(data_ret != RT_NULL);
  520. *data_ret = ((rt_ipc_msg_t)(thread_send->msg_ret))->msg; /* extract data */
  521. temp = rt_hw_interrupt_disable();
  522. _ipc_msg_free(thread_send->msg_ret); /* put back the message to kernel */
  523. rt_hw_interrupt_enable(temp);
  524. thread_send->msg_ret = RT_NULL;
  525. }
  526. return RT_EOK;
  527. }
  528. /**
  529. * Send data through an IPC channel with no reply.
  530. */
  531. rt_err_t rt_raw_channel_send(rt_channel_t ch, rt_channel_msg_t data)
  532. {
  533. return _rt_raw_channel_send_recv_timeout(ch, data, 0, 0, RT_WAITING_FOREVER);
  534. }
  535. /**
  536. * Send data through an IPC channel and wait for the relpy.
  537. */
  538. rt_err_t rt_raw_channel_send_recv(rt_channel_t ch, rt_channel_msg_t data, rt_channel_msg_t data_ret)
  539. {
  540. return _rt_raw_channel_send_recv_timeout(ch, data, 1, data_ret, RT_WAITING_FOREVER);
  541. }
  542. /**
  543. * Send data through an IPC channel and wait for the relpy.
  544. */
  545. rt_err_t rt_raw_channel_send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
  546. {
  547. return _rt_raw_channel_send_recv_timeout(ch, data, 1, data_ret, time);
  548. }
  549. /**
  550. * Reply to the waiting sender and wake it up.
  551. */
  552. rt_err_t rt_raw_channel_reply(rt_channel_t ch, rt_channel_msg_t data)
  553. {
  554. rt_ipc_msg_t msg;
  555. struct rt_thread *thread;
  556. register rt_base_t temp;
  557. if (ch == RT_NULL)
  558. {
  559. return -RT_EIO;
  560. }
  561. temp = rt_hw_interrupt_disable();
  562. if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
  563. {
  564. rt_hw_interrupt_enable(temp);
  565. return -RT_EIO;
  566. }
  567. if (ch->stat != RT_IPC_STAT_ACTIVE)
  568. {
  569. rt_hw_interrupt_enable(temp);
  570. return -RT_ERROR;
  571. }
  572. if (ch->reply == RT_NULL)
  573. {
  574. rt_hw_interrupt_enable(temp);
  575. return -RT_ERROR;
  576. }
  577. /* allocate an IPC message */
  578. msg = _ipc_msg_alloc();
  579. if (!msg)
  580. {
  581. rt_hw_interrupt_enable(temp);
  582. return -RT_ENOMEM;
  583. }
  584. rt_ipc_msg_init(msg, data, 0);
  585. thread = ch->reply;
  586. thread->msg_ret = msg; /* transfer the reply to the sender */
  587. rt_thread_resume(thread); /* wake up the sender */
  588. ch->stat = RT_IPC_STAT_IDLE;
  589. ch->reply = RT_NULL;
  590. _rt_channel_check_wq_wakup(ch);
  591. rt_hw_interrupt_enable(temp);
  592. rt_schedule();
  593. return RT_EOK;
  594. }
  595. static rt_err_t wakeup_receiver(void *object, struct rt_thread *thread)
  596. {
  597. rt_channel_t ch;
  598. rt_err_t ret;
  599. ch = (rt_channel_t)object;
  600. ch->stat = RT_IPC_STAT_IDLE;
  601. thread->error = -RT_EINTR;
  602. ret = rt_channel_list_resume(&ch->parent.suspend_thread);
  603. _rt_channel_check_wq_wakup(ch);
  604. return ret;
  605. }
  606. static void receiver_timeout(void *parameter)
  607. {
  608. struct rt_thread *thread = (struct rt_thread*)parameter;
  609. rt_channel_t ch;
  610. ch = (rt_channel_t)(thread->wakeup.user_data);
  611. ch->stat = RT_IPC_STAT_IDLE;
  612. thread->error = -RT_ETIMEOUT;
  613. thread->wakeup.func = RT_NULL;
  614. rt_list_remove(&(thread->tlist));
  615. /* insert to schedule ready list */
  616. rt_schedule_insert_thread(thread);
  617. _rt_channel_check_wq_wakup(ch);
  618. /* do schedule */
  619. rt_schedule();
  620. }
  621. /**
  622. * Fetch a message from the specified IPC channel.
  623. */
  624. static rt_err_t _rt_raw_channel_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_int32_t time)
  625. {
  626. struct rt_thread *thread;
  627. rt_ipc_msg_t msg_ret;
  628. register rt_base_t temp;
  629. rt_err_t ret;
  630. void (*old_timeout_func)(void *) = 0;
  631. RT_DEBUG_NOT_IN_INTERRUPT;
  632. if (ch == RT_NULL)
  633. {
  634. return -RT_EIO;
  635. }
  636. temp = rt_hw_interrupt_disable();
  637. if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
  638. {
  639. rt_hw_interrupt_enable(temp);
  640. return -RT_EIO;
  641. }
  642. if (ch->stat != RT_IPC_STAT_IDLE)
  643. {
  644. rt_hw_interrupt_enable(temp);
  645. return -RT_ERROR;
  646. }
  647. if (ch->wait_msg.next != &ch->wait_msg) /* there exist unhandled messages */
  648. {
  649. msg_ret = rt_list_entry(ch->wait_msg.next, struct rt_ipc_msg, mlist);
  650. rt_list_remove(ch->wait_msg.next); /* remove the message from the channel */
  651. if (msg_ret->need_reply)
  652. {
  653. RT_ASSERT(ch->wait_thread.next != &ch->wait_thread);
  654. thread = rt_list_entry(ch->wait_thread.next, struct rt_thread, tlist);
  655. rt_list_remove(ch->wait_thread.next);
  656. ch->reply = thread; /* record the waiting sender */
  657. ch->stat = RT_IPC_STAT_ACTIVE; /* no valid suspened receivers */
  658. }
  659. *data = msg_ret->msg; /* extract the transferred data */
  660. if (data->type == RT_CHANNEL_FD)
  661. {
  662. data->u.fd.fd = _ipc_msg_fd_new(data->u.fd.file);
  663. }
  664. _ipc_msg_free(msg_ret); /* put back the message to kernel */
  665. }
  666. else
  667. {
  668. if (time == 0)
  669. {
  670. rt_hw_interrupt_enable(temp);
  671. return -RT_ETIMEOUT;
  672. }
  673. /* no valid message, we must wait */
  674. thread = rt_thread_self();
  675. ret = rt_channel_list_suspend(&ch->parent.suspend_thread, thread);
  676. if (ret != RT_EOK)
  677. {
  678. rt_hw_interrupt_enable(temp);
  679. return ret;
  680. }
  681. rt_thread_wakeup_set(thread, wakeup_receiver, (void*)ch);
  682. ch->stat = RT_IPC_STAT_WAIT;/* no valid suspended senders */
  683. thread->error = RT_EOK;
  684. if (time > 0)
  685. {
  686. rt_timer_control(&(thread->thread_timer),
  687. RT_TIMER_CTRL_GET_FUNC,
  688. &old_timeout_func);
  689. rt_timer_control(&(thread->thread_timer),
  690. RT_TIMER_CTRL_SET_FUNC,
  691. receiver_timeout);
  692. /* reset the timeout of thread timer and start it */
  693. rt_timer_control(&(thread->thread_timer),
  694. RT_TIMER_CTRL_SET_TIME,
  695. &time);
  696. rt_timer_start(&(thread->thread_timer));
  697. }
  698. rt_hw_interrupt_enable(temp);
  699. rt_schedule(); /* let the senders run */
  700. temp = rt_hw_interrupt_disable();
  701. if (old_timeout_func)
  702. {
  703. rt_timer_control(&(thread->thread_timer),
  704. RT_TIMER_CTRL_SET_FUNC,
  705. old_timeout_func);
  706. }
  707. ret = thread->error;
  708. if ( ret != RT_EOK)
  709. {
  710. rt_hw_interrupt_enable(temp);
  711. return ret;
  712. }
  713. /* If waked up, the received message has been store into the thread. */
  714. *data = ((rt_ipc_msg_t)(thread->msg_ret))->msg; /* extract data */
  715. if (data->type == RT_CHANNEL_FD)
  716. {
  717. data->u.fd.fd = _ipc_msg_fd_new(data->u.fd.file);
  718. }
  719. _ipc_msg_free(thread->msg_ret); /* put back the message to kernel */
  720. thread->msg_ret = RT_NULL;
  721. }
  722. rt_hw_interrupt_enable(temp);
  723. return RT_EOK;
  724. }
  725. rt_err_t rt_raw_channel_recv(rt_channel_t ch, rt_channel_msg_t data)
  726. {
  727. return _rt_raw_channel_recv_timeout(ch, data, RT_WAITING_FOREVER);
  728. }
  729. rt_err_t rt_raw_channel_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_int32_t time)
  730. {
  731. return _rt_raw_channel_recv_timeout(ch, data, time);
  732. }
  733. /**
  734. * Peek a message from the specified IPC channel.
  735. */
  736. rt_err_t rt_raw_channel_peek(rt_channel_t ch, rt_channel_msg_t data)
  737. {
  738. return _rt_raw_channel_recv_timeout(ch, data, 0);
  739. }
  740. /* for API */
  741. static int lwp_fd_new(int fdt_type)
  742. {
  743. struct dfs_fdtable *fdt;
  744. if (fdt_type)
  745. {
  746. fdt = dfs_fdtable_get_global();
  747. }
  748. else
  749. {
  750. fdt = dfs_fdtable_get();
  751. }
  752. return fdt_fd_new(fdt);
  753. }
  754. static struct dfs_file *lwp_fd_get(int fdt_type, int fd)
  755. {
  756. struct dfs_fdtable *fdt;
  757. if (fdt_type)
  758. {
  759. fdt = dfs_fdtable_get_global();
  760. }
  761. else
  762. {
  763. fdt = dfs_fdtable_get();
  764. }
  765. return fdt_fd_get(fdt, fd);
  766. }
  767. static void lwp_fd_release(int fdt_type, int fd)
  768. {
  769. struct dfs_fdtable *fdt;
  770. if (fdt_type)
  771. {
  772. fdt = dfs_fdtable_get_global();
  773. }
  774. else
  775. {
  776. fdt = dfs_fdtable_get();
  777. }
  778. fdt_fd_release(fdt, fd);
  779. }
  780. static int _chfd_alloc(int fdt_type)
  781. {
  782. /* create a BSD socket */
  783. int fd;
  784. /* allocate a fd */
  785. fd = lwp_fd_new(fdt_type);
  786. if (fd < 0)
  787. {
  788. return -1;
  789. }
  790. return fd;
  791. }
  792. static void _chfd_free(int fd, int fdt_type)
  793. {
  794. struct dfs_file *d;
  795. d = lwp_fd_get(fdt_type, fd);
  796. if (d == RT_NULL)
  797. {
  798. return;
  799. }
  800. lwp_fd_release(fdt_type, fd);
  801. }
  802. /* for fops */
  803. static int channel_fops_poll(struct dfs_file *file, struct rt_pollreq *req)
  804. {
  805. int mask = POLLOUT;
  806. rt_channel_t ch;
  807. ch = (rt_channel_t)file->vnode->data;
  808. rt_poll_add(&(ch->reader_queue), req);
  809. if (ch->stat != RT_IPC_STAT_IDLE)
  810. {
  811. return mask;
  812. }
  813. if (!rt_list_isempty(&ch->wait_msg))
  814. {
  815. mask |= POLLIN;
  816. }
  817. return mask;
  818. }
  819. static int channel_fops_close(struct dfs_file *file)
  820. {
  821. rt_channel_t ch;
  822. rt_base_t level;
  823. level = rt_hw_interrupt_disable();
  824. ch = (rt_channel_t)file->vnode->data;
  825. if (file->vnode->ref_count == 1)
  826. {
  827. ch->ref--;
  828. if (ch->ref == 0)
  829. {
  830. /* wakeup all the suspended receivers and senders */
  831. rt_channel_list_resume_all(&ch->parent.suspend_thread);
  832. rt_channel_list_resume_all(&ch->wait_thread);
  833. /* all ipc msg will lost */
  834. rt_list_init(&ch->wait_msg);
  835. rt_object_delete(&ch->parent.parent); /* release the IPC channel structure */
  836. }
  837. }
  838. rt_hw_interrupt_enable(level);
  839. return 0;
  840. }
  841. static const struct dfs_file_ops channel_fops =
  842. {
  843. .close = channel_fops_close, /* close */
  844. .poll = channel_fops_poll, /* poll */
  845. };
  846. int lwp_channel_open(int fdt_type, const char *name, int flags)
  847. {
  848. int fd;
  849. rt_channel_t ch = RT_NULL;
  850. struct dfs_file *d;
  851. fd = _chfd_alloc(fdt_type); /* allocate an IPC channel descriptor */
  852. if (fd == -1)
  853. {
  854. goto quit;
  855. }
  856. d = lwp_fd_get(fdt_type, fd);
  857. d->vnode = (struct dfs_vnode *)rt_malloc(sizeof(struct dfs_vnode));
  858. if (!d->vnode)
  859. {
  860. _chfd_free(fd, fdt_type);
  861. fd = -1;
  862. goto quit;
  863. }
  864. ch = rt_raw_channel_open(name, flags);
  865. if (ch)
  866. {
  867. /* initialize vnode */
  868. dfs_vnode_init(d->vnode, FT_USER, &channel_fops);
  869. d->flags = O_RDWR; /* set flags as read and write */
  870. /* set socket to the data of dfs_file */
  871. d->vnode->data = (void *)ch;
  872. }
  873. else
  874. {
  875. rt_free(d->vnode);
  876. d->vnode = RT_NULL;
  877. _chfd_free(fd, fdt_type);
  878. fd = -1;
  879. }
  880. quit:
  881. return fd;
  882. }
  883. static rt_channel_t fd_2_channel(int fdt_type, int fd)
  884. {
  885. struct dfs_file *d;
  886. d = lwp_fd_get(fdt_type, fd);
  887. if (d)
  888. {
  889. rt_channel_t ch;
  890. ch = (rt_channel_t)d->vnode->data;
  891. if (ch)
  892. {
  893. return ch;
  894. }
  895. }
  896. return RT_NULL;
  897. }
  898. rt_err_t lwp_channel_close(int fdt_type, int fd)
  899. {
  900. rt_channel_t ch;
  901. struct dfs_file *d;
  902. struct dfs_vnode *vnode;
  903. d = lwp_fd_get(fdt_type, fd);
  904. if (!d)
  905. {
  906. return -RT_EIO;
  907. }
  908. vnode = d->vnode;
  909. if (!vnode)
  910. {
  911. return -RT_EIO;
  912. }
  913. ch = fd_2_channel(fdt_type, fd);
  914. if (!ch)
  915. {
  916. return -RT_EIO;
  917. }
  918. _chfd_free(fd, fdt_type);
  919. if (vnode->ref_count == 1)
  920. {
  921. rt_free(vnode);
  922. return rt_raw_channel_close(ch);
  923. }
  924. return 0;
  925. }
  926. rt_err_t lwp_channel_send(int fdt_type, int fd, rt_channel_msg_t data)
  927. {
  928. rt_channel_t ch;
  929. ch = fd_2_channel(fdt_type, fd);
  930. if (ch)
  931. {
  932. return rt_raw_channel_send(ch, data);
  933. }
  934. return -RT_EIO;
  935. }
  936. rt_err_t lwp_channel_send_recv_timeout(int fdt_type, int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
  937. {
  938. rt_channel_t ch;
  939. ch = fd_2_channel(fdt_type, fd);
  940. if (ch)
  941. {
  942. return rt_raw_channel_send_recv_timeout(ch, data, data_ret, time);
  943. }
  944. return -RT_EIO;
  945. }
  946. rt_err_t lwp_channel_reply(int fdt_type, int fd, rt_channel_msg_t data)
  947. {
  948. rt_channel_t ch;
  949. ch = fd_2_channel(fdt_type, fd);
  950. if (ch)
  951. {
  952. return rt_raw_channel_reply(ch, data);
  953. }
  954. return -RT_EIO;
  955. }
  956. rt_err_t lwp_channel_recv_timeout(int fdt_type, int fd, rt_channel_msg_t data, rt_int32_t time)
  957. {
  958. rt_channel_t ch;
  959. ch = fd_2_channel(fdt_type, fd);
  960. if (ch)
  961. {
  962. return rt_raw_channel_recv_timeout(ch, data, time);
  963. }
  964. return -RT_EIO;
  965. }
  966. int rt_channel_open(const char *name, int flags)
  967. {
  968. return lwp_channel_open(FDT_TYPE_KERNEL, name, flags);
  969. }
  970. rt_err_t rt_channel_close(int fd)
  971. {
  972. return lwp_channel_close(FDT_TYPE_KERNEL, fd);
  973. }
  974. rt_err_t rt_channel_send(int fd, rt_channel_msg_t data)
  975. {
  976. return lwp_channel_send(FDT_TYPE_KERNEL, fd, data);
  977. }
  978. rt_err_t rt_channel_send_recv_timeout(int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
  979. {
  980. return lwp_channel_send_recv_timeout(FDT_TYPE_KERNEL, fd, data, data_ret, time);
  981. }
  982. rt_err_t rt_channel_send_recv(int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret)
  983. {
  984. return lwp_channel_send_recv_timeout(FDT_TYPE_KERNEL, fd, data, data_ret, RT_WAITING_FOREVER);
  985. }
  986. rt_err_t rt_channel_reply(int fd, rt_channel_msg_t data)
  987. {
  988. return lwp_channel_reply(FDT_TYPE_KERNEL, fd, data);
  989. }
  990. rt_err_t rt_channel_recv_timeout(int fd, rt_channel_msg_t data, rt_int32_t time)
  991. {
  992. return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, time);
  993. }
  994. rt_err_t rt_channel_recv(int fd, rt_channel_msg_t data)
  995. {
  996. return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, RT_WAITING_FOREVER);
  997. }
  998. rt_err_t rt_channel_peek(int fd, rt_channel_msg_t data)
  999. {
  1000. return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, 0);
  1001. }
  1002. #ifdef RT_USING_FINSH
  1003. static int list_channel(void)
  1004. {
  1005. rt_base_t level;
  1006. rt_channel_t *channels;
  1007. rt_ubase_t index, count;
  1008. struct rt_object *object;
  1009. struct rt_list_node *node;
  1010. struct rt_object_information *information;
  1011. const char* stat_strs[] = {"idle", "wait", "active"};
  1012. information = rt_object_get_information(RT_Object_Class_Channel);
  1013. RT_ASSERT(information != RT_NULL);
  1014. count = 0;
  1015. level = rt_hw_interrupt_disable();
  1016. /* get the count of IPC channels */
  1017. for (node = information->object_list.next;
  1018. node != &(information->object_list);
  1019. node = node->next)
  1020. {
  1021. count ++;
  1022. }
  1023. rt_hw_interrupt_enable(level);
  1024. if (count == 0) return 0;
  1025. channels = (rt_channel_t *) rt_calloc(count, sizeof(rt_channel_t));
  1026. if (channels == RT_NULL) return 0; /* out of memory */
  1027. index = 0;
  1028. level = rt_hw_interrupt_disable();
  1029. /* retrieve pointer of IPC channels */
  1030. for (node = information->object_list.next;
  1031. node != &(information->object_list);
  1032. node = node->next)
  1033. {
  1034. object = rt_list_entry(node, struct rt_object, list);
  1035. channels[index] = (rt_channel_t)object;
  1036. index ++;
  1037. }
  1038. rt_hw_interrupt_enable(level);
  1039. rt_kprintf(" channel state\n");
  1040. rt_kprintf("-------- -------\n");
  1041. for (index = 0; index < count; index ++)
  1042. {
  1043. if (channels[index] != RT_NULL)
  1044. {
  1045. rt_kprintf("%-*.s", RT_NAME_MAX, channels[index]->parent.parent.name);
  1046. if (channels[index]->stat < 3)
  1047. rt_kprintf(" %s\n", stat_strs[channels[index]->stat]);
  1048. }
  1049. }
  1050. rt_free(channels);
  1051. return 0;
  1052. }
  1053. MSH_CMD_EXPORT(list_channel, list IPC channel information);
  1054. #endif