MQTT协议及EMQ应用
MQTT协议及EMQ应用MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布/订阅信息传输协议。MQTT在时间和空间上,将消息发送者与接受者分离,可以在不可靠的网络环境中进行扩展。适用于设备硬件存储空间有限或网络带宽有限的场景。物联网平台支持设备使用MQTT协议接入。官网:mqtt.orgwiki各种语言版本开发库https://github.com/mqtt/mqtt.github.io/wiki/libraries?spm=a2c4g.11186623.2.11.2d73174cN52z601. 基础概念QoSlevel 0:最多一次的传输
level 1:至少一次的传输
level 2:只有一次的传输QoS0:对于client而言,有且仅发一次publish包,对于broker而言,有且仅发一次publish,简而言之,就是仅发一次包,是否收到完全不管,适合那些不是很重要的数据。QoS1:对于qos0而言,多了一次ack的作用,但是会有个问题,尽管我们可以通过确认来保证一定收到客户端或服务器的message,但是我们却不能保证message仅有一次,也就是当client没收到service的puback或者service没有收到client的puback,那么就会一直发送publisher流程:(publisher -> broker)publisher store msg -> publish ->broker (传递message)broker -> puback -> publisher delete msg (确认传递成功)注意:publisher必须保存msg,这样才能在重发。publisher如果在一定时间或socket断开等异常情况,会继续重发msg。
QoS2:对于qos1而言,qos2可以实现仅仅接受一次message,其主要原理,
publisher和broker进行了缓存,其中publisher缓存了message和msgID,而broker缓存了msgID,两方都做记录所以可以保证消息不重复,但是由于记录是需要删除的,这个删除流程同样多了一倍。流程:(publisher -> broker)
[*]publisher store msg -> publish ->broker -> broker store msgID(传递message)
[*]broker -> puberc (确认传递成功)
[*]publisher -> pubrel -> broker delete msgID (告诉broker删除msgID)
[*]broker -> pubcomp -> publisher delete msg (告诉publisher删除msg)
注意:第二步,即puberc不可以删除 publisher的msg,因为第三步未必成功,这个时候就需要第一步提醒第二步继续发,而提醒必须要msgID
Topic物联网平台中,服务端和设备端通过 Topic 来实现消息通信。Topic是针对设备的概念,Topic类是针对产品的概念。产品的Topic类会自动映射到产品下的所有设备中,生成用于消息通信的具体设备Topic。 为了方便海量设备基于海量Topic进行通信,简化授权操作,物联网平台增加了产品Topic类的概念。Topic类是一类Topic的集合。例如,产品的自定义Topic类/${YourProductKey}/${YourDeviceName}/user/update是具体Topic/${YourProductKey}/device1/user/update和/${YourProductKey}/device2/user/update的集合。 在您创建设备后,产品的所有Topic类会自动映射到设备上。您无需单独为每个设备授权Topic。阿里物联网平台,关于Topic类的说明:
[*]Topic类中,以正斜线(/)进行分层,区分每个类目。其中,有两个类目为既定类目:${YourProductKey}表示产品的标识符ProductKey;${YourDeviceName}表示设备名称。
[*]类目命名只能包含字母,数字和下划线(_)。每级类目不能为空。
[*]设备操作权限:发布表示设备可以往Topic发布消息;订阅表示设备可以从Topic订阅消息。
MQTT会话(Clean Session)MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收。(简单测试,默认配置下,mosquitto client可接收到QoS=0,1,2的消息,注意是离线再上线,而不是程序退出后再启动)MQTT保留消息(Retained Message)MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。(简单测试,默认配置下,mosquitto仅保留最后一条retained 消息(publish的retain为true))Mnesia:retained_message终端设备publish消息时,如果retain值是true,则服务器会一直记忆,哪怕是服务器重启。因为Mnesia会本地持久化。如果服务器接收到终端publish某主题的消息,payload为空且retain值是true,则会删除这条持久化的消息。如果服务器接收到终端publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。Will Message客户端连接异常断开时,由Broker将其遗嘱消息发送给订阅方。客户端与Broker建立连接时将遗嘱消息包含在CONNECT消息中发送给Broker。Username/Password建立连接时,CONNECT消息中可以携带用户名和密码用于身份校验,但是默认是以明文方式传输,推荐使用TLS方式或者将密码加密后再进行传输。 2. MQTT控制包MQTT协议通过控制包交换数据。
The MQTT protocol works by exchanging a series of MQTT Control Packets in a defined way. 包flag IP数据包3. EMQ应用emq官网:http://emqtt.io,基于Erlang/OTP开发,实现了MQTT V3.1和V3.1.1,支持MQTT-SN、COAP、WebSocket、STOMP和SockJS。提供量级、可依赖、公司级别的MQTT消息。v2.0版本支持超过1.3million连接,需要配置内核、TCP stack、Erlang VM等,参考:http://emqtt.io/docs/v2/getstarted.html。服务器和client分离,常用于服务器。EMQ支持windows和linux,使用方法相似,如下linux为例:docker测试:docker run -d --rm --name emqx -p 18083:18083-p 1883:1883 emqx/emqx
dashboard:http://localhost:18083admin public因为采用Erlang语言设计,提供的client库也是erlang版本,没有提供c版本,但兼容mosquitto库,因此可用mosquitto_pub/mosquito_sub收发消息。通过dashboard中的TOOLS/Websocket订阅查查看消息(这是EMQ提供的测试工具,可用于测试mqtt收发),首先需要连接broker: 注:直接采用默认值连接即可。但要注意HOST主机IP必须是emqx主机IP,若采用docker,则为emqx docker IP,否则连接不成功。客户端模拟器https://mqttx.app/ https://github.com/emqx/MQTTXmqttx是由 EMQ (https://www.emqx.io/cn/) 开源的一款跨平台 MQTT 5.0 桌面客户端工具,它支持 macOS,Linux,Windows,是目前为止市面上最漂亮的 MQTT 客户端工具。消息转存mysql或clickhouse EMQX企业版支持将数据持久化到mysql服务器(支持批量存取,效率高)。而emqx免费版不支持,可以通过webhook转发数据到web server实现数据存储到mysql,此方法只支持单条数据的转发。参考:EMQX webhook实现转发消息到java web服务器并保存到MySQL数据库1) EMQX添加rule engine的resource。webhook,http://172.17.0.1:8090/mqtt。(注:若docker部署,此处的IP为宿主机docker IP)。2)EMQX添加rule。SQL:SELECT payload.code as code, payload.message as message, payload.data as dataFROM"deviceID"3)EMQX添加rule action。data to web server, POST。4)通过websocket测试即可。
参考:
[*]http://www.mosquitto.org/
[*]mosquitto简单应用
[*]MQTT协议规范 阿里云物理网
[*]什么是Topic
[*]我的mqtt协议和emqttd开源项目个人理解(1) - Clean Session和Retained Message
[*]Mosquitto pub/sub服务实现代码浅析-主体框架
[*]mosquito version 1.2.3服务器负载测试报告
[*]Build up a MQTT server with about 20, 000+ subscribers
[*]Mqtt精髓系列之连接建立过程
[*]mqtt.github.io wiki各种语言版本开发库Eclipse Paho C
https://github.com/mqtt/mqtt.github.io/wiki/libraries?spm=a2c4g.11186623.2.11.2d73174cN52z6011. 中移动MQTT库,采用EPOLL框架监控输入和MQTT cm-heclouds/MQTT12. 中移动OneNet平台 http://open.iot.10086.cn/doc/multiprotocol/ 13. 中移动OneNet SDK https://open.iot.10086.cn/doc/book/device-develop/multpro/sdk-doc-tool/sdk.html#%E8%AE%BE%E5%A4%87%E6%8E%A5%E5%85%A5sdk14. EMQ集群及压力测试总结 https://blog.csdn.net/xianglingchuan/article/details/8221934415. SmartGo是一款集成云计算(Cloud),大数据(Bigdata),提供主流消息队列功能及满足物联网MQTT数千万长连接设备消息推送,及人工智能(SmartAI)数据分析处理等功能,使用golang语言全新开发的分布式智能平台。16.Free and High Performance MQTT Broker 基于 golang的高性能MQTT Broker17. 物联网平台AbleCloud http://docs.ablecloud.cn/current/console/site/develop/
页:
[1]