谷动谷力

标题: MQTT 客户端自动重连实践 构建可靠 IoT 设备连接 [打印本页]

作者: 谷谷小师妹    时间: 2023-11-21 15:58
标题: MQTT 客户端自动重连实践 构建可靠 IoT 设备连接
MQTT 客户端自动重连实践 构建可靠 IoT 设备连接


目录

背景
MQTT 是一个基于 TCP 协议的发布/订阅模型协议,它被广泛应用于物联网、传感器网络和其他低带宽、不稳定网络环境中。在这些网络环境中,网络连接往往不稳定,可能会出现网络故障、信号弱化、丢包等问题,这可能会导致 MQTT 客户端与服务器之间的连接中断。物联网应用中,常见的触发断线重连的场景包括:
为了确保 MQTT 客户端与服务器之间的稳定连接,MQTT 客户端需要实现重连逻辑,帮助 MQTT 客户端自动重新连接服务器,并恢复之前的订阅关系、保持会话等状态。

为什么 MQTT 客户端重连代码需要良好的设计
MQTT 设备重连是很多物联网应用中不可避免的情况。设计 MQTT 客户端重连逻辑时需要注意使用正确的事件回调方法,每次重连设置合理的随机退避时间,以保证客户端和服务端的长时间稳定运行,从而确保业务的正常开展。
不合理的重连逻辑设计可能会造成诸多问题:
而合理的重连逻辑既可以提高 MQTT 客户端的稳定性和可靠性,避免因网络连接中断而导致的数据丢失、延迟等问题,还可以降低由于频繁连接对服务器端的压力。

如何设计一段 MQTT 客户端重连代码
在进行 MQTT 客户端重连代码设计时需要考虑以下几个方面:

重连代码示例
我们将以 Paho MQTT C 的库为例,示范如何使用异步编程模型优雅完成自动重连功能。Paho 提供了丰富的回调函数,请注意不同回调方法触发条件和设置方式不同,分别有全局回调、API 回调和异步方法回调。API 回调有相当的灵活性,但当开启自动重连功能时,建议只使用异步回调。此处对三种回调函数都提供了例程,用户可以使用此例程验证三种回调函数的触发。
  1. // 是 Async 使用的回调方法
  2. // 连接成功的异步回调函数,在连接成功的地方进行Subscribe操作。
  3. void conn_established(void *context, char *cause)
  4. {
  5.     printf("client reconnected!\n");
  6.     MQTTAsync client = (MQTTAsync)context;
  7.     MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  8.     int rc;

  9.     printf("Successful connection\n");

  10.     printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  11.            "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  12.     opts.onSuccess = onSubscribe;
  13.     opts.onFailure = onSubscribeFailure;
  14.     opts.context = client;
  15.     if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  16.     {
  17.         printf("Failed to start subscribe, return code %d\n", rc);
  18.         finished = 1;
  19.     }
  20. }


  21. // 以下为客户端全局连接断开回调函数
  22. void conn_lost(void *context, char *cause)
  23. {
  24.     MQTTAsync client = (MQTTAsync)context;
  25.     MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  26.     int rc;

  27.     printf("\nConnection lost\n");
  28.     if (cause) {
  29.         printf("     cause: %s\n", cause);
  30.     }
  31.     printf("Reconnecting\n");
  32.     conn_opts.keepAliveInterval = 20;
  33.     conn_opts.cleansession = 1;
  34.     conn_opts.maxRetryInterval = 16;
  35.     conn_opts.minRetryInterval = 1;
  36.     conn_opts.automaticReconnect = 1;
  37.     conn_opts.onFailure = onConnectFailure;
  38.     MQTTAsync_setConnected(client, client, conn_established);
  39.     if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  40.     {
  41.         printf("Failed to start connect, return code %d\n", rc);
  42.         finished = 1;
  43.     }
  44. }

  45. int main(int argc, char* argv[])
  46. {
  47.     // 创建异步连接客户端需要使用的属性结构体
  48.     MQTTAsync client;
  49.     MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  50.     MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  51.     int rc;
  52.     int ch;
  53.     // 创建异步连接客户端,不使用 Paho SDK 内置的持久化来处理缓存消息
  54.     if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
  55.             != MQTTASYNC_SUCCESS)
  56.     {
  57.         printf("Failed to create client, return code %d\n", rc);
  58.         rc = EXIT_FAILURE;
  59.         goto exit;
  60.     }
  61.     // 设置异步连接回调,注意此处设置的回调函数为连接层面的全局回调函数
  62.     // conn_lost 为连接断开触发,有且只有连接成功后断开才会触发,在断开连接的情况下进行重连失败不触发。
  63.     // msgarrvd 收到消息时触发的回调函数
  64.     // msgdeliverd 是消息成功发送的回调函数,一般设置为NULL
  65.     if ((rc = MQTTAsync_setCallbacks(client, client, conn_lost, msgarrvd, msgdeliverd)) != MQTTASYNC_SUCCESS)
  66.     {
  67.         printf("Failed to set callbacks, return code %d\n", rc);
  68.         rc = EXIT_FAILURE;
  69.         goto destroy_exit;
  70.     }
  71.     // 设置连接参数
  72.     conn_opts.keepAliveInterval = 20;
  73.     conn_opts.cleansession = 1;
  74.     // 此处设置 API调用失败会触发的回调,接下来进行connect操作所以设置为 onConnectFailure 方法
  75.     conn_opts.onFailure = onConnectFailure;
  76.     // 此处设置 客户端连接API调用成功会触发的回调,由于例程使用异步连接的 API,设置了会导致2个回调都被触发,所以建议不使用此回调
  77.     //conn_opts.onSuccess = onConnect;
  78.     // 注意第一次发起连接失败不会触发自动重连,只有曾经成功连接并断开后才会触发
  79.     conn_opts.automaticReconnect = 1;
  80.     //开启自动重连,并且设置 2-16s 的随机退避时间
  81.     conn_opts.maxRetryInterval = 16;
  82.     conn_opts.minRetryInterval = 2;
  83.     conn_opts.context = client;
  84.     // 设置异步回调函数,此与之前的 API 回调不同,每次连接/断开都会触发
  85.     MQTTAsync_setConnected(client, client, conn_established);
  86.     MQTTAsync_setDisconnected(client, client, disconnect_lost);
  87.     // 启动客户端连接,之前设置的 API 回调只会在这一次操作生效
  88.     if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  89.     {
  90.         printf("Failed to start connect, return code %d\n", rc);
  91.         rc = EXIT_FAILURE;
  92.         goto destroy_exit;
  93.     }

  94.     ......
  95. }
复制代码

下载 MQTTAsync_subscribe.c 文件查看详细代码。

更多选择:NanoSDK 内置重连策略
NanoSDK 是除了 Paho 以外的又一 MQTT SDK 选择。NanoSDK 基于 NNG-NanoMSG 项目开发,使用 MIT License,对开源和商业都很友好。相较于 Paho 其最大的不同在于内置的全异步 I/O 和 支持 Actor 编程模型,当使用 QoS 1/2 消息时可以获得更高的消息吞吐速率。而且 NanoSDK 支持 MQTT over QUIC 协议,与大规模物联网 MQTT 消息服务器 EMQX 5.0 结合可解决弱网下的数据传输难题。这些优势使得它已经在车联网和工业场景中得到了广泛的使用。
在 NanoSDK 中,重连策略已经完全内置,无需用户手动实现。
// nanosdk 采用自动拨号机制,默认进行重连nng_dialer_set_ptr(*dialer, NNG_OPT_MQTT_CONNMSG, connmsg);nng_dialer_start(*dialer, NNG_FLAG_NONBLOCK);

总结
本文介绍在 MQTT 客户端代码实现过程中,重连逻辑设计的重要性与最佳实践。通过本文,读者可以设计更为合理的 MQTT 设备重连代码,降低客户端与服务器端的资源开销,构建更加稳定可靠的物联网设备连接。







欢迎光临 谷动谷力 (http://bbs.sunsili.com/) Powered by Discuz! X3.2