EMQX webhook实现转发消息到java web服务器并保存到MySQL数据库

本文主要是介绍EMQX webhook实现转发消息到java web服务器并保存到MySQL数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一,前言
    • 二,搭建基于tomcat的web服务器
    • 三,部署web服务器到主机上
    • 四,配置EMQX服务器
      • 4.1 设置规则引擎
      • 4.2 测试规则引擎
      • 4.3 设置响应动作

修改记录

  • 2021.5.19 修改文章格式,在第三节添加了服务器的压缩文件 myweb.war. 添加项目仓库。
  • 2022.3.1 myweb.war. 百度网盘失效,请到项目仓库 https://gitee.com/killerp/emqx-web下载

一,前言

之前写过一篇关于EMQX数据持久化到MySQL数据库,但由于这个功能需要EMQX企业版才能实现,而企业版的费用对于我这种学生党而言实在难以负担。于是,我在EMQX官方发现另一种方法也可以实现保存数据。官网对于webhook的示例

预备知识:
http协议与格式
web服务器:web项目基本结构
java语言
tomcat的安装:window系统安装tomcat,linux系统安装tomcat
开发环境:eclipse官网下载

思路:设备的数据上传到emqx服务器,我们需要一个web服务器来接收EMQX服务器post过来数据,然后再将数据保存到数据库。
项目仓库地址:https://gitee.com/killerp/emqx-web

二,搭建基于tomcat的web服务器

首先在主机安装tomcat和java jdk,这是搭建web服务器必须的环境。

具体的安装方法网上很多,找到适合自己的。这里只介绍搭建web服务器。

已经搭建好环境的同学请跳过此步骤

假设你已经完成以上的环境部署,还需要安装开发环境eclipse,再安装程序中选择安装eclipse ee,这是专门为web开发而设计的开发软件。

打开eclipse,新建工程,选择Dynamic Web Project

在这里插入图片描述

给项目取个名称,target runtime选择你主机安装的tomcat的版本,如果你是在云主机安装tomcat,建议在本地电脑也安装同一版本的tomcat,方便调试

在这里插入图片描述

接下来就一路next,最后这个页面记得勾选generate

在这里插入图片描述
web.xml是配置web服务器的文件,java resources是放java文件的
在这里插入图片描述
在下图lib文件夹下添加需要用到的库文件,然后把库文件真正导入项目
库文件请到项目仓库中下载:https://gitee.com/killerp/emqx-web

在webcontent/web-inf/lib

在这里插入图片描述

右击项目,选择properties,选择java build path ,点击add jars 找到你的项目lib里的库文件,全部添加进去,最后apply and close,导入库文件完成!

在这里插入图片描述在这里插入图片描述

接下来编写一个Java 类,myhttpservlet,它继承自HttpServlet ,用来接收EMQX post过来的数据;

HttpServlet 是一个实现http协议的最重要的java类可以参考这篇博客 httpservlet详解

package myweb;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;import org.apache.tomcat.util.http.fileupload.IOUtils;import com.mysql.jdbc.StringUtils;
import com.oracle.webservices.internal.api.message.ContentType;
import com.sun.xml.internal.bind.CycleRecoverable.Context;
import com.sun.xml.internal.ws.wsdl.writer.document.Service;import net.sf.json.JSONArray;
import net.sf.json.JSONObject;public class FirstServlet extends HttpServlet {//用来读取post过来的json的缓存区的数据长度private static final int BUFFER_SIZE = 1024 * 8;//一些emqx post过来的数据private String app_id,device_id,remark,time,state,type;/*** 不知道是什么,反正是必须的*/private static final long serialVersionUID = 1L;//用来处理get消息@Overrideprotected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {//doPost(request,response);//get请求用来获取数据库的数据}//用来处理post消息@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {//获取post过来的输入流InputStream in=request.getInputStream();//创建一个缓存读取器来暂时存贮输入流里的数据BufferedReader reader = new BufferedReader(new InputStreamReader(in));//body是json字符串,要解析字符串,拿到对应的值插入数据库//读取缓存读取器里的body数据并转为字符串格式,这里的body数据为json字符串格式String body = read(reader);//从json字符串里获取json对象JSONObject J=JSONObject.fromObject(body);//通过键值对的形式,获取json里的值并赋值给变量app_id= J.getString("app_id");device_id=J.getString("device_id");time=J.getString("mytime");state=J.getString("state");type=J.getString("device_type");remark=J.getString("remark");//把变量的值保存到数据库DBUutil.update(app_id,device_id,remark,time,state,type);}public static String read(Reader reader) throws IOException{StringWriter writer = new StringWriter();try{write(reader, writer);return writer.getBuffer().toString();}finally{ writer.close(); }}public static long write(Reader reader, Writer writer) throws IOException{return write(reader, writer, BUFFER_SIZE);}//把缓存器的json数据写入缓存区public static long write(Reader reader, Writer writer, int bufferSize) throws IOException{int read;long total = 0;char[] buf = new char[BUFFER_SIZE];while( ( read = reader.read(buf) ) != -1 ){writer.write(buf, 0, read);total += read;}return total;}}

再新建一个java 类,dbutil,这个类的作用是对数据库进行操作,这里只写了对数据库进行插入数据的操作,其他操作如更新,删除,查询都可实现

package myweb;import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;import com.mysql.jdbc.Connection;
import com.mysql.jdbc.log.Log;public class DBUutil {static List<ESP8266> ESPlist=new ArrayList<>();//����豸������private static ESP8266 Device=new ESP8266();//��ʼ������private static String driver = "com.mysql.jdbc.Driver";// MySql驱动private static String user = "app";// MySQL的用户名和密码private static String password = "123456";//连接数据库的方法private static Connection getConn(String dbName){Connection connection;connection = null;try{Class.forName(driver);//加载驱动,需要驱动才能对数据库进行操作String ip = "118.31.20.121";//数据库的ip地址ַ//连接数据库,驱动+ip地址+端口号+用户名+密码,端口号默认是3306connection = (Connection) DriverManager.getConnection("jdbc:mysql://" + ip + ":3306/" + dbName,user, password);}catch (Exception e){e.printStackTrace();}//返回一个connection对象return connection;}//这个是添加设备到数据库,不用看public static  void bind_id(String app_id,String device_id){Connection connection=getConn("MQTTDATA");String sql="INSERT INTO user_bind_devices (app_id,device_id) VALUES (?,?)";if (connection!=null){try {PreparedStatement ps=connection.prepareStatement(sql);if (ps!=null){ps.setString(1,app_id);ps.setString(2,device_id);//执行语句,注意!!!如果你的SQL 语句是诸如update,insert的更新语句,应该用statement的execute()方法// select用的是statement的executeQuery()ps.execute();connection.close();ps.close();}} catch (SQLException e) {e.printStackTrace();}}}//这个是把数据保存到MQTTDATA库的 current表格//我需要插入app_id,device_id。。。。。。public static void update(String app_id,String device_id,String remark,String time,String state,String device_type) {//先跟MySQL数据库里的MQTTDATA库建立连接Connection connection=getConn("MQTTDATA");//定义一个语句,这个语句的功能是对current表格的app_id,device_id,remark,mytime,state,device_type列分别插入我们的参数的值//这里的?可以看成一个傀儡,用ps.setString()方法可以将?替换成我们的参数的值String sql="INSERT INTO current (app_id,device_id,remark,mytime,state,device_type) VALUES (?,?,?,?,?,?)";if	(connection!=null) {try {//准备我们的mysql操作语句PreparedStatement ps=connection.prepareStatement(sql);//把第一个?替换成参数里的app_id,第二个?替换成device_id........if(ps!=null) {ps.setString(1, app_id);ps.setString(2, device_id);ps.setString(3, remark);ps.setString(4, time);ps.setString(5, state);ps.setString(6, device_type);ps.execute();connection.close();ps.close();}} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}

前面说了web.xml是web服务器的配置文件,这里我们需要在web.xml里对myhttpservlet进行配置

在web.xml添加内容如下: (这里需要注意一下,app,名称可以随便取,主要是为了程序员方便查找。)

< servlet-class >myweb.myhttpservlet</ servlet-class > 前面是你的项目名称 .后面是我们刚刚写的myhttpservlet类。

< servlet-mapping >是把名称为app的httpservlet类映射到url的一个路径,/first的意思就是当你在浏览器输入:

http://你的web服务器ip地址:端口号/first 就会执行myhttpservlet类里的内容。

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" id="WebApp_ID" version="3.1"><display-name>myweb1</display-name><welcome-file-list><welcome-file>index.html</welcome-file><welcome-file>index.htm</welcome-file><welcome-file>index.jsp</welcome-file><welcome-file>default.html</welcome-file><welcome-file>default.htm</welcome-file><welcome-file>default.jsp</welcome-file></welcome-file-list><servlet><servlet-name>app</servlet-name><servlet-class>myweb.myhttpservlet</servlet-class>
</servlet><servlet-mapping><servlet-name>app</servlet-name><url-pattern>/first</url-pattern>
</servlet-mapping>
</web-app>

三,部署web服务器到主机上

至此,我们一个简陋的web服务器差不多搭建好了,这时候我们就需要把web程序部署到主机的tomcat上。

本地部署:右击项目,选择export > war file browse保存到你想保存的位置,系统会生成一个.war文件,我们把这个文件放进 apache-tomcat-8.5.45(你的tomcat文件夹)下的webapps文件夹下。

压缩文件已上传百度云,仅供参考
链接:https://pan.baidu.com/s/1Klr_gDvmyFwwtqq3zUowww
提取码:spf3

远程云主机部署:用xftp将war文件放入同样的路径下,然后重新启动tomcat。

tomcat在运行时会自动解压war文件生成web程序并开始执行
在这里插入图片描述
在这里插入图片描述

四,配置EMQX服务器

配置的目的:让 EMQX 服务器post数据 给web服务器;

4.1 设置规则引擎

登录emqx远程控制台,在规则引擎新建规则,规则引擎会对mqtt消息进行筛选,具体的筛选规则由我们来定义,下图是一些规则示例
在这里插入图片描述
以下图的例子分析:

SELECT 下方的语句含义是 :在mqtt消息的数据段(payload)中提取出remark,app_id,device_id变量的值 复制 到本地的remark,app_id,device_id变量中。

WHERE 语句表示 从发往主题为csdn的mqtt消息中筛选。

随后我们用post把数据发送给web服务器时,数据的格式设置为json格式{“remark”:“杀手”,“app_id”:“37264726374”}
在这里插入图片描述

4.2 测试规则引擎

我们还可以进行在线测试,测试输出的json数据是不是我们需要的内容。

下图配置mqtt消息:topic设置为csdn,payload字段设置变量及其取值。
在这里插入图片描述

点击测试,输出如图的json数据:
在这里插入图片描述

4.3 设置响应动作

当规则引擎筛选出指定的mqtt消息并提取出json数据后,还需要将数据post到我们的web服务器,这需要设置响应动作来完成这最后一步。

点击添加,动作选择发送数据到web服务,点击新建资源

在这里插入图片描述

在资源管理中设置 请求url 为web服务器地址,端口号(tomcat默认端口为8888,这里我改成8090),加上前面我们web.xml里设置的/first ; http请求就会发送到我们的web程序。

在这里插入图片描述
最后,我们需要进行测试,进入websocket,连接,发布消息到指定主题
在这里插入图片描述
然后到你的数据库查看是否存贮到数据。

有疑问的同学可以留言评论。
在这里插入图片描述

这篇关于EMQX webhook实现转发消息到java web服务器并保存到MySQL数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/653807

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤