【openwrt】基于OpenWRT的MQTT物联网协议详解和应用
大家好,这篇文章给大家介绍MQTT协议以及如何在OpenWrt系统中使用MQTT客户端和开发,并给出相关实例代码。
MQTT简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,设计用于在低带宽和不稳定的网络环境中传输消息。它最初由IBM开发,用于连接远程设备和传感器到网络,并支持发布/订阅模型的消息通信。
MQTT被广泛用于物联网(IoT)领域,其中大量的设备需要进行实时通信和数据交换。它采用了一种发布/订阅(publish/subscribe)模型,其中消息的发送者(发布者)将消息发布到特定的主题(topic),而订阅者可以选择性地订阅感兴趣的主题,以接收相应的消息。
MQTT特点
以下是MQTT的一些关键特点: 轻量级:MQTT的设计非常轻量,协议头部非常小,传输的数据量很小,适用于带宽有限的网络环境,如低速、高延迟或不稳定的网络。 简单:MQTT的协议规范相对简单,易于实现和部署。它定义了少量的消息类型和协议操作,使得开发人员可以快速上手。 异步通信:MQTT使用异步通信模式,发布者发送消息后,不需要等待接收者的响应,可以继续执行其他操作。这种异步通信模式适合在资源有限的设备和网络中工作。 可靠性:MQTT支持三种不同的消息传递质量(QoS)级别:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。这使得可以根据应用程序的要求选择适当的消息交付保证级别。 网络状况适应性:MQTT可以适应不稳定的网络状况,如网络中断、重连等。它具有断开连接后自动重连的机制,可以确保消息的可靠传输。
订阅和发布模型
Publisher(发布者):发布者是消息的发送者,它将消息发布到特定的主题(topic)上。可以有一个或多个发布者。 Subscriber(订阅者):订阅者是对消息感兴趣的实体,它选择性地订阅一个或多个主题。一旦订阅了主题,它就会接收到相应的消息。 MQTT Broker(MQTT代理):MQTT代理是中间件,负责接收发布者发送的消息,并将其路由到对应的订阅者。它维护着主题和订阅关系的注册表,并确保消息的可靠传递。
工作流程如下: 发布者将消息发布到特定的主题上。 MQTT代理接收到消息后,根据订阅者的注册信息,将消息路由到对应的订阅者。 订阅者接收到发布者发布的消息,并进行相应的处理。 通过发布/订阅模型,MQTT允许实现解耦和灵活性,发布者和订阅者之间不需要直接的点对点连接,而是通过MQTT代理进行中转和路由。这种模型非常适合在物联网中进行大规模设备间的通信和数据交换。
MQTT QoS
MQTT(Message Queuing Telemetry Transport)协议支持三种不同的QoS(Quality of Service)级别,用于控制消息的可靠性和传输保证。以下是MQTT的三个QoS级别: QoS 0(至多一次):在QoS 0级别下,消息以“至多一次”传输,没有确认机制。消息被发布后,发布者不会接收到关于消息是否成功传输或交付的确认。MQTT代理会尽最大努力将消息传输给订阅者,但可能会出现消息丢失或重复的情况。此级别适用于对消息传输的可靠性要求不高的场景,如传感器数据的临时更新等。 QoS 1(至少一次):在QoS 1级别下,消息以“至少一次”传输,确保至少传输一次。发布者发送消息后,会等待MQTT代理发送确认消息(PUBACK)来确认消息的接收。 如果发布者没有收到确认消息,它会再次发送相同的消息,直到收到确认为止。MQTT代理会确保消息至少传输一次给订阅者,但可能会出现重复传输的情况。此级别适用于对消息传输的可靠性要求较高的场景,如控制指令的传递。 QoS 2(只有一次):在QoS 2级别下,消息以“只有一次”传输,确保仅传输一次。发布者发送消息后,会等待MQTT代理发送两个确认消息(PUBREC和PUBCOMP)来确认消息的接收和完成。MQTT代理会确保消息仅传输一次给订阅者,没有重复传输的情况。 此级别提供了最高的消息传输可靠性,但也伴随着更高的网络开销。此级别适用于对消息传输的可靠性要求非常高的场景,如金融交易或严格的数据同步。选择合适的QoS级别取决于应用程序对消息传输可靠性和网络开销的要求。更高的QoS级别提供了更可靠的传输,但同时也增加了网络开销。因此,需要根据具体场景的需求来选择适当的级别。
OpenWrt中使用mosquitto插件安装
默认是没有包含mosquitto客户端和broker的,我们可以手动安装相关插件,为了测试我们需要安装broker和client
首先更新openwrt软件源 opkg update
然后调用以下命令分别安装mosquitto broker和client,这里我们选用nossl版本,也就是不需要ssl加密,方便测试 opkg install mosquitto-nossl
opkg install mosquitto-client-nossl
mosquitto服务
安装完成后就可以使用broker和client了,首先我们需要启动mosquitto broker服务,
mosquitto broker服务配置文件在/etc/mosquitto/目录中,我们可以修改服务器相关信息,比如监听端口号、接口、ip地址等。 - ls /etc/mosquitto/mosquitto.conf
- /etc/mosquitto/mosquitto.conf
复制代码
mosquitto客户端
mosquitto客户端包含sub和pub两部分,分别用于订阅和发布 订阅主题: - mosquitto_sub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic>
复制代码
其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要订阅的主题名称。 示例: - mosquitto_sub -h 192.168.1.1 -p 1883 -t test/topic
复制代码
发布主题: - mosquitto_pub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic> -m <Message>
复制代码
其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要发布的主题名称,是要发布的消息内容。 示例: - mosquitto_pub -h 192.168.1.1 -p 1883 -t test/topic -m "Hello, MQTT!"
复制代码
运行结果:
由于订阅和发布客户端都在本地,ip使用localhost地址127.0.0.1 - mosquitto_sub -h 127.0.0.1 -p 1883 -t test/topic &
- mosquitto_pub -h 127.0.0.1 -p 1883 -t test/topic -m "hello MQTT."
- hello MQTT.
复制代码
在订阅主题时,我们还可以使用通配符,最常用就是通配符"#",通过"#"可以匹配多级topic
比如订阅了主题"test/#",则可以收到"test/"开头的所有topic,比如"test/topic1"、"test/hello"等
以下实例中分别订阅了"test/#"和"test/topic1",当发布"test/topic1"消息时,二者都可以收到,而发布"test/topic2"时只有一个可以收到。
除了通配符"#"之外,还有"$"、"+"等通配符,不在这里详解。
使用云端公共broker测试
emqx提供了公共免费的broker供开发者测试,注意不要在生产环境使用,仅供测试
我们可以准备两台不同的设备,都连接broker.emqx.io,这两台设备可以在不同区域,通过公网broker可以轻松实现两台设备通信。 - 客户端1
客户端1订阅openwrt/topic消息
- mosquitto_sub -h broker.emqx.io -p 1883 -i client_001 -t openwrt/topic
复制代码
- 客户端2
发送一条消息到主题openwrt/topic,这样客户端1就可以收到该消息
- mosquitto_pub -h broker.emqx.io -p 1883 -t openwrt/topic -i client_002 -m "hello client1, i am froms client2"
复制代码
OpenWrt中基于libmosquitto开发
前面给大家演示了mosquitto客户端的使用,但命令行客户端仅供测试使用,我们在开发过程中需要自定义消息并且能够实时解析消息,而通过命令行就没那么方便消息的处理了,需要调用mosquitto底层api接口实现想要的功能。
libmosquitto库
在openwrt系统中默认集成了mosquitto库,可以直接依赖调用。
对应依赖的库为:
libmosquitto-nossl 不支持ssl加密
libmosquitto 支持ssl加密
api接口详解 mosquitto_lib_init:初始化libmosquitto库。在使用其他libmosquitto函数之前,应该首先调用此函数。 mosquitto_lib_version:获取libmosquitto库的版本号信息。 mosquitto_new:创建一个新的mosquitto对象(MQTT客户端)。 mosquitto_connect:与MQTT代理服务器建立连接。 mosquitto_disconnect:断开与MQTT代理服务器的连接。 mosquitto_publish:向指定主题发布消息。 mosquitto_subscribe:订阅一个或多个主题。 mosquitto_unsubscribe:取消订阅一个或多个主题。 mosquitto_loop_start:启动一个线程来处理MQTT消息循环。 mosquitto_loop_forever:开始一个阻塞的循环,处理MQTT消息。 mosquitto_loop:在非阻塞模式下处理MQTT消息。 mosquitto_message_callback_set:设置用于接收订阅消息的回调函数。 mosquitto_username_pw_set:设置连接时使用的用户名和密码。 mosquitto_tls_set:为MQTT连接启用SSL/TLS加密。 mosquitto_tls_opts_set:设置SSL/TLS选项,如CA证书、客户端证书和私钥等。 mosquitto_tls_insecure_set:设置是否允许SSL/TLS连接中的不安全选项。 mosquitto_will_set:设置遗嘱消息,即在客户端异常断开时发布的消息。
基于libmosquitto实现一个消息订阅程序
源码 - #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <mosquitto.h>
- #include <sys/time.h>
- #include <sys/sysinfo.h>
- struct mosquitto *g_test_mosq = NULL;
- void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
- {
- printf("connect to mqtt server ok\n");
- if (MOSQ_ERR_SUCCESS != mosquitto_subscribe(mosq, NULL, "openwrt/#", 0))
- {
- printf("sub topic openwrt/# failed...\n");
- }
- else{
- printf("sub topic openwrt/# failed...\n");
- }
- }
- void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
- {
- if (result)
- printf("disconnect %s\n", mosquitto_connack_string(result));
- else
- printf("disconnect from mqtt server.\n");
- }
- void mqtt_sub_callback(struct mosquitto *mosq,
- void *userdata,
- int mid,
- int qos_count,
- const int *granted_qos)
- {
- printf("sub callback\n");
- }
- void mqtt_msg_callback(struct mosquitto *mosq,
- void *userdata,
- struct mosquitto_message *message)
- {
- printf("callback recv mqtt msg, topic = %s, payload = %s\n", message->topic, message->payload);
- }
- struct mosquitto *connect_to_mqtt_server(char *server_ip)
- {
- struct mosquitto *mosq = NULL;
- int rc;
- char mqtt_user[128] = {0};
- char mqtt_pwd[128] = {0};
- char client_id[128] = {0};
- struct timeval tv;
- gettimeofday(&tv, NULL);
- mosquitto_lib_init();
- snprintf(client_id, sizeof(client_id), "test_%d", tv.tv_sec);
- printf("connect to mqtt server..client_id=%s\n", client_id);
- mosq = mosquitto_new(client_id, true, NULL);
- if (!mosq)
- {
- return NULL;
- }
- #if 0
- rc = mosquitto_username_pw_set(mosq, "test", "test");
- if (rc)
- {
- mosquitto_destroy(mosq);
- return NULL;
- }
- #endif
- mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
- mosquitto_message_callback_set(mosq, mqtt_msg_callback);
- mosquitto_subscribe_callback_set(mosq, mqtt_sub_callback);
- mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
- rc = mosquitto_connect(mosq, server_ip, 1883, 30);
- if (rc)
- {
- printf("Unable to connect mqtt server rc=%d\n", rc);
- mosquitto_destroy(mosq);
- return NULL;
- }
- return mosq;
- }
- int mqtt_bcast_msg(char *api, char *data, int len)
- {
- char topic[128] = {0};
- int mid;
- if (!api || !data || len == 0)
- return -1;
- if (!g_test_mosq)
- return -1;
- sprintf(topic, "openwrt/%s", api);
- return mosquitto_publish(g_test_mosq, &mid, topic, len, data, 0, 0);
- }
- int main(int argc, char *argv[]){
- char *host = NULL;
- if (argc < 2){
- host = "127.0.0.1";
- printf("use default ip: 127.0.0.1\n");
- }
- else{
- host = argv[1];
- printf("use ip: %s\n", host);
- }
- g_test_mosq = connect_to_mqtt_server(host);
- if (!g_test_mosq){
- printf("connect to server %s failed\n", host);
- exit(0);
- }
- mosquitto_loop_forever(g_test_mosq, -1, 1);
- mosquitto_destroy(g_test_mosq);
- mosquitto_lib_cleanup();
- return 0;
- }
复制代码
实例源码编译
将源码包拷贝到openwrt源码package目录
开启mqtt_test宏并生成默认依赖配置 - echo "CONFIG_PACKAGE_mqtt_test=y" >>.config
- make defconfig
复制代码
编译 - make package/mqtt_test/compile V=s
复制代码
插件安装:
mqtt_test依赖了libmosquitto库,而libmosquitto依赖了libcares,所以需要安装三个插件 - libcares
- libmosquitto-nossl
- mqtt_test
将插件通过winscp或其他工具上传到openwrt系统中,执行以下命令安装(以X86为例) - opkg install libcares_1.18.1-1_x86_64.ipk
- opkg install libmosquitto-nossl_2.0.15-1_x86_64.ipk
- opkg install mqtt_test_1.0-1_x86_64.ipk
复制代码
运行: mqtt_test默认连接本地broker,也可以指定ip 运行如果出现错误,表示服务器没有启动或者参数异常,请先确认mosquitto服务已经启动。 - use default ip: 127.0.0.1
- connect to mqtt server..client_id=test_1686993389
- Unable to connect mqtt server rc=14
- connect to server 127.0.0.1 failed
复制代码
运行成功 - mqtt_test
- use default ip: 127.0.0.1
- connect to mqtt server..client_id=test_1686993598
- connect to mqtt server ok
- sub callback
复制代码
现在就启动了一个mqtt客户端,订阅了openwrt/#
通过mosquitto_pub工具可以发送指令到该客户端,客户端当前处理方式是输出收到的消息,当然实际开发是解析指令并执行对应的命令,比如接收到reboot命令后执行重启。 pub命令如下: - root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "hello openwrt"
- root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
- root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
- root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
- root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
复制代码
客户端输出如下: - callback recv mqtt msg, topic = openwrt/send_msg, payload = hello openwrt
- callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
- callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
- callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
- callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
复制代码
如果客户端连接云端的broker,就可以实现远程操作设备,比如远程重启设备、配置下发等。
总结
在物联网开发中我们会经常用的MQTT协议,常见的就是边缘设备和云端通信,上报实时状态、远程管理等,当然也可以局域网间通信,实现节点间通信,比如可以通过MQTT协议实现mesh数据同步、AC集中管理等,有了MQTT协议我们不需要自己通过底层socket实现私有协议,可以只关注业务处理,可大大提高程序稳定性。当然MQTT也有一些缺陷,就是订阅发布模型实现实时回复消息比较麻烦,不适合做一些状态较复杂的交互。
如果你也想从事OpenWrt开发,可以加入我的知识星球(OpenWrt开发圈),里面会定期分享我进行OpenWrt开发的笔记和源码,涉及OpenWrt插件开发、Linux应用开发、Linux内核开发、计算机网络等,还可以随时向我提问,少走弯路。
|