谷动谷力

 找回密码
 立即注册
查看: 1675|回复: 0
打印 上一主题 下一主题
收起左侧

【openwrt】基于OpenWRT的MQTT物联网协议详解和应用

[复制链接]
跳转到指定楼层
楼主
发表于 2023-7-12 22:59:01 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式
【openwrt】基于OpenWRT的MQTT物联网协议详解和应用



大家好,这篇文章给大家介绍MQTT协议以及如何在OpenWrt系统中使用MQTT客户端和开发,并给出相关实例代码。


MQTT简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,设计用于在低带宽和不稳定的网络环境中传输消息。它最初由IBM开发,用于连接远程设备和传感器到网络,并支持发布/订阅模型的消息通信。


MQTT被广泛用于物联网(IoT)领域,其中大量的设备需要进行实时通信和数据交换。它采用了一种发布/订阅(publish/subscribe)模型,其中消息的发送者(发布者)将消息发布到特定的主题(topic),而订阅者可以选择性地订阅感兴趣的主题,以接收相应的消息。


MQTT特点

以下是MQTT的一些关键特点:

轻量级:MQTT的设计非常轻量,协议头部非常小,传输的数据量很小,适用于带宽有限的网络环境,如低速、高延迟或不稳定的网络。

简单:MQTT的协议规范相对简单,易于实现和部署。它定义了少量的消息类型和协议操作,使得开发人员可以快速上手。

异步通信:MQTT使用异步通信模式,发布者发送消息后,不需要等待接收者的响应,可以继续执行其他操作。这种异步通信模式适合在资源有限的设备和网络中工作。

可靠性:MQTT支持三种不同的消息传递质量(QoS)级别:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。这使得可以根据应用程序的要求选择适当的消息交付保证级别。

网络状况适应性:MQTT可以适应不稳定的网络状况,如网络中断、重连等。它具有断开连接后自动重连的机制,可以确保消息的可靠传输。


订阅和发布模型

Publisher(发布者):发布者是消息的发送者,它将消息发布到特定的主题(topic)上。可以有一个或多个发布者。

Subscriber(订阅者):订阅者是对消息感兴趣的实体,它选择性地订阅一个或多个主题。一旦订阅了主题,它就会接收到相应的消息。

MQTT Broker(MQTT代理):MQTT代理是中间件,负责接收发布者发送的消息,并将其路由到对应的订阅者。它维护着主题和订阅关系的注册表,并确保消息的可靠传递。

工作流程如下:

发布者将消息发布到特定的主题上。

MQTT代理接收到消息后,根据订阅者的注册信息,将消息路由到对应的订阅者。

订阅者接收到发布者发布的消息,并进行相应的处理。

通过发布/订阅模型,MQTT允许实现解耦和灵活性,发布者和订阅者之间不需要直接的点对点连接,而是通过MQTT代理进行中转和路由。这种模型非常适合在物联网中进行大规模设备间的通信和数据交换。


MQTT QoS

MQTT(Message Queuing Telemetry Transport)协议支持三种不同的QoS(Quality of Service)级别,用于控制消息的可靠性和传输保证。以下是MQTT的三个QoS级别:

QoS 0(至多一次):

在QoS 0级别下,消息以“至多一次”传输,没有确认机制。消息被发布后,发布者不会接收到关于消息是否成功传输或交付的确认。MQTT代理会尽最大努力将消息传输给订阅者,但可能会出现消息丢失或重复的情况。此级别适用于对消息传输的可靠性要求不高的场景,如传感器数据的临时更新等。

QoS 1(至少一次):

在QoS 1级别下,消息以“至少一次”传输,确保至少传输一次。发布者发送消息后,会等待MQTT代理发送确认消息(PUBACK)来确认消息的接收。

如果发布者没有收到确认消息,它会再次发送相同的消息,直到收到确认为止。MQTT代理会确保消息至少传输一次给订阅者,但可能会出现重复传输的情况。此级别适用于对消息传输的可靠性要求较高的场景,如控制指令的传递。

QoS 2(只有一次):

在QoS 2级别下,消息以“只有一次”传输,确保仅传输一次。发布者发送消息后,会等待MQTT代理发送两个确认消息(PUBREC和PUBCOMP)来确认消息的接收和完成。MQTT代理会确保消息仅传输一次给订阅者,没有重复传输的情况。

此级别提供了最高的消息传输可靠性,但也伴随着更高的网络开销。此级别适用于对消息传输的可靠性要求非常高的场景,如金融交易或严格的数据同步。选择合适的QoS级别取决于应用程序对消息传输可靠性和网络开销的要求。更高的QoS级别提供了更可靠的传输,但同时也增加了网络开销。因此,需要根据具体场景的需求来选择适当的级别。



OpenWrt中使用mosquitto插件安装

默认是没有包含mosquitto客户端和broker的,我们可以手动安装相关插件,为了测试我们需要安装broker和client
首先更新openwrt软件源

opkg update

然后调用以下命令分别安装mosquitto broker和client,这里我们选用nossl版本,也就是不需要ssl加密,方便测试

opkg install mosquitto-nossl
opkg install mosquitto-client-nossl


mosquitto服务

安装完成后就可以使用broker和client了,首先我们需要启动mosquitto broker服务,

mosquitto broker服务配置文件在/etc/mosquitto/目录中,我们可以修改服务器相关信息,比如监听端口号、接口、ip地址等。

  1. ls /etc/mosquitto/mosquitto.conf
  2. /etc/mosquitto/mosquitto.conf
复制代码


  
mosquitto客户端

mosquitto客户端包含sub和pub两部分,分别用于订阅和发布

订阅主题:

  1. mosquitto_sub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic>
复制代码

其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要订阅的主题名称。

示例:

  1. mosquitto_sub -h 192.168.1.1 -p 1883 -t test/topic
复制代码

发布主题:

  1. mosquitto_pub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic> -m <Message>
复制代码

其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要发布的主题名称,是要发布的消息内容。

示例:

  1. mosquitto_pub -h 192.168.1.1 -p 1883 -t test/topic -m "Hello, MQTT!"
复制代码

运行结果:
由于订阅和发布客户端都在本地,ip使用localhost地址127.0.0.1

  1. mosquitto_sub -h 127.0.0.1 -p 1883 -t test/topic &
  2. mosquitto_pub -h 127.0.0.1 -p 1883 -t test/topic -m "hello MQTT."
  3. hello MQTT.
复制代码


在订阅主题时,我们还可以使用通配符,最常用就是通配符"#",通过"#"可以匹配多级topic
比如订阅了主题"test/#",则可以收到"test/"开头的所有topic,比如"test/topic1"、"test/hello"等
以下实例中分别订阅了"test/#"和"test/topic1",当发布"test/topic1"消息时,二者都可以收到,而发布"test/topic2"时只有一个可以收到。

除了通配符"#"之外,还有"$"、"+"等通配符,不在这里详解。


使用云端公共broker测试

emqx提供了公共免费的broker供开发者测试,注意不要在生产环境使用,仅供测试
我们可以准备两台不同的设备,都连接broker.emqx.io,这两台设备可以在不同区域,通过公网broker可以轻松实现两台设备通信。

  • 客户端1
    客户端1订阅openwrt/topic消息
  1. mosquitto_sub -h broker.emqx.io -p 1883 -i client_001  -t openwrt/topic
复制代码

  • 客户端2
    发送一条消息到主题openwrt/topic,这样客户端1就可以收到该消息
  1. mosquitto_pub -h broker.emqx.io -p 1883 -t openwrt/topic -i client_002 -m "hello client1, i am froms client2"
复制代码




OpenWrt中基于libmosquitto开发

前面给大家演示了mosquitto客户端的使用,但命令行客户端仅供测试使用,我们在开发过程中需要自定义消息并且能够实时解析消息,而通过命令行就没那么方便消息的处理了,需要调用mosquitto底层api接口实现想要的功能。


libmosquitto库

在openwrt系统中默认集成了mosquitto库,可以直接依赖调用。
对应依赖的库为:
libmosquitto-nossl 不支持ssl加密
libmosquitto 支持ssl加密


api接口详解

mosquitto_lib_init:初始化libmosquitto库。在使用其他libmosquitto函数之前,应该首先调用此函数。

mosquitto_lib_version:获取libmosquitto库的版本号信息。

mosquitto_new:创建一个新的mosquitto对象(MQTT客户端)。

mosquitto_connect:与MQTT代理服务器建立连接。

mosquitto_disconnect:断开与MQTT代理服务器的连接。

mosquitto_publish:向指定主题发布消息。

mosquitto_subscribe:订阅一个或多个主题。

mosquitto_unsubscribe:取消订阅一个或多个主题。

mosquitto_loop_start:启动一个线程来处理MQTT消息循环。

mosquitto_loop_forever:开始一个阻塞的循环,处理MQTT消息。

mosquitto_loop:在非阻塞模式下处理MQTT消息。

mosquitto_message_callback_set:设置用于接收订阅消息的回调函数。

mosquitto_username_pw_set:设置连接时使用的用户名和密码。

mosquitto_tls_set:为MQTT连接启用SSL/TLS加密。

mosquitto_tls_opts_set:设置SSL/TLS选项,如CA证书、客户端证书和私钥等。

mosquitto_tls_insecure_set:设置是否允许SSL/TLS连接中的不安全选项。

mosquitto_will_set:设置遗嘱消息,即在客户端异常断开时发布的消息。


基于libmosquitto实现一个消息订阅程序

源码

  1. #include <unistd.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <mosquitto.h>
  6. #include <sys/time.h>
  7. #include <sys/sysinfo.h>

  8. struct mosquitto *g_test_mosq = NULL;

  9. void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
  10. {
  11. printf("connect to mqtt server ok\n");
  12. if (MOSQ_ERR_SUCCESS != mosquitto_subscribe(mosq, NULL, "openwrt/#", 0))
  13. {
  14.   printf("sub topic openwrt/# failed...\n");
  15. }
  16. else{
  17.   printf("sub topic openwrt/# failed...\n");
  18. }
  19. }

  20. void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
  21. {
  22. if (result)
  23.   printf("disconnect %s\n", mosquitto_connack_string(result));
  24. else
  25.   printf("disconnect from mqtt server.\n");
  26. }

  27. void mqtt_sub_callback(struct mosquitto *mosq,
  28.         void *userdata,
  29.         int mid,
  30.         int qos_count,
  31.         const int *granted_qos)
  32. {
  33. printf("sub callback\n");
  34. }

  35. void mqtt_msg_callback(struct mosquitto *mosq,
  36.         void *userdata,
  37.         struct mosquitto_message *message)
  38. {
  39. printf("callback recv mqtt msg, topic = %s, payload = %s\n", message->topic, message->payload);
  40. }

  41. struct mosquitto *connect_to_mqtt_server(char *server_ip)
  42. {
  43. struct mosquitto *mosq = NULL;
  44. int rc;
  45. char mqtt_user[128] = {0};
  46. char mqtt_pwd[128] = {0};
  47. char client_id[128] = {0};
  48.     struct timeval tv;
  49.     gettimeofday(&tv, NULL);
  50. mosquitto_lib_init();
  51. snprintf(client_id, sizeof(client_id), "test_%d", tv.tv_sec);
  52. printf("connect to mqtt server..client_id=%s\n", client_id);
  53. mosq = mosquitto_new(client_id, true, NULL);

  54. if (!mosq)
  55. {
  56.   return NULL;
  57. }
  58. #if 0
  59. rc = mosquitto_username_pw_set(mosq, "test", "test");
  60. if (rc)
  61. {
  62.   mosquitto_destroy(mosq);
  63.   return NULL;
  64. }
  65. #endif

  66. mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
  67. mosquitto_message_callback_set(mosq, mqtt_msg_callback);
  68. mosquitto_subscribe_callback_set(mosq, mqtt_sub_callback);
  69. mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
  70. rc = mosquitto_connect(mosq, server_ip, 1883, 30);
  71. if (rc)
  72. {
  73.   printf("Unable to connect mqtt server rc=%d\n", rc);
  74.   mosquitto_destroy(mosq);
  75.   return NULL;
  76. }
  77. return mosq;
  78. }

  79. int mqtt_bcast_msg(char *api, char *data, int len)
  80. {
  81. char topic[128] = {0};
  82. int mid;
  83. if (!api || !data || len == 0)
  84.   return -1;
  85. if (!g_test_mosq)
  86.   return -1;
  87. sprintf(topic, "openwrt/%s", api);
  88. return mosquitto_publish(g_test_mosq, &mid, topic, len, data, 0, 0);
  89. }

  90. int main(int argc, char *argv[]){
  91. char *host = NULL;
  92. if (argc < 2){
  93.   host = "127.0.0.1";
  94.   printf("use default ip: 127.0.0.1\n");
  95. }
  96. else{
  97.   host = argv[1];
  98.   printf("use ip: %s\n", host);
  99. }
  100. g_test_mosq = connect_to_mqtt_server(host);
  101. if (!g_test_mosq){
  102.   printf("connect to server %s failed\n", host);
  103.   exit(0);
  104. }
  105. mosquitto_loop_forever(g_test_mosq, -1, 1);
  106. mosquitto_destroy(g_test_mosq);
  107. mosquitto_lib_cleanup();
  108. return 0;
  109. }
复制代码

实例源码编译
将源码包拷贝到openwrt源码package目录


开启mqtt_test宏并生成默认依赖配置

  1. echo "CONFIG_PACKAGE_mqtt_test=y" >>.config
  2. make defconfig
复制代码

编译

  1. make package/mqtt_test/compile V=s
复制代码

插件安装:
mqtt_test依赖了libmosquitto库,而libmosquitto依赖了libcares,所以需要安装三个插件

  • libcares
  • libmosquitto-nossl
  • mqtt_test

将插件通过winscp或其他工具上传到openwrt系统中,执行以下命令安装(以X86为例)

  1. opkg install libcares_1.18.1-1_x86_64.ipk
  2. opkg install libmosquitto-nossl_2.0.15-1_x86_64.ipk
  3. opkg install mqtt_test_1.0-1_x86_64.ipk
复制代码

运行:

mqtt_test默认连接本地broker,也可以指定ip

运行如果出现错误,表示服务器没有启动或者参数异常,请先确认mosquitto服务已经启动。

  1. use default ip: 127.0.0.1
  2. connect to mqtt server..client_id=test_1686993389
  3. Unable to connect mqtt server rc=14
  4. connect to server 127.0.0.1 failed
复制代码


运行成功

  1. mqtt_test
  2. use default ip: 127.0.0.1
  3. connect to mqtt server..client_id=test_1686993598
  4. connect to mqtt server ok
  5. sub callback
复制代码

现在就启动了一个mqtt客户端,订阅了openwrt/#
通过mosquitto_pub工具可以发送指令到该客户端,客户端当前处理方式是输出收到的消息,当然实际开发是解析指令并执行对应的命令,比如接收到reboot命令后执行重启。

pub命令如下:

  1. root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "hello openwrt"
  2. root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
  3. root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
  4. root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
  5. root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
复制代码

客户端输出如下:

  1. callback recv mqtt msg, topic = openwrt/send_msg, payload = hello openwrt
  2. callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
  3. callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
  4. callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
  5. callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
复制代码

如果客户端连接云端的broker,就可以实现远程操作设备,比如远程重启设备、配置下发等。


总结

在物联网开发中我们会经常用的MQTT协议,常见的就是边缘设备和云端通信,上报实时状态、远程管理等,当然也可以局域网间通信,实现节点间通信,比如可以通过MQTT协议实现mesh数据同步、AC集中管理等,有了MQTT协议我们不需要自己通过底层socket实现私有协议,可以只关注业务处理,可大大提高程序稳定性。当然MQTT也有一些缺陷,就是订阅发布模型实现实时回复消息比较麻烦,不适合做一些状态较复杂的交互。


如果你也想从事OpenWrt开发,可以加入我的知识星球(OpenWrt开发圈),里面会定期分享我进行OpenWrt开发的笔记和源码,涉及OpenWrt插件开发、Linux应用开发、Linux内核开发、计算机网络等,还可以随时向我提问,少走弯路。

+10

本帖被以下淘专辑推荐:

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|深圳市光明谷科技有限公司|光明谷商城|Sunshine Silicon Corpporation ( 粤ICP备14060730号|Sitemap

GMT+8, 2024-12-27 07:33 , Processed in 0.109866 second(s), 44 queries .

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表