MQTT协议及EMQ应用
MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布/订阅信息传输协议。MQTT在时间和空间上,将消息发送者与接受者分离,可以在不可靠的网络环境中进行扩展。适用于设备硬件存储空间有限或网络带宽有限的场景。物联网平台支持设备使用MQTT协议接入。 wiki各种语言版本开发库 1. 基础概念QoS level 0:最多一次的传输
level 1:至少一次的传输
level 2:只有一次的传输 QoS0:对于client而言,有且仅发一次publish包,对于broker而言,有且仅发一次publish,简而言之,就是仅发一次包,是否收到完全不管,适合那些不是很重要的数据。 QoS1:对于qos0而言,多了一次ack的作用,但是会有个问题,尽管我们可以通过确认来保证一定收到客户端或服务器的message,但是我们却不能保证message仅有一次, 也就是当client没收到service的puback或者service没有收到client的puback,那么就会一直发送publisher 流程:(publisher -> broker) publisher store msg -> publish ->broker (传递message) broker -> puback -> publisher delete msg (确认传递成功) 注意: publisher必须保存msg,这样才能在重发。 publisher如果在一定时间或socket断开等异常情况,会继续重发msg。
QoS2:对于qos1而言,qos2可以实现仅仅接受一次message,其主要原理,
publisher和broker进行了缓存,其中publisher缓存了message和msgID,而broker缓存了msgID,两方都做记录所以可以保证消息不重复, 但是由于记录是需要删除的,这个删除流程同样多了一倍。 流程:(publisher -> broker) - publisher store msg -> publish ->broker -> broker store msgID(传递message)
- broker -> puberc (确认传递成功)
- publisher -> pubrel -> broker delete msgID (告诉broker删除msgID)
- broker -> pubcomp -> publisher delete msg (告诉publisher删除msg)
注意:第二步,即puberc不可以删除 publisher的msg,因为第三步未必成功,这个时候就需要第一步提醒第二步继续发,而提醒必须要msgID
Topic 物联网平台中,服务端和设备端通过 Topic 来实现消息通信。Topic是针对设备的概念,Topic类是针对产品的概念。 产品的Topic类会自动映射到产品下的所有设备中,生成用于消息通信的具体设备Topic。 为了方便海量设备基于海量Topic进行通信,简化授权操作,物联网平台增加了产品Topic类的概念。 Topic类是一类Topic的集合。例如,产品的自定义Topic类/${YourProductKey}/${YourDeviceName}/user/update是具体Topic/${YourProductKey}/device1/user/update和 /${YourProductKey}/device2/user/update的集合。 在您创建设备后,产品的所有Topic类会自动映射到设备上。您无需单独为每个设备授权Topic。 阿里物联网平台,关于Topic类的说明: - Topic类中,以正斜线(/)进行分层,区分每个类目。其中,有两个类目为既定类目:${YourProductKey}表示产品的标识符ProductKey;${YourDeviceName}表示设备名称。
- 类目命名只能包含字母,数字和下划线(_)。每级类目不能为空。
- 设备操作权限:发布表示设备可以往Topic发布消息;订阅表示设备可以从Topic订阅消息。
MQTT会话(Clean Session) MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。 ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。 ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。 当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。 当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收。(简单测试,默认配置下,mosquitto client可接收到QoS=0,1,2的消息,注意是离线再上线,而不是程序退出后再启动) MQTT保留消息(Retained Message) MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。(简单测试,默认配置下,mosquitto仅保留最后一条retained 消息(publish的retain为true)) Mnesia:retained_message 终端设备publish消息时,如果retain值是true,则服务器会一直记忆,哪怕是服务器重启。因为Mnesia会本地持久化。 如果服务器接收到终端publish某主题的消息,payload为空且retain值是true,则会删除这条持久化的消息。 如果服务器接收到终端publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。 Will Message 客户端连接异常断开时,由Broker将其遗嘱消息发送给订阅方。客户端与Broker建立连接时将遗嘱消息包含在CONNECT消息中发送给Broker。 Username/Password 建立连接时,CONNECT消息中可以携带用户名和密码用于身份校验,但是默认是以明文方式传输,推荐使用TLS方式或者将密码加密后再进行传输。 2. MQTT控制包MQTT协议通过控制包交换数据。
The MQTT protocol works by exchanging a series of MQTT Control Packets in a defined way. 包flag IP数据包 3. EMQ应用emq官网: http://emqtt.io,基于Erlang/OTP开发,实现了MQTT V3.1和V3.1.1,支持MQTT-SN、COAP、WebSocket、STOMP和SockJS。提供量级、可依赖、公司级别的MQTT消息。 EMQ支持windows和linux,使用方法相似,如下linux为例: docker测试: docker run -d --rm --name emqx -p 18083:18083-p 1883:1883 emqx/emqx
dashboard: admin public 因为采用Erlang语言设计,提供的client库也是erlang版本,没有提供c版本,但兼容mosquitto库,因此可用mosquitto_pub/mosquito_sub收发消息。 通过dashboard中的TOOLS/Websocket订阅查查看消息(这是EMQ提供的测试工具,可用于测试mqtt收发),首先需要连接broker: 注:直接采用默认值连接即可。但要注意HOST主机IP必须是emqx主机IP,若采用docker,则为emqx docker IP,否则连接不成功。 客户端模拟器消息转存mysql或clickhouse EMQX企业版支持将数据持久化到mysql服务器(支持批量存取,效率高)。而emqx免费版不支持,可以通过webhook转发数据到web server实现数据存储到mysql,此方法只支持单条数据的转发。 2)EMQX添加rule。SQL:SELECT payload.code as code, payload.message as message, payload.data as data FROM "deviceID" 3)EMQX添加rule action。data to web server, POST。 4)通过websocket测试即可。
参考: 13. 中移动OneNet SDK
|