mqttclient.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. #define OS_RTTHREAD 1
  2. #if (OS_RTTHREAD == 0)
  3. #include <sys/socket.h>
  4. #include <sys/time.h>
  5. #include <sys/select.h>
  6. #include <netinet/in.h>
  7. #include <netinet/tcp.h>
  8. #include <arpa/inet.h>
  9. #include <netdb.h>
  10. #include <stdio.h>
  11. #include <unistd.h>
  12. #include <errno.h>
  13. #include <fcntl.h>
  14. #include <pthread.h>
  15. #include <signal.h>
  16. #include <stdlib.h>
  17. #define DEBUG printf
  18. #define SLEEP(x) sleep(x)
  19. #else
  20. #include <rtthread.h>
  21. #include <lwip/netdb.h>
  22. #include <lwip/sockets.h>
  23. #include <arch/sys_arch.h>
  24. #include <lwip/sys.h>
  25. #define DEBUG rt_kprintf
  26. #define SLEEP(x) rt_thread_delay((x)*RT_TICK_PER_SECOND)
  27. #endif
  28. #include <string.h>
  29. #include "MQTTPacket.h"
  30. #define HOSTNAME "m2m.eclipse.org"
  31. #define HOSTPORT 1883
  32. #define USERNAME "testuser"
  33. #define PASSWORD "testpassword"
  34. #define TOPIC "test"
  35. #define KEEPALIVE_INTERVAL 20
  36. typedef struct
  37. {
  38. int qos;
  39. unsigned char retained;
  40. unsigned char dup;
  41. unsigned short id;
  42. int payloadlen;
  43. unsigned char *payload;
  44. }mqtt_msg_t;
  45. typedef struct
  46. {
  47. int sockfd;
  48. unsigned char *wbuf; //
  49. int wbuflen;
  50. unsigned char *rbuf;
  51. int rbuflen;
  52. int (*getfn)(unsigned char*, int);
  53. }mqtt_client_t;
  54. static mqtt_client_t _cpub;
  55. static mqtt_client_t _csub;
  56. static void *mqtt_ping_thread(void *param)
  57. {
  58. int *sockfd = (int*)param;
  59. unsigned char buf[2];
  60. int len;
  61. DEBUG("ping start\n");
  62. while (*sockfd >= 0)
  63. {
  64. SLEEP(KEEPALIVE_INTERVAL-1);
  65. len = MQTTSerialize_pingreq(buf, sizeof(buf));
  66. send(*sockfd, buf, len, 0);
  67. }
  68. DEBUG("ping exit\n");
  69. return 0;
  70. }
  71. void mqtt_ping_start(int *sockfd)
  72. {
  73. #if (OS_RTTHREAD == 0)
  74. pthread_t tid;
  75. pthread_create(&tid, NULL, mqtt_ping_thread, (void*)sockfd);
  76. #else
  77. sys_thread_new("ping",
  78. mqtt_ping_thread,
  79. (void*)sockfd,
  80. 512,
  81. 20);
  82. #endif
  83. }
  84. static int sub_read(unsigned char *buf, int len)
  85. {
  86. int bytes = 0;
  87. struct timeval interval;
  88. int rc;
  89. interval.tv_sec = 3;
  90. interval.tv_usec = 0;
  91. rc = setsockopt(_csub.sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
  92. while (bytes < len)
  93. {
  94. rc = recv(_csub.sockfd, &buf[bytes], (size_t)(len - bytes), 0);
  95. if (rc == -1)
  96. {
  97. if (errno != ENOTCONN && errno != ECONNRESET)
  98. {
  99. bytes = -1;
  100. break;
  101. }
  102. }
  103. else
  104. bytes += rc;
  105. }
  106. return bytes;
  107. }
  108. static int pub_read(unsigned char *buf, int len)
  109. {
  110. int bytes = 0;
  111. struct timeval interval;
  112. int rc;
  113. interval.tv_sec = 3;
  114. interval.tv_usec = 0;
  115. rc = setsockopt(_cpub.sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
  116. while (bytes < len)
  117. {
  118. rc = recv(_cpub.sockfd, &buf[bytes], (size_t)(len - bytes), 0);
  119. if (rc == -1)
  120. {
  121. if (errno != ENOTCONN && errno != ECONNRESET)
  122. {
  123. bytes = -1;
  124. break;
  125. }
  126. }
  127. else
  128. bytes += rc;
  129. }
  130. return bytes;
  131. }
  132. int mqtt_write(int sockfd, unsigned char *buf, int len)
  133. {
  134. int rc;
  135. struct timeval tv;
  136. tv.tv_sec = 2;
  137. tv.tv_usec = 0;
  138. setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));
  139. rc = send(sockfd, buf, len, 0);
  140. if (rc == len)
  141. rc = 0;
  142. return rc;
  143. }
  144. int mqtt_subscribe(mqtt_client_t *c, char* topicstr, int qos)
  145. {
  146. MQTTString topic = MQTTString_initializer;
  147. int msgid = 1;
  148. int len;
  149. int rc = -1;
  150. topic.cstring = topicstr;
  151. len = MQTTSerialize_subscribe(c->wbuf, c->wbuflen, 0, msgid, 1, &topic, &qos);
  152. if (len <= 0)
  153. goto exit;
  154. rc = mqtt_write(c->sockfd, c->wbuf, len);
  155. if (rc < 0)
  156. goto exit;
  157. if (MQTTPacket_read(c->rbuf, c->rbuflen, c->getfn) == SUBACK)
  158. {
  159. unsigned short submsgid;
  160. int subcount;
  161. int granted_qos;
  162. rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, c->rbuf, c->rbuflen);
  163. if (granted_qos != 0)
  164. {
  165. DEBUG("granted qos != 0, %d\n", granted_qos);
  166. rc = -1;
  167. }
  168. else
  169. {
  170. rc = 0;
  171. }
  172. }
  173. else
  174. {
  175. rc = -1;
  176. }
  177. exit:
  178. return rc;
  179. }
  180. int mqtt_publish(mqtt_client_t *c, char* topicstr, mqtt_msg_t *msg)
  181. {
  182. int rc = -1;
  183. MQTTString topic = MQTTString_initializer;
  184. int len;
  185. int pktype;
  186. topic.cstring = topicstr;
  187. len = MQTTSerialize_publish(c->wbuf, c->wbuflen,
  188. msg->dup, msg->qos, msg->retained, msg->id,
  189. topic, msg->payload, msg->payloadlen);
  190. if (len <= 0)
  191. goto exit;
  192. if ((rc = mqtt_write(c->sockfd, c->wbuf, len)) <= 0)
  193. goto exit;
  194. pktype = MQTTPacket_read(c->rbuf, c->rbuflen, c->getfn);
  195. if (msg->qos == 1)
  196. {
  197. if (pktype == PUBACK)
  198. {
  199. unsigned short mypacketid;
  200. unsigned char dup, type;
  201. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->rbuf, c->rbuflen) != 1)
  202. rc = -1;
  203. }
  204. else
  205. rc = -1;
  206. }
  207. else if (msg->qos == 2)
  208. {
  209. if (pktype == PUBCOMP)
  210. {
  211. unsigned short mypacketid;
  212. unsigned char dup, type;
  213. if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->rbuf, c->rbuflen) != 1)
  214. rc = -1;
  215. }
  216. else
  217. rc = -1;
  218. }
  219. exit:
  220. return rc;
  221. }
  222. int mqtt_netconnect(char *addr, int port)
  223. {
  224. struct hostent *host = 0;
  225. struct sockaddr_in sockaddr;
  226. int sock;
  227. host = gethostbyname(addr);
  228. if (host == 0)
  229. return -1;
  230. if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1)
  231. {
  232. return -1;
  233. }
  234. sockaddr.sin_family = AF_INET;
  235. sockaddr.sin_port = htons(port);
  236. sockaddr.sin_addr = *((struct in_addr *)host->h_addr);
  237. memset(&(sockaddr.sin_zero), 0, sizeof(sockaddr.sin_zero));
  238. if (connect(sock, (struct sockaddr *)&sockaddr, sizeof(struct sockaddr)) == -1)
  239. {
  240. #if (OS_RTTHREAD == 0)
  241. close(sock);
  242. #else
  243. closesocket(sock);
  244. #endif
  245. return -2;
  246. }
  247. return sock;
  248. }
  249. void mqtt_netdisconnect(int *sockfd)
  250. {
  251. #if (OS_RTTHREAD == 0)
  252. close(*sockfd);
  253. #else
  254. closesocket(*sockfd);
  255. #endif
  256. *sockfd = -1;
  257. }
  258. int mqtt_connect(mqtt_client_t *c, MQTTPacket_connectData *data)
  259. {
  260. int rc = -1;
  261. int len;
  262. len = MQTTSerialize_connect(c->wbuf, c->wbuflen, data);
  263. if (len <= 0)
  264. goto exit;
  265. rc = mqtt_write(c->sockfd, c->wbuf, len);
  266. if (rc < 0)
  267. goto exit;
  268. rc = MQTTPacket_read(c->rbuf, c->rbuflen, c->getfn);
  269. if (rc < 0)
  270. goto exit;
  271. if (rc == CONNACK)
  272. {
  273. unsigned char sessionPresent, connack_rc;
  274. if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, c->rbuf, c->rbuflen) == 1)
  275. {
  276. rc = connack_rc;
  277. }
  278. else
  279. {
  280. rc = -1;
  281. }
  282. }
  283. else
  284. rc = -1;
  285. exit:
  286. return rc;
  287. }
  288. int mqtt_disconnect(mqtt_client_t *c)
  289. {
  290. int rc = -1;
  291. int len;
  292. len = MQTTSerialize_disconnect(c->wbuf, c->wbuflen);
  293. if (len > 0)
  294. {
  295. rc = mqtt_write(c->sockfd, c->wbuf, len);
  296. }
  297. return rc;
  298. }
  299. static void *mqtt_pub_thread(void *param)
  300. {
  301. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  302. int rc;
  303. unsigned char rbuf[64];
  304. unsigned char wbuf[64];
  305. DEBUG("pub thread start\n");
  306. /* */
  307. _cpub.rbuf = rbuf;
  308. _cpub.rbuflen = sizeof(rbuf);
  309. _cpub.wbuf = wbuf;
  310. _cpub.wbuflen = sizeof(wbuf);
  311. _cpub.getfn = pub_read;
  312. if ((_cpub.sockfd = mqtt_netconnect(HOSTNAME, HOSTPORT)) < 0)
  313. {
  314. DEBUG("pub netconnet fail\n");
  315. return 0;
  316. }
  317. DEBUG("pub connect to: %s %d\n", HOSTNAME, HOSTPORT);
  318. condata.clientID.cstring = "mqttpub";
  319. condata.keepAliveInterval = KEEPALIVE_INTERVAL;
  320. condata.cleansession = 1;
  321. condata.username.cstring = USERNAME;
  322. condata.password.cstring = PASSWORD;
  323. rc = mqtt_connect(&_cpub, &condata);
  324. if (rc < 0)
  325. goto exit;
  326. DEBUG("pub connect ok\n");
  327. mqtt_ping_start(&_cpub.sockfd);
  328. while (rc == 0)
  329. {
  330. mqtt_msg_t msg;
  331. SLEEP(5);
  332. msg.dup = 0;
  333. msg.id = 0;
  334. msg.qos = 0;
  335. msg.retained = 0;
  336. msg.payload = (unsigned char*)"RT-Thread";
  337. msg.payloadlen = strlen((const char*)msg.payload);
  338. rc = mqtt_publish(&_cpub, TOPIC, &msg);
  339. }
  340. exit:
  341. mqtt_netdisconnect(&_cpub.sockfd);
  342. DEBUG("pub thread exit\n");
  343. return 0;
  344. }
  345. void msgprocess(MQTTString *topic, mqtt_msg_t *msg)
  346. {
  347. msg->payload[msg->payloadlen] = 0;
  348. DEBUG("recv: size = %d, msg = %s\n", msg->payloadlen, msg->payload);
  349. }
  350. static void *mqtt_sub_thread(void *param)
  351. {
  352. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  353. int pktype, rc, len;
  354. int failcnt = 0;
  355. unsigned char wbuf[64];
  356. unsigned char rbuf[64];
  357. DEBUG("sub thread start\n");
  358. _csub.wbuf = wbuf;
  359. _csub.wbuflen = sizeof(wbuf);
  360. _csub.rbuf = rbuf;
  361. _csub.rbuflen = sizeof(rbuf);
  362. _csub.getfn = sub_read;
  363. if ((_csub.sockfd = mqtt_netconnect(HOSTNAME, HOSTPORT)) < 0)
  364. {
  365. DEBUG("sub netconnect fail\n");
  366. return 0;
  367. }
  368. DEBUG("sub connect to: %s %d\n", HOSTNAME, HOSTPORT);
  369. condata.clientID.cstring = "mqttsub";
  370. condata.keepAliveInterval = KEEPALIVE_INTERVAL;
  371. condata.cleansession = 1;
  372. condata.username.cstring = USERNAME;
  373. condata.password.cstring = PASSWORD;
  374. rc = mqtt_connect(&_csub, &condata);
  375. if (rc < 0)
  376. goto exit;
  377. DEBUG("sub connect ok\n");
  378. rc = mqtt_subscribe(&_csub, TOPIC, 0);
  379. if (rc < 0)
  380. goto exit;
  381. DEBUG("sub topic: %s\n", TOPIC);
  382. mqtt_ping_start(&_csub.sockfd);
  383. while (1)
  384. {
  385. pktype = MQTTPacket_read(_csub.rbuf, _csub.rbuflen, sub_read);
  386. switch (pktype)
  387. {
  388. case CONNACK:
  389. case PUBACK:
  390. case SUBACK:
  391. break;
  392. case PUBLISH:
  393. {
  394. MQTTString topic;
  395. mqtt_msg_t msg;
  396. if (MQTTDeserialize_publish(&msg.dup, &msg.qos, &msg.retained, &msg.id, &topic,
  397. &msg.payload, &msg.payloadlen, _csub.rbuf, _csub.rbuflen) != 1)
  398. goto exit;
  399. msgprocess(&topic, &msg);
  400. if (msg.qos != 0)
  401. {
  402. if (msg.qos == 1)
  403. len = MQTTSerialize_ack(_csub.wbuf, _csub.wbuflen, PUBACK, 0, msg.id);
  404. else if (msg.qos == 2)
  405. len = MQTTSerialize_ack(_csub.wbuf, _csub.wbuflen, PUBREC, 0, msg.id);
  406. if (len <= 0)
  407. rc = -1;
  408. else
  409. rc = mqtt_write(_csub.sockfd, _csub.wbuf, len);
  410. if (rc == -1)
  411. goto exit;
  412. }
  413. }
  414. break;
  415. case PUBCOMP:
  416. break;
  417. case PINGRESP:
  418. failcnt = 0;
  419. break;
  420. case -1:
  421. if (++failcnt > KEEPALIVE_INTERVAL)
  422. {
  423. /* */
  424. goto exit;
  425. }
  426. break;
  427. }
  428. }
  429. /* */
  430. mqtt_disconnect(&_csub);
  431. exit:
  432. mqtt_netdisconnect(&_csub.sockfd);
  433. DEBUG("sub thread exit\n");
  434. return 0;
  435. }
  436. void mqtt_client_init(void)
  437. {
  438. #if (OS_RTTHREAD == 0)
  439. pthread_t tid;
  440. pthread_create(&tid, NULL, mqtt_sub_thread, NULL);
  441. pthread_create(&tid, NULL, mqtt_pub_thread, NULL);
  442. #else
  443. sys_thread_new("sub",
  444. mqtt_sub_thread,
  445. NULL,
  446. 1024,
  447. 20);
  448. sys_thread_new("pub",
  449. mqtt_pub_thread,
  450. NULL,
  451. 1024,
  452. 20);
  453. #endif
  454. }
  455. #ifdef RT_USING_FINSH
  456. #include <finsh.h>
  457. FINSH_FUNCTION_EXPORT(mqtt_client_init, MQTT Pub/Sub Test);
  458. #endif