物联网架构成长之路(8)-EMQ-Hook了解、连接Kafka发送消息

2024-05-28 01:32

本文主要是介绍物联网架构成长之路(8)-EMQ-Hook了解、连接Kafka发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 前言

   按照我自己设计的物联网框架,对于MQTT集群中的所有消息,是要持久化到磁盘的,这里采用一个消息队列中间件Kafka作为数据缓冲,缓冲结果存到数据仓库中,以供后续作为数据分析。由于MQTT集群中的消息都是比较分散的,所以使用Kafka来聚合、采集消息。

2. 下载&编译&安装

  Kafka依赖ZooKeeper

  在这里下载 http://mirrors.shuosc.org/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

  http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka-1.0.0-src.tgz

  http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

  学习的话, 可以参考这个文档 http://orchome.com/kafka/index 

  配置使用过程可以参考官网的  http://kafka.apache.org/quickstart  http://zookeeper.apache.org/doc/current/zookeeperStarted.html ,有些资料因为版本升级的原因,已经不是以前的那种启动方式了。https://cwiki.apache.org/confluence/display/KAFKA/Clients 

3. 启动Zookeeper

1 cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
2 ./bin/zkServer.sh start


  上图,就表示启动单机模式。

  ./bin/zkCli.sh -server 127.0.0.1:2181 进行连接

  然后进入命令行模式,可以慢慢玩了

  单击模式就没有什么要配置的。最多修改zoo.cfg中的dataDir文件

  ZooKeeper启动replicated模式,集群模式

  zoo.cfg 增加服务器集群信息

1 server.1=172.16.20.229:2888:3888
2 server.2=172.16.23.203:2888:3888
3 server.3=172.16.23.204:2888:3888
1 ./bin/zkServer.sh start-foreground #启动

  注意在echo “1” > %dataDir%/myid 对于每个服务器都要创建一个myid文件

  启动都是会有一些奇奇怪怪的问题,上网找资料就可以了。

  一般第一台ZooKeeper启动是会有Connection refused 出错,这个是正常的,后面的两台还没有启动,不过后面也一个一个启动了。

  如果过程中,有一个断开了,然后修改数据,然后这个断开的又连上了,那么ZooKeeper集群内部会镜像diff

1 2017-12-28 16:30:38,570 [myid:3] - INFO  [QuorumPeer[myid=3]/0.0.0.0:2181:Learner@332] - Getting a diff from the leader 0x100000005

  然后就用客户端A

1 ./bin/zkCli.sh -server 172.16.20.229:2181
2 ls /
3 create /zk_test my_data
4 ls /

  然后用客户端B

1 ./bin/zkCli.sh -server 172.16.23.203:2181
2 ls /
3 get /zk_test
4 ls /

  可以看到ZooKeeper信息在内部进行了共享

  具体可以参考这篇博客 http://www.cnblogs.com/sunddenly/p/4018459.html 

4. 启动 Kafka

  由于kafka以来ZooKeeper,所以有了上面一步的ZooKeeper了解。

  实际中,可以直接下载kafka的二进制包,直接使用,http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz 

   启动Zookeeper

1 ./bin/zookeeper-server-start.sh config/zookeeper.properties

  启动Kafka

1 ./bin/kafka-server-start.sh config/server.properties

  创建主题

1 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  查看主题

1 ./bin/kafka-topics.sh --list --zookeeper localhost:2181

  发送消息

1 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  消费消息

1 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

5. Erlang 连接 kafka

  主要参考这个 https://github.com/msdevanms/emqttd_plugin_kafka_bridge 

  增加依赖 https://github.com/helpshift/ekaf.git

  首先在Makefile增加

 

1 DEPS = eredis ecpool clique ekaf
2 dep_ekaf = git https://github.com/helpshift/ekaf master

 

  然后在rebar.config 增加

1 {ekaf, “.*”, {git, “https://github.com/helpshift/ekaf”, “master”}}

  在etc/emq_plugin_wunaozai.conf 中增加

1 ## kafka config
2 wunaozai.msg.kafka.server = 127.0.0.1:9092
3 wunaozai.msg.kafka.topic = test

  在priv/emq_plugin_wunaozai.schema 中增加

复制代码
 1 %% wunaozai.msg.kafka.server = 127.0.0.1:90922 {3     mapping,4     "wunaozai.msg.kafka.server",5     "emq_plugin_wunaozai.kafka",6     [7         {default, {"127.0.0.1", 9092}},8         {datatype, [integer, ip, string]}9     ]
10 }.
11 
12 %% wunaozai.msg.kafka.topic = test
13 {
14     mapping,
15     "wunaozai.msg.kafka.topic",
16     "emq_plugin_wunaozai.kafka",
17     [
18         {default, "test"},
19         {datatype, string},
20         hidden
21     ]
22 }.
23 
24 %% translation
25 {
26     translation,
27     "emq_plugin_wunaozai.kafka",
28     fun(Conf) ->
29             {RHost, RPort} = case cuttlefish:conf_get("wunaozai.msg.kafka.server", Conf) of
30                                  {Ip, Port} -> {Ip, Port};
31                                  S          -> case string:tokens(S, ":") of
32                                                    [Domain]       -> {Domain, 9092};
33                                                    [Domain, Port] -> {Domain, list_to_integer(Port)}
34                                                end
35                              end,
36             Topic = cuttlefish:conf_get("wunaozai.msg.kafka.topic", Conf),
37             [
38              {host, RHost},
39              {port, RPort},
40              {topic, Topic}
41             ]
42     end
43 }.
复制代码

6. 数据发往Kafka

  接下来,由于功能基本上是基于EMQ框架的Hook钩子设计,在EMQ接收到客户端上下线、主题订阅或消息发布确认时,触发钩子顺序执行回调函数,所以大部分功能在 src/emq_plugin_wunaozai.erl 文件进行修改。

复制代码
  1 -module(emq_plugin_wunaozai).2 3 -include("emq_plugin_wunaozai.hrl").4 5 -include_lib("emqttd/include/emqttd.hrl").6 7 -export([load/1, unload/0]).8 9 %% Hooks functions10 11 -export([on_client_connected/3, on_client_disconnected/3]).12 13 -export([on_client_subscribe/4, on_client_unsubscribe/4]).14 15 -export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).16 17 -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).18 19 %% Called when the plugin application start20 load(Env) ->21     ekaf_init([Env]),22     emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),23     emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),24     emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),25     emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),26     emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),27     emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),28     emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),29     emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),30     emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),31     emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),32     emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]),33     io:format("start wunaozai Test Reload.~n", []).34 35 on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->36     io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),37     ekaf_send(<<"connected">>, ClientId, {}, _Env),38     {ok, Client}.39 40 on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->41     io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),42     ekaf_send(<<"disconnected">>, ClientId, {}, _Env),43     ok.44 45 on_client_subscribe(ClientId, Username, TopicTable, _Env) ->46     io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),47     {ok, TopicTable}.48     49 on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->50     io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),51     {ok, TopicTable}.52 53 on_session_created(ClientId, Username, _Env) ->54     io:format("session(~s/~s) created.", [ClientId, Username]).55 56 on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->57     io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),58     ekaf_send(<<"subscribed">>, ClientId, {Topic, Opts}, _Env),59     {ok, {Topic, Opts}}.60 61 on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->62     io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),63     ekaf_send(<<"unsubscribed">>, ClientId, {Topic, Opts}, _Env),64     ok.65 66 on_session_terminated(ClientId, Username, Reason, _Env) ->67     io:format("session(~s/~s) terminated: ~p.~n", [ClientId, Username, Reason]),68     stop.69 70 %% transform message and return71 on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->72     {ok, Message};73 on_message_publish(Message, _Env) ->74     io:format("publish ~s~n", [emqttd_message:format(Message)]),75     ekaf_send(<<"public">>, {}, Message, _Env),76     {ok, Message}.77 78 on_message_delivered(ClientId, Username, Message, _Env) ->79     io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),80     {ok, Message}.81 82 on_message_acked(ClientId, Username, Message, _Env) ->83     io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),84     {ok, Message}.85 86 %% Called when the plugin application stop87 unload() ->88     emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),89     emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),90     emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),91     emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),92     emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),93     emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),94     emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),95     emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),96     emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),97     emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),98     emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).99 
100 %% ==================== ekaf_init STA.===============================%%
101 ekaf_init(_Env) ->
102     % clique 方式读取配置文件
103     Env = application:get_env(?APP, kafka),
104     {ok, Kafka} = Env,
105     Host = proplists:get_value(host, Kafka),
106     Port = proplists:get_value(port, Kafka),
107     Broker = {Host, Port},
108     Topic = proplists:get_value(topic, Kafka),
109     io:format("~w ~w ~w ~n", [Host, Port, Topic]),
110 
111     % init kafka
112     application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
113     application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
114     application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
115     %application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}),
116     %application:set_env(ekaf, ekaf_bootstrap_topics, <<"test">>),
117 
118     %io:format("Init ekaf with ~s:~b~n", [Host, Port]),
119     %%ekaf:produce_async_batched(<<"test">>, list_to_binary(Json)),
120     ok.
121 %% ==================== ekaf_init END.===============================%%
122 
123 
124 %% ==================== ekaf_send STA.===============================%%
125 ekaf_send(Type, ClientId, {}, _Env) ->
126     Json = mochijson2:encode([
127                               {type, Type},
128                               {client_id, ClientId},
129                               {message, {}},
130                               {cluster_node, node()},
131                               {ts, emqttd_time:now_ms()}
132                              ]),
133     ekaf_send_sync(Json);
134 ekaf_send(Type, ClientId, {Reason}, _Env) ->
135     Json = mochijson2:encode([
136                               {type, Type},
137                               {client_id, ClientId},
138                               {cluster_node, node()},
139                               {message, Reason},
140                               {ts, emqttd_time:now_ms()}
141                              ]),
142     ekaf_send_sync(Json);
143 ekaf_send(Type, ClientId, {Topic, Opts}, _Env) ->
144     Json = mochijson2:encode([
145                               {type, Type},
146                               {client_id, ClientId},
147                               {cluster_node, node()},
148                               {message, [
149                                          {topic, Topic},
150                                          {opts, Opts}
151                                         ]},
152                               {ts, emqttd_time:now_ms()}
153                              ]),
154     ekaf_send_sync(Json);
155 ekaf_send(Type, _, Message, _Env) ->
156     Id = Message#mqtt_message.id,
157     From = Message#mqtt_message.from, %需要登录和不需要登录这里的返回值是不一样的
158     Topic = Message#mqtt_message.topic,
159     Payload = Message#mqtt_message.payload,
160     Qos = Message#mqtt_message.qos,
161     Dup = Message#mqtt_message.dup,
162     Retain = Message#mqtt_message.retain,
163     Timestamp = Message#mqtt_message.timestamp,
164 
165     ClientId = c(From),
166     Username = u(From),
167 
168     Json = mochijson2:encode([
169                               {type, Type},
170                               {client_id, ClientId},
171                               {message, [
172                                          {username, Username},
173                                          {topic, Topic},
174                                          {payload, Payload},
175                                          {qos, i(Qos)},
176                                          {dup, i(Dup)},
177                                          {retain, i(Retain)}
178                                         ]},
179                               {cluster_node, node()},
180                               {ts, emqttd_time:now_ms()}
181                              ]),
182     ekaf_send_sync(Json).
183 
184 ekaf_send_async(Msg) ->
185     Topic = ekaf_get_topic(),
186     ekaf_send_async(Topic, Msg).
187 ekaf_send_async(Topic, Msg) ->
188     ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Msg)).
189 ekaf_send_sync(Msg) ->
190     Topic = ekaf_get_topic(),
191     ekaf_send_sync(Topic, Msg).
192 ekaf_send_sync(Topic, Msg) ->
193     ekaf:produce_sync_batched(list_to_binary(Topic), list_to_binary(Msg)).
194 
195 i(true) -> 1;
196 i(false) -> 0;
197 i(I) when is_integer(I) -> I.
198 c({ClientId, Username}) -> ClientId;
199 c(From) -> From.
200 u({ClientId, Username}) -> Username;
201 u(From) -> From.
202 %% ==================== ekaf_send END.===============================%%
203 
204 
205 %% ==================== ekaf_set_host STA.===============================%%
206 ekaf_set_host(Host) ->
207     ekaf_set_host(Host, 9092).
208 ekaf_set_host(Host, Port) ->
209     Broker = {Host, Port},
210     application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
211     io:format("reset ekaf Broker ~s:~b ~n", [Host, Port]),
212     ok.
213 %% ==================== ekaf_set_host END.===============================%%
214 
215 %% ==================== ekaf_set_topic STA.===============================%%
216 ekaf_set_topic(Topic) ->
217     application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
218     ok.
219 ekaf_get_topic() ->
220     Env = application:get_env(?APP, kafka),
221     {ok, Kafka} = Env,
222     Topic = proplists:get_value(topic, Kafka),
223     Topic.
224 %% ==================== ekaf_set_topic END.===============================%%
复制代码

  上面是所有源代码,下面对其进行简单说明

  ekaf_init 函数,主要对配置文件的读取和解析并存放到application的环境变量中

  ekaf_send 函数,主要是封装成对应的JSON数据,然后发到Kafka中

  ekaf_send_async 函数,主要是异步发送JSON数据,不确保发往Kafka的顺序与Kafka消费者的接收时的顺序

  ekaf_send_sync 函数,是同步发送JSON数据,确保按照顺序发往kafka与Kafka消费者有序接收数据

  ekaf_set_host 函数,设置kafka的域名与端口

  ekaf_set_topic 函数,设置发往kafka时的主题

  ekaf_get_topic 函数,获取当前主题

  load函数增加ekaf_init调用

  剩下的在每个钩子回调中调用 ekaf_send函数

7. 测试

  (1)启动Zookeeper ./bin/zookeeper-server-start.sh config/zookeeper.properties

  (2)启动Kafka ./bin/kafka-server-start.sh config/server.properties

  (3)启动消费者 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

  (4)启动一个生产者 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  (5)启动EMQ ./_rel/emqttd/bin/emqttd console

  (6)打开MQTT客户端并连接、订阅、发布等操作

  (7)可以在消费者界面上看到获取到的信息

8. 插件源码

  最后给出本次插件开发的所有源码

  https://files.cnblogs.com/files/wunaozai/emq_plugin_wunaozai.zip

这篇关于物联网架构成长之路(8)-EMQ-Hook了解、连接Kafka发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009166

相关文章

【Altium】查找PCB上未连接的网络

【更多软件使用问题请点击亿道电子官方网站】 1、文档目标: PCB设计后期检查中找出没有连接的网络 应用场景:PCB设计后期,需要检查是否所有网络都已连接布线。虽然未连接的网络会有飞线显示,但是由于布线后期整板布线密度较高,虚连,断连的网络用肉眼难以轻易发现。用DRC检查也可以找出未连接的网络,如果PCB中DRC问题较多,查找起来就不是很方便。使用PCB Filter面板来达成目的相比DRC

Java面试题:通过实例说明内连接、左外连接和右外连接的区别

在 SQL 中,连接(JOIN)用于在多个表之间组合行。最常用的连接类型是内连接(INNER JOIN)、左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。它们的主要区别在于它们如何处理表之间的匹配和不匹配行。下面是每种连接的详细说明和示例。 表示例 假设有两个表:Customers 和 Orders。 Customers CustomerIDCus

通信系统网络架构_2.广域网网络架构

1.概述          通俗来讲,广域网是将分布于相比局域网络更广区域的计算机设备联接起来的网络。广域网由通信子网于资源子网组成。通信子网可以利用公用分组交换网、卫星通信网和无线分组交换网构建,将分布在不同地区的局域网或计算机系统互连起来,实现资源子网的共享。 2.网络组成          广域网属于多级网络,通常由骨干网、分布网、接入网组成。在网络规模较小时,可仅由骨干网和接入网组成

Python利用qq邮箱发送通知邮件(已封装成model)

因为经常喜欢写一些脚本、爬虫之类的东西,有需要通知的时候,总是苦于没有太好的通知方式,虽然邮件相对于微信、短信来说,接收性差了一些,但毕竟免费,而且支持html直接渲染,所以,折腾了一个可以直接使用的sendemail模块。这里主要应用的是QQ发邮件,微信关注QQ邮箱后,也可以实时的接收到消息,肾好! 好了,废话不多说,直接上代码。 # encoding: utf-8import lo

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

神经网络第一篇:激活函数是连接感知机和神经网络的桥梁

前面发布的文章介绍了感知机,了解了感知机可以通过叠加层表示复杂的函数。遗憾的是,设定合适的、能符合预期的输入与输出的权重,是由人工进行的。从本章开始,将进入神经网络的学习,首先介绍激活函数,因为它是连接感知机和神经网络的桥梁。如果读者认知阅读了本专题知识,相信你必有收获。 感知机数学表达式的简化 前面我们介绍了用感知机接收两个输入信号的数学表示如下:

@ControllerAdvice:你可以没用过,但是不能不了解

1.概述 最近在梳理Spring MVC相关扩展点时发现了@ControllerAdvice这个注解,用于定义全局的异常处理、数据绑定、数据预处理等功能。通过使用 @ControllerAdvice,可以将一些与控制器相关的通用逻辑提取到单独的类中进行集中管理,从而减少代码重复,提升代码的可维护性。 定义如下 /*** Specialization of {@link Component @

响应式架构

介绍 响应式架构(Reactive Architecture)是一种面向服务和事件的系统设计方法,旨在提高系统的可扩展性、弹性和容错能力。它适用于构建分布式系统,特别是在云环境和微服务架构中。响应式架构的核心理念是通过事件驱动和数据流来实现各个组件之间的解耦,从而提高整个系统的响应能力和可靠性。 响应式架构的主要特点包括: 响应性:系统能够快速响应外部事件和内部变化,确保在各种负载和故障情

消息认证码解析

1. 什么是消息认证码         消息认证码(Message Authentication Code)是一种确认完整性并进行认证的技术,取三个单词的首字母,简称为MAC。         消息认证码的输入包括任意长度的消息和一个发送者与接收者之间共享的密钥,它可以输出固定长度的数据,这个数据称为MAC值。         根据任意长度的消息输出固定长度的数据,这一点和单向散列函数很类似