谷动谷力

 找回密码
 立即注册
查看: 1270|回复: 0
打印 上一主题 下一主题
收起左侧

MQTT协议及EMQ应用

[复制链接]
跳转到指定楼层
楼主
发表于 2024-3-26 11:10:14 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式
MQTT协议及EMQ应用


MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布/订阅信息传输协议。MQTT在时间和空间上,将消息发送者与接受者分离,可以在不可靠的网络环境中进行扩展。适用于设备硬件存储空间有限或网络带宽有限的场景。物联网平台支持设备使用MQTT协议接入。
官网:mqtt.org
wiki各种语言版本开发库
1. 基础概念
QoS
level 0:最多一次的传输
level 1:至少一次的传输
level 2:只有一次的传输
QoS0:对于client而言,有且仅发一次publish包,对于broker而言,有且仅发一次publish,简而言之,就是仅发一次包,是否收到完全不管,适合那些不是很重要的数据。
QoS1:对于qos0而言,多了一次ack的作用,但是会有个问题,尽管我们可以通过确认来保证一定收到客户端或服务器的message,但是我们却不能保证message仅有一次,
也就是当client没收到service的puback或者service没有收到client的puback,那么就会一直发送publisher
流程:(publisher -> broker)
publisher store msg -> publish ->broker (传递message)
broker -> puback -> publisher delete msg (确认传递成功)
注意:
publisher必须保存msg,这样才能在重发。
publisher如果在一定时间或socket断开等异常情况,会继续重发msg。

QoS2:对于qos1而言,qos2可以实现仅仅接受一次message,其主要原理,
publisher和broker进行了缓存,其中publisher缓存了message和msgID,而broker缓存了msgID,两方都做记录所以可以保证消息不重复,
但是由于记录是需要删除的,这个删除流程同样多了一倍。
流程:(publisher -> broker)
  • publisher store msg -> publish ->broker -> broker store msgID(传递message)
  • broker -> puberc (确认传递成功)
  • publisher -> pubrel -> broker delete msgID (告诉broker删除msgID)
  • broker -> pubcomp -> publisher delete msg (告诉publisher删除msg)
注意:第二步,即puberc不可以删除 publisher的msg,因为第三步未必成功,这个时候就需要第一步提醒第二步继续发,而提醒必须要msgID


Topic
物联网平台中,服务端和设备端通过 Topic 来实现消息通信。Topic是针对设备的概念,Topic类是针对产品的概念。
产品的Topic类会自动映射到产品下的所有设备中,生成用于消息通信的具体设备Topic。 为了方便海量设备基于海量Topic进行通信,简化授权操作,物联网平台增加了产品Topic类的概念。
Topic类是一类Topic的集合。例如,产品的自定义Topic类/${YourProductKey}/${YourDeviceName}/user/update是具体Topic/${YourProductKey}/device1/user/update和
/${YourProductKey}/device2/user/update的集合。 在您创建设备后,产品的所有Topic类会自动映射到设备上。您无需单独为每个设备授权Topic。
阿里物联网平台,关于Topic类的说明:
  • Topic类中,以正斜线(/)进行分层,区分每个类目。其中,有两个类目为既定类目:${YourProductKey}表示产品的标识符ProductKey;${YourDeviceName}表示设备名称。
  • 类目命名只能包含字母,数字和下划线(_)。每级类目不能为空。
  • 设备操作权限:发布表示设备可以往Topic发布消息;订阅表示设备可以从Topic订阅消息。
MQTT会话(Clean Session)
MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。
当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。
当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收。(简单测试,默认配置下,mosquitto client可接收到QoS=0,1,2的消息,注意是离线再上线,而不是程序退出后再启动)
MQTT保留消息(Retained Message)
MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。(简单测试,默认配置下,mosquitto仅保留最后一条retained 消息(publish的retain为true))
Mnesia:retained_message
终端设备publish消息时,如果retain值是true,则服务器会一直记忆,哪怕是服务器重启。因为Mnesia会本地持久化。
如果服务器接收到终端publish某主题的消息,payload为空且retain值是true,则会删除这条持久化的消息。
如果服务器接收到终端publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。
Will Message
客户端连接异常断开时,由Broker将其遗嘱消息发送给订阅方。客户端与Broker建立连接时将遗嘱消息包含在CONNECT消息中发送给Broker。
Username/Password
建立连接时,CONNECT消息中可以携带用户名和密码用于身份校验,但是默认是以明文方式传输,推荐使用TLS方式或者将密码加密后再进行传输。
2. MQTT控制包
MQTT协议通过控制包交换数据。
The MQTT protocol works by exchanging a series of MQTT Control Packets in a defined way.
包flag
IP数据包
3. EMQ应用
emq官网:http://emqtt.io,基于Erlang/OTP开发,实现了MQTT V3.1和V3.1.1,支持MQTT-SN、COAP、WebSocket、STOMP和SockJS。提供量级、可依赖、公司级别的MQTT消息。
v2.0版本支持超过1.3million连接,需要配置内核、TCP stack、Erlang VM等,参考:http://emqtt.io/docs/v2/getstarted.html。服务器和client分离,常用于服务器。
EMQ支持windows和linux,使用方法相似,如下linux为例:
docker测试:
docker run -d --rm --name emqx -p 18083:18083-p 1883:1883 emqx/emqx
dashboard:
admin public
因为采用Erlang语言设计,提供的client库也是erlang版本,没有提供c版本,但兼容mosquitto库,因此可用mosquitto_pub/mosquito_sub收发消息。
通过dashboard中的TOOLS/Websocket订阅查查看消息(这是EMQ提供的测试工具,可用于测试mqtt收发),首先需要连接broker:
注:直接采用默认值连接即可。但要注意HOST主机IP必须是emqx主机IP,若采用docker,则为emqx docker IP,否则连接不成功。
客户端模拟器
mqttx是由 EMQ (https://www.emqx.io/cn/) 开源的一款跨平台 MQTT 5.0 桌面客户端工具,它支持 macOS,Linux,Windows,是目前为止市面上最漂亮的 MQTT 客户端工具。
消息转存mysql或clickhouse
EMQX企业版支持将数据持久化到mysql服务器(支持批量存取,效率高)。而emqx免费版不支持,可以通过webhook转发数据到web server实现数据存储到mysql,此方法只支持单条数据的转发。
1) EMQX添加rule engine的resource。webhook,http://172.17.0.1:8090/mqtt。(注:若docker部署,此处的IP为宿主机docker IP)。
2)EMQX添加rule。SQL:SELECT payload.code as code, payload.message as message, payload.data as data  FROM  "deviceID"
3)EMQX添加rule action。data to web server, POST。
4)通过websocket测试即可。

参考:
11. 中移动MQTT库,采用EPOLL框架监控输入和MQTT cm-heclouds/MQTT
13. 中移动OneNet SDK



+10
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|深圳市光明谷科技有限公司|光明谷商城|Sunshine Silicon Corpporation ( 粤ICP备14060730号|Sitemap

GMT+8, 2025-1-28 11:24 , Processed in 0.109730 second(s), 44 queries .

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表