前言1 V9 S7 y; V* M4 J! e! ?/ q: G) u
MQTT 协议的全称叫“消息队列遥测传输”协议。它是一个轻量级的通信协议。旨在为在低带宽、高延时、不稳定网络中的物联网设备提供消息传输服务。它运行在 TCP/IP 协议之上,采用客户端/服务器,发布/订阅消息模式工作,并提供一对多的消息分发。STM32 FOTA demo 就是通过 MQTT 协议进行 MCU 固件新版本信息的推送。Paho 是一个开源的 MQTT 客户端实现,它提供了多种开发语言下的实现。在此 demo 中,用的是 embeddedC 这个版本。
7 {+ s- [( }0 l2 p* DPaho MQTTClient EmbeddedC 的代码构成) A, r2 P9 P" ?
从 github 上下载下来的源代码,包括三个部分:
0 z- R1 E2 o$ L6 h6 \: o/ Y8 FMQTTPacket : 该文件夹下包括了底层的 C 代码,提供基本的简单的解析数据,以及将数据串行化的功能。是其他两个上层接口的基础,也可以单独使用。; G7 A# Y1 }) v" u. B+ P
MQTTClient :该文件夹下提供 C++的上层接口,现在提供 Linux, Arduino 和 mbed 的实现。
- t+ |( n9 X6 y8 F, H6 eMQTTClient-C:该文件夹下提供 C 的上层接口,针对那些不支持 C++编程的平台。
% \6 r8 W4 G% ? w6 d, LDemo 要用到的就是 MQTTPacket 和 MQTTClient-C 这两个文件夹下的源文件。
6 H ~& U+ ^4 z! g' t6 g- U1 k f5 ]1 c F0 L X, Q7 P
使用 Paho MQTTClient EmbeddedC5 q4 `# x( O) w/ Z
下面来看看如何使用 Paho MQTTClient EmbeddedC 来在 MCU 端实现 MQTT 通信。
5 I; z7 e' r8 L& F2 Z* Z4 C配置网络传输接口
* w; u8 a1 P+ bMQTT 是一个 TCP 之上的应用层协议,它发送和接收数据都要通过下层的 TCP/IP 协议栈进行。所以 MQTT 与下层的协议之9 L- \7 G( W6 m
间一定有一个接口。
. H, G& O0 @) T: _( WPaho 对这个数据收发的接口进行了形式上的定义:
9 H- g6 X# a# ?8 y- struct Network' E7 e; ~, ~" h9 ?' I& h' B5 @. g! B
- {
1 F- M/ p" X% O; k! W- ?7 N# Q - net_sockhnd_t my_socket;
d' u( g/ W6 l0 z: U/ J0 f - int (*mqttread) (Network*, unsigned char*,int,int);' A6 m) x; B& L+ _. c% j
- int (*mqttwrite) (Network*,unsigned char*,int,int);
+ l2 j; v0 @! c. Y2 W - int (*disconnect) (Network*);; Z4 x- R+ l" {2 |8 m; h
- };
复制代码
; }$ B' [0 @& E' L8 ^7 ]并通过以下形式在 Paho 中(MQTTClient.c)调用:9 {8 H3 i: X7 I" {" J$ g5 [4 R1 D2 j
- rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);' c+ O) O7 E7 s( Y. g
- …- ]: }4 j B: K: r \0 Q" ^
- int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));9 O$ C' j1 f; d9 M' g" F
- …
9 J! o$ [, I" |: U3 v& ? - rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
复制代码 ( }! f" y# o8 i- @
Paho 的实现会通过结构体 Network 中的 mqttread 和 mqttwrite 成员函数作为接口来从底层网络读取数据以及向底层网络发送数据。因此 Paho 的适配工作需要注册这些成员函数,并实现之:
0 k5 \1 y! h3 y( U0 f W) y& l在 STM32 FOTAdemo 中,Network 结构体是在 baidu_iot_network_wrapper.h 中定义的:- }6 [7 o! W) Z" }' N0 A1 I# }' Z
- struct Network
1 M9 _5 K& ]9 `9 z' i - { ` s2 q: t; j
- net_sockhnd_t my_socket;
7 M% v$ `/ ]5 b: w! t; ~ - int (*mqttread) (Network*, unsigned char*,int,int);
y: U9 B% F9 V! l& I' w a - int (*mqttwrite) (Network*,unsigned char*,int,int);
: c2 C, {9 }- j- \3 t' L - int (*disconnect) (Network*);2 ?2 | E0 y! W7 w0 Y) ~
- };
复制代码
- b' [- ?: ]7 v( i' E4 K/ b. ^0 wbaidu_iot_network_wrapper.c 文件中,有 MQTT 的网络接口函数的具体实现。6 E1 h( ~1 @$ O! Z$ U/ \' l& x7 B
- int mqtt_network_new(Network *network)3 b" S% @# ]/ V0 h% t, ?
- {
% C- v V3 I4 P: R - network->my_socket = 0;
2 Y1 D8 K7 b& V' g/ v - network->mqttread = mqtt_socket_recv;
4 w" k+ A; W9 [: {* h7 ? - network->mqttwrite = mqtt_socket_send;( l1 [7 d- D0 A$ v
- network->disconnect = mqtt_socket_disconnect;
9 [* H& v3 F2 v! Z8 N* S - return SUCCESS;
: w3 L8 r2 H- {* O. x* R - }
. @- Z: q4 n3 |' g3 G - …: `( l! Z9 _( O( }% }% n0 l7 s
- int mqtt_socket_send(Network *network, unsigned char *buf, int len, int timeout)
* [) X4 X" O' E+ T4 c; Z7 ]+ R, E0 K - {! D) _6 o8 l! T
- uint16_t rc;" I, o, h3 q3 K0 L' o" z; d
- int socket = (int) (network->my_socket);2 V- P/ T: h- o S: g
' ]9 ]- V& @& K$ s( d9 S) L& [% o0 j- u- rc = net_sock_send((void*)socket,buf,len);
' q, h! Q% Y# [: @
l! J" S2 y" a, m- return rc;
; q, m4 r9 i7 c1 B3 C. m: m2 @ - } { C. [$ R1 G8 Z4 I. X( E! o
- int mqtt_socket_recv(Network *network, unsigned char *buf, int len, int timeout)% c1 U( b: @6 Y7 ^9 e" T
- {2 J. E& ~ N! }6 u
- uint16_t rc;& C( V3 r( K$ A1 R$ G
- int socket = (int) (network->my_socket);5 S1 `) j0 ]6 X- s
- 8 D5 f+ P" S' w$ S: w. M
- rc = net_sock_recv((void*)socket,buf,len);
* F& R0 h: k2 J% }' s8 e' k. R - return rc;
! K$ n) q2 y4 Q3 `$ G& w - }
复制代码
9 q% [! }& n' J- m f9 } y新建一个 MQTT 客户端
# ?1 O# E( L9 W通过 MQTTClientInit 函数可以新建一个 MQTT 客户端。我们来看看在 Paho 中,MQTT 客户端都定义了哪些属性。MQTTClient 的定义在 MQTTClient.h 文件中,如下:9 G; @$ P2 }" d. U% v, H
- typedef struct MQTTClient
: v/ F% \% N$ e( Z" D - {
: C# v, f7 F" {6 D$ s - unsigned int next_packetid, command_timeout_ms;
- h$ A3 K K' X - size_t buf_size, readbuf_size;
+ `1 X, r/ y& v9 j9 M - unsigned char *buf, *readbuf;
; u1 E" U; h9 Q! I- ?1 } - unsigned int keepAliveInterval;
, N* Y/ x* e7 @4 ]7 |/ | | - char ping_outstanding;
0 z: B- ?5 `1 ^1 e/ w, d - int isconnected;8 T7 l% D8 W$ p9 y4 Q. _ v! Z
- int cleansession;
, N" v7 c- k! \' y* v2 g - struct MessageHandlers# n: y* B3 m, g0 E& N1 A
- {
- P! p: F6 T' j/ O) M6 a( ] - const char* topicFilter;
0 R1 k- x1 z" t- b: ~ - void (*fp) (MessageData*);
6 q# j/ G- R' Z. R$ W - } messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by
3 @1 S3 Z- p# H5 i+ t" y - subscription topic */; Y8 T. J' [( |( O, p$ @
- void (*defaultMessageHandler) (MessageData*);
. v/ _& b6 z' D- ~9 @9 P - Network* ipstack;7 y6 ~! c8 m0 O _( `- B8 ~8 M
- Timer last_sent, last_received;
5 N1 O6 l2 I* }$ Z& H - #if defined(MQTT_TASK)
! ^0 C" c. V" M3 V6 p' U; r& N - Mutex mutex;
$ s, {0 y# b4 Z! ~0 ^" f - Thread thread;8 b4 i9 N' j" x0 R' e0 s
- #endif
7 f1 N7 L/ _, y% K8 X! H - } MQTTClient;
复制代码 7 N! c3 m3 ~( C, Z
MQTTClient 结构体的定义包括:接收/发送数据的缓冲区(readbuffer 和 buf),保持心跳的时间间隔(keepAliveInterval),当前的连接状态(isconnected),消息句柄(messageData)以及网络接口(ipstack)等内容。- r3 C$ n) F6 a+ y* l9 f" H: q4 G
调用 MQTTClientInit 函数时需要输入的参数有:已经初始化好的网络接口(Network*)结构体,COMMAND_TIMEOUT_MS和接收/发送数据的 buffer。; B! T+ d2 ?& E3 X' j3 P
初始化 MQTTClient 后,就可以通过 MQTTConnect 来和服务器建立连接了。在和服务器建立连接的时候,还需要设定一些和建立连接以及后面通信相关的参数,比如:用户名、密码、心跳包的间隔、遗嘱信息(will)、设备与服务器意外断开后服务器是否要保留后续消息(cleansession)等等。都可以通过对 Connect_para 进行初始化来设置这些参数,再调用MQTTConnect 函数建立相关连接。0 G4 `/ G. g9 l9 M- F
下面就是一段新建 MQTTClient 并与服务器建立连接的代码。
( `& q% o* y, @! f# L( Y- int connect2MQTTServer(void)
" |0 z3 _" R2 i4 [( o - {
% j7 ?, R: {0 d4 A7 k! d0 M - int ret;8 j- f( P3 b; w
- uint8_t count=0;# [. r0 ^9 ]0 U) j
- MQTTPacket_connectData Connect_para = MQTTPacket_connectData_initializer;: z) c+ M$ n$ ]9 U" e+ n- z
+ h. N$ Q1 b- T, T8 a, i- Z- ret = 0;# h+ L8 y# v, w) C2 k$ f. m; z
: b& B' N( n/ @9 J- MQTTClientInit(&Client,&sNetwork,COMMAND_TIMEOUT_MS,MQTT_write_buf,sizeof(MQTT_write_buf)
: l* `/ s0 X$ _: F - ,MQTT_read_buf,sizeof(MQTT_read_buf));
, O/ n+ m: d% H; P/ H - //Connect_para.MQTTVersion = MQTT_3_1_1;
9 K( i! i: }0 x1 C$ O2 ] - Connect_para.clientID.cstring = MQTT_CLIENT_ID;
# z! |2 Q6 V# `( r! d; g' ] - Connect_para.username.cstring = MQTT_USER_NAME;" i, I- E' x; S$ _+ t3 K% X# u
- Connect_para.password.cstring = MQTT_PASSWORD;
, x- A0 r) Z, G- Z @) w( v - //MQTT connect,will option is set in Connect_para: } T( H1 c8 u" _
- do
i4 B. e( a- b) p. p g - {
9 c3 l" K7 }6 e: F+ E - count++;
! |, U: k Z F" H+ I1 X - msg_info("Attempt %d/%d ...\n",count,CONNECT_MAX_ATTEMPT_COUNT);, d! q( p- e3 Q$ `$ |: D" A
- if((ret = MQTTConnect(&Client, &Connect_para)) != 0)
* @- A0 F2 p4 x - {
' z4 h X1 y. c/ ~7 v9 H - msg_error("Client connection with Baidu MQTT Broker failed with error code: %d\n",ret);
- i. v9 ^+ a' r% d - }
" ^, w, M3 t1 E+ p/ F - }while((ret != 0)&&(count messageHandlers.fp != NULL)
( U" F% [. S% N6 M0 Y n5 S, |) ]
& r: T/ {. S f- return ret;
! s" o# d+ A0 r- [% v/ m z - }
复制代码
2 n* _9 ~8 V: s8 x8 Z& j发送数据
8 |! x7 e( ~0 |" [; K$ J/ l; s; d和服务器的 MQTT 连接建立成功后,就可以发布和订阅消息了。
3 K S6 T @8 @* M5 nPaho 中发布消息的函数是9 k' M1 X4 Y4 }. T( @8 B
- int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
复制代码 & M8 w$ N3 X: ?2 D i% d. l' [8 y
它有三个输入参数:
7 ^5 x3 ~: n& C/ r0 rMQTTClient* c:就是前面新建的 MQTTClient
) a+ }! X# A6 D0 j/ z' w: \' Econst char* topicName:mqtt 中每一条消息都是和某个主题相对应的,所以在发布消息的时候一定要指明这条消息是发往哪个主题* [, I# ^+ [4 S1 W4 \) V
MQTTMessage* message:将要被发送的消息
# v5 ?5 M3 c% r( Y* O/ Z除了消息的内容,还需要设定好消息传递的 Qos 级别以及服务器是否需要保存这条消息等。
" j* |6 Q9 p" M0 U; }下面是本例程中,向百度天工 IoT 的 MQTT 服务器发布消息的一段代码:. ~6 ]9 V6 |! B5 M
- int baiduiot_devicestatus_update(void)
/ f6 n2 Q" v/ O0 D: z' y( t) U' k - {
* ?( r' a: Q7 U# X - //pub_device_status();9 |; d/ ?% R- m# c' Z9 l
- int ret;
" C& D6 N/ R* j6 t6 o, x6 K) C# f8 B - MQTTMessage MQTT_msg;
7 S% D) F! h O) J - ret =0;& H' r8 v. F. Q" [$ Y% N
* f4 g3 l+ @* g4 G/ g+ D- MQTT_msg.qos = QOS1;//QoS1;- ]- D0 v, Y; G) v- m7 V% h
- MQTT_msg.dup = 0;//The DUP flag MUST be set to 1 by the Client or Server when it9 U- n5 U" u7 Z3 C. v
- attempts to re-deliver a PUBLISH Packet0 a/ c- n2 i7 h1 j7 \+ A5 T
- //The DUP flag MUST be set to 0 for all QoS 0 messages
( x6 y3 s% @5 S S4 O - MQTT_msg.retained = 1; //the Server MUST store 757 the Application Message and its QoS
) M/ D) @- j; f( S7 z - MQTT_msg.payload = payload_buf;
, ^, u# g# @- R
$ I- L7 l/ E+ F; `4 H- //TODO:prepare pub payload,@sz. h' p: V0 Y% O1 q
- PrepareMqttPayload(payload_buf, sizeof(payload_buf));
: t0 K, X `7 u* ~
+ Y/ v" d C' V0 X- MQTT_msg.payloadlen = strlen(payload_buf);
6 j }1 ~! H% u3 ]- R - 0 f4 G; m5 ~1 m. ~# E
- //Create message to publish
& y1 C$ e8 O' ~3 j: Z; F - //
8 r4 b0 `, X* l9 d: {7 \7 j
4 h/ O( K" e4 H8 c" M. G3 _& N* v9 j- if((ret=MQTTPublish(&Client, BAIDU_DEVICE_SHADOW_UPDATE_TOPIC,&MQTT_msg)) != 0)6 p+ V# X, h) W6 _2 v( D7 c2 i
- {5 I2 @* d6 F, F. v0 t, s/ N
- msg_error("Failed to publish data. %d\n",ret);
9 s6 I, ~, A2 Y% n9 w - }
; c, R+ g' K" I( L2 e/ t) L - else
; X- m' A' _4 f3 z9 t - {# l7 W* I& ~8 g! ? r
- msg_info("publish device status successfully.\n");
9 t- a2 ]# q1 Z2 M8 G' W1 K - }
* {5 a3 a4 H" h8 }4 \ - return ret;# U9 {, i+ f" w2 R0 v/ M0 m1 _
- }
复制代码 9 `8 u" t$ C; |( f$ O
接收数据
0 ~" y$ L& ]$ L2 ?$ L如果设备端希望能接收服务器发的某个主题的消息,需要做这么几件事:
9 n0 q& o$ z7 r' K$ P; f• 向服务器订阅这类消息所属的主题 Z7 b% S- V% r: s$ e! z
• 注册用来处理接收到的消息的回调函数& w0 u x: |0 G+ [% x
通过以下函数就可以实现向服务器订阅某个主题
6 s. m6 {, m- Q# V: t# `* R- int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
复制代码- int baiduiot_sub_shadow(void)
0 `! B% B1 ~6 |, |5 o0 v - {/ `" r! r' E3 `: B, b4 h3 D5 k
- //sub_baiduIOT_shadow_topic();' j- Y0 ~- I+ [( t/ M& B
- int ret;
0 r! \( `2 u% B v& J, q - ret = 0;1 l/ ^2 `! j V1 m% m
- % G8 r) I0 X4 g
- ! l) Y3 b& P$ C8 \3 f# X8 L9 e- B$ J
- if((ret=MQTTSubscribe(&Client,BAIDU_DEVICE_SHADOW_DELTA_TOPIC,QOS0,MQTTcallbackHandler))!
/ h4 C7 M6 ?4 [0 z - =0)$ M7 g# l9 a( s8 o
- printf("fail to subscribe to topic.\n");
1 s7 u& _+ M; S3 @% T; I - else( L9 M2 V3 ^# D% c3 M9 T8 R6 U
- printf("subscribe to topic: %s\n",BAIDU_DEVICE_SHADOW_DELTA_TOPIC);
6 w4 O( n& G( M9 C - 6 ~- p+ p9 g3 j8 e. Z
- return ret;9 J, w. z' ~) D# v
- }
复制代码 1 a1 d. R- f- u3 h' p; m; J7 E
并注册消息处理回调函数:MQTTcallbackHandler()。
; e, Y; f3 O5 Q1 V& ?注意,这里同样需要指定一个 QoS 级别,之后服务器向设备端推送消息的时候,就会按照这个 QoS 级别进行。在 MQTT 应用中,即使是同一个设备端和服务器之间的通信,发布消息和订阅消息也可以采用不同的 QoS 级别。
0 s: C& j. @7 ~+ J5 D3 X+ k6 y另外还有一个函数很重要. N1 n* Z, u( q7 Y" O
- int MQTTYield(MQTTClient* c, int timeout_ms)
复制代码 ( j$ p C: J% L# U |) ~ R6 U
该函数需要被定期调用,来接收服务器发来的数据。前面注册的消息处理回调函数 MQTTcallbackHandler()就是在
$ l! J. ~7 m1 U$ e. H4 U: xMQTTYield 调用时被执行的。
& }- G' ?( w7 ]- MQTTYield(c, MQTT_YIELD_DELAY) cycle(c, &timer) deliverMessage(c, &topicName, &msg)) `; U4 d& ]1 a/ ]
- ………: d, o% x: S: ?8 x3 M& J: t
- ……..; a: m9 X( E! h$ {5 ~% Z9 C% R
- if (c->messageHandlers<i>.</i>fp != NULL)8 K4 [/ a' b3 w
- {
3 j% ^. c* U+ h) r7 b5 F0 { - MessageData md;
, H0 V, R+ b+ [7 X: J" X - NewMessageData(&md, topicName, message);
+ ~% T0 f/ b4 ] |6 _, ~ - c->messageHandlers.fp(&md);
' l& \6 K" z, i& J5 j - rc = SUCCESS;1 T: h$ Z8 }0 T
- }
复制代码- MQTTSubscribe(…,MQTTcallbackHandler) MQTTSubscribeWithResults(…,MQTTcallbackHandler) # E6 S7 t3 \3 z5 i( |& F
- MQTTSetMessageHandler(…, messageHandler)
2 |9 ^. W; h8 j6 I; ^9 H) y - ……0 u$ Q! \' _/ _* g
- ……% T Z# X+ u$ P( ^
- c->messageHandlers<span style="font-style: italic;"><span style="font-style: normal;">.topicFilter = topicFilter;, N7 V) h3 g, ], v# }
- c->messageHandlers</span><span style="font-style: normal;">.fp = messageHandler;</span></span>
复制代码
@6 l5 d8 p; n C& {介绍到这里,相信大家已经能够使用 Paho 来实现和 MQTT 服务器的基本通信了。4 _4 Y9 S; n, y* S/ A7 W4 ]- o
最后小结一下:对于一个 MQTT 客户端,MQTT 应用先通过 MQTTClientInit()建立连接,发布消息就调用 MQTTPublish();订阅消息通过 MQTTSubscribe(),但是需要同时注册对收到的消息的处理函数,以函数参数的形式传给 MQTTSubscribe;对于所订阅的消息,会异步地从云端下发过来,MQTT 客户端需要定期调用 MQTTYield()来收取并处理。
2 n! }! ?3 M/ b/ \3 r- r* ~1 i0 Z/ {. X& {9 L Q
8 _% T5 w" N% e7 u) Q; |
; Z; F( P6 X0 O/ o' ]. N
/ b4 Q4 g' a; T |