谷动谷力

 找回密码
 立即注册
查看: 1487|回复: 0
打印 上一主题 下一主题
收起左侧

Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

[复制链接]
跳转到指定楼层
楼主
发表于 2023-7-14 10:20:58 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式
Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】




物联网+大数据+机器学习将会是以后的趋势,这里介绍一篇这方面的文章包含源码。混合机器学习基础架构构建了一个场景,利用Apache Kafka作为可扩展的中枢神经系统。 公共云用于极大规模地训练分析模型(例如,通过Google ML Engine在Google Cloud Platform(GCP)上使用TensorFlow和TPU,预测(即模型推断)在本地Kafka基础设施的执行( 例如,利用Kafka Streams或KSQL进行流分析)。

本文重点介绍内部部署。 创建了一个带有KSQL UDF的Github项目,用于传感器分析。 它利用KSQL的新API功能,使用Java轻松构建UDF / UDAF函数,对传入事件进行连续流处理。使用案例:Connected Cars - 使用深度学习的实时流分析从连接设备(本例中的汽车传感器)连续处理数百万个事件:


为此构建了不同的分析模型。 他们在公共云上接受TensorFlow,H2O和Google ML Engine的训练。 模型创建不是此示例的重点。 最终模型已经可以投入生产,可以部署用于实时预测。

模型服务可以通过模型server 完成,也可以本地嵌入到流处理应用程序中。 参阅RPC与流处理的权衡,以获得模型部署和....

演示:使用MQTT,Kafka和KSQL在Edge进行模型推理

Github项目:深度学习+KSQL UDF 用于流式异常检测MQTT物联网传感器数据该项目的重点是通过MQTT将数据提取到Kafka并通过KSQL处理数据:


Confluent MQTT Proxy的一大优势是无需MQTT Broker即可实现物联网方案的简单性。 可以通过MQTT代理将消息直接从MQTT设备转发到Kafka。 这显着降低了工作量和成本。 如果你“只是”想要在Kafka和MQTT设备之间进行通信,这是一个完美的解决方案。如果你想看到另一部分(与Elasticsearch / Grafana等接收器应用程序集成),请查看Github项目“KSQL for streaming IoT data”。 这实现了通过Kafka Connect和Elastic连接器与ElasticSearch和Grafana的集成。KSQL UDF - 源代码开发UDF非常容易。 只需在UDF类中的一个Java方法中实现该函数:
[Bash shell] 纯文本查看 复制代码
  1. @Udf(description = "apply analytic model to sensor input")
复制代码

这里是所有代码:
[Java] 纯文本查看
  1. package com.github.megachucky.kafka.streams.machinelearning;

  2. import java.util.Arrays;

  3. import hex.genmodel.GenModel;
  4. import hex.genmodel.easy.EasyPredictModelWrapper;
  5. import hex.genmodel.easy.RowData;
  6. import hex.genmodel.easy.exception.PredictException;
  7. import hex.genmodel.easy.prediction.AutoEncoderModelPrediction;
  8. import io.confluent.ksql.function.udf.Udf;
  9. import io.confluent.ksql.function.udf.UdfDescription;


  10. @UdfDescription(name = "anomaly", description = "anomaly detection using deep learning")
  11. public class Anomaly {
  12.      
  13.      
  14.     // Model built with H2O R API:
  15.       // anomaly_model <- h2o.deeplearning(x = names(train_ecg),training_frame =
  16.       // train_ecg,activation = "Tanh",autoencoder = TRUE,hidden =
  17.       // c(50,20,50),sparse = TRUE,l1 = 1e-4,epochs = 100)

  18.       // Name of the generated H2O model
  19.       private static String modelClassName = "io.confluent.ksql.function.udf.ml"
  20.                                              + ".DeepLearning_model_R_1509973865970_1";
  21.      
  22.   @Udf(description = "apply analytic model to sensor input")
  23.   public String anomaly(String sensorinput) {
  24.       
  25.       System.out.println("Kai: DL-UDF starting");
  26.          
  27.       GenModel rawModel;
  28.         try {
  29.             rawModel = (hex.genmodel.GenModel) Class.forName(modelClassName).newInstance();
  30.          
  31.         EasyPredictModelWrapper model = new EasyPredictModelWrapper(rawModel);
  32.          
  33.         // Prepare input sensor data to be in correct data format for the autoencoder model (double[]):
  34.         String[] inputStringArray = sensorinput.split("#");
  35.         double[] doubleValues = Arrays.stream(inputStringArray)
  36.                 .mapToDouble(Double::parseDouble)
  37.                 .toArray();
  38.          
  39.         RowData row = new RowData();
  40.         int j = 0;
  41.         for (String colName : rawModel.getNames()) {
  42.           row.put(colName, doubleValues[j]);
  43.           j++;
  44.         }

  45.         AutoEncoderModelPrediction p = model.predictAutoEncoder(row);
  46.         // System.out.println("original: " + java.util.Arrays.toString(p.original));
  47.         // System.out.println("reconstructedrowData: " + p.reconstructedRowData);
  48.         // System.out.println("reconstructed: " + java.util.Arrays.toString(p.reconstructed));

  49.         double sum = 0;
  50.         for (int i = 0; i < p.original.length; i++) {
  51.           sum += (p.original[i] - p.reconstructed[i]) * (p.original[i] - p.reconstructed[i]);
  52.         }
  53.         // Calculate Mean Square Error => High reconstruction error means anomaly
  54.         double mse = sum / p.original.length;
  55.         System.out.println("MSE: " + mse);

  56.         String mseString = "" + mse;

  57.         return (mseString);
  58.          
  59.         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
  60.             System.out.println(e.toString());
  61.             
  62.         } catch (PredictException e) {
  63.             System.out.println(e.toString());
  64.         }
  65.          
  66.         return null;      
  67.   }
  68. }
复制代码

如何使用Apache Kafka和MQTT Proxy运行演示?执行演示的所有步骤都在Github项目中描述。你只需安装Confluent Platform,然后按照以下步骤部署UDF,创建MQTT事件并通过KSQL levera处理它们....这里使用Mosquitto生成MQTT消息。 当然,也可以使用任何其他MQTT客户端。 这是开放和标准化协议的巨大好处。

到此结束,文章虽然简短,但是内容确实很丰富,特别项目的源码的阅读,在github上有详细的介绍。
github地址:(回复可查看)
游客,如果您要查看本帖隐藏内容请回复

源码下载:(回复可查看)
游客,如果您要查看本帖隐藏内容请回复



1200.jpg (847 Bytes, 下载次数: 199)

1200.jpg
+10
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|深圳市光明谷科技有限公司|光明谷商城|Sunshine Silicon Corpporation ( 粤ICP备14060730号|Sitemap

GMT+8, 2025-1-28 11:33 , Processed in 0.125972 second(s), 44 queries .

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表