使用 LF Edge eKuiper 将物联网流处理数据写入 Databend

2023-10-18 20:20

本文主要是介绍使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

LF Edge eKuiper

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。 

Databend Sql Sink

eKuiper 支持通过 Golang 或者 Python 在源 (Source)SQL 函数目标 (Sink) 三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。

编译 eKuiper 和 Databend Sql Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make

Databend Sql Plugin

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

编译后的 sink plugin 拷贝到 build 目录:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Databend 建表

在 Databend 中先创建目标表 ekuiper_test:

create table ekuiper_test (name string,size bigint,id bigint);

启动 eKuiperd

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd

服务正常启动:

创建流(stream) 和 规则 (rule)

eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。

创建 stream

流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。

首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml

default:# 文件的类型,支持 json, csv 和 linesfileType: json# 文件以 eKuiper 为根目录的目录或文件的绝对路径。# 请勿在此处包含文件名。文件名应在流数据源中定义path: data# 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0interval: 0# 读取后,两条数据发送的间隔时间sendInterval: 0# 是否并行读取目录中的文件parallel: false# 文件读取后的操作# 0: 文件保持不变# 1: 删除文件# 2: 移动文件到 moveTo 定义的位置actionAfterRead: 0# 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况moveTo: /tmp/kuiper/moved# 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。hasHeader: false# 定义文件的列。如果定义了文件头,该选项将被覆盖。# columns: [id, name]# 忽略开头多少行的内容。ignoreStartLines: 0# 忽略结尾多少行的内容。最后的空行不计算在内。ignoreEndLines: 0# 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。decompression: ""

使用 CLI 创建 steam 名为 stream1:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'

Json 文件的内容为:

[{"id": 1,"size":100, "name": "John Doe"},{"id": 2,"size":200, "name": "Jane Smith"},{"id": 3,"size":300, "name": "Kobe Brant"},{"id": 4,"size":400, "name": "Alen Iverson"}
]

创建 Databend Sink Rule

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。

这里我们通过指定 sql 和 actions 属性,以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

规则由 JSON 定义,下面是准备创建的规则 myRule.json:

{"id": "myRule","sql": "SELECT id, name from stream1","actions": [{"log": {},"sql": {"url": "databend://databend:databend@localhost:8000/default?sslmode=disable","table": "ekuiper_test","fields": ["id","name"]}}]
}

执行 CLI 创建规则:

./bin/kuiper create rule myRule -f myRule.json

可以查看所创建规则的运行状态:

./bin/kuiper getstatus rule myRule

规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:

可以看到由于我们的规则 SQL 中只指定了 idname 字段,所以这里只有这两个字段被写入。

结论

eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。

这篇关于使用 LF Edge eKuiper 将物联网流处理数据写入 Databend的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/234957

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件