Apache Druid 数据摄取---本地数据和kafka流式数据

2023-11-07 01:59

本文主要是介绍Apache Druid 数据摄取---本地数据和kafka流式数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Durid概述

Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。

Apache Druid是一个高性能的实时分析型数据库。

上篇文章,我们了解了Druid的加载方式,

咱么主要说两种,一种是加载本地数据,一种是通过kafka加载流式数据。

数据摄取

4.1 加载本地文件

我们导入演示案例种的演示文件

4.1.1.1 数据选择

通过UI选择local disk

file

并选择Connect data

file

4.1.1.2 演示数据查看

演示数据在quickstart/tutorial目录下的wikiticker-2015-09-12-sampled.json.gz文件 file

4.1.1.3 选择数据源

因为我们是通过imply安装的,在Base directory输入绝对路径/usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial,File filter输入wikiticker-2015-09-12-sampled.json.gz,并选择apply应用配置,我们数据已经加载进来了

file

Base directoryFile filter 分开是因为可能需要同时从多个文件中摄取数据。

4.1.1.4 加载数据

数据定位后,您可以点击"Next: Parse data"来进入下一步。

file

数据加载器将尝试自动为数据确定正确的解析器。在这种情况下,它将成功确定json。可以随意使用不同的解析器选项来预览Druid如何解析您的数据。

4.1.2 数据源规范配置
4.1.2.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​ Druid的体系结构需要一个主时间列(内部存储为名为_time的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value) 。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。

这里可以选择时间列,以及时间的显示方式

file

4.1.2.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们没有设置,直接跳过

file

4.1.2.3 设置过滤器

这里可以设置过滤器,对于某些数据可以不进行显示,这里我们也跳过

file

4.1.2.4 配置schema

Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

file

4.1.2.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们数据量非常小,这里我们按照DAY进行分段

file

4.1.3 提交任务
4.1.3.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为druid-sampled

file

点击下一步就可以查看我们的数据规范

file

​ 这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.1.3.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

file

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.1.3.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

file

4.1.3.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

file

4.2 kafka加载流式数据
4.2.1 安装Kafka

这里我们使用docker-compose的方式启动kafka

4.2.1.1 编辑资源清单
vi docker-compose.yml
version: '2'
services:zookeeper:image: zookeepercontainer_name: zookeeperports: - 2181:2181kafka:image: wurstmeister/kafka       ## 镜像volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190   ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       ## 卡夫卡运行是基于zookeeper的KAFKA_ADVERTISED_PORT: 9092KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000
4.2.2.2 启动容器
docker-compose up -ddocker-compose ps

file

4.2.3 验证kafka

启动kafka后需要验证kafka是否可用

4.2.3.1 登录容器

登录容器并进入指定目录

#进入容器
docker exec -it kafka_kafka_1 bash#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下
cd /opt/kafka_2.13-2.7.0/bin/

file

4.2.3.2 发送消息

运行客户端发送消息,注意这里的连接地址需要写我们配置的宿主机地址

#运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test

发送的数据如下

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

file

4.2.3.3 消费消息

运行消费者消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning

file

有数据打印说明我们kafka安装是没有问题的

4.2.4 发送数据到kafka
4.2.4.1 编写代码

编写代码发送消息到kafka中

@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到kafka** @param topic   主题* @param message 内容体*/public void sendMsg(String topic, String message) {kafkaTemplate.send(topic, message);}
}
@RestController
@RequestMapping("/taxi")
public class KafkaController {@Autowiredprivate KafkaSender kafkaSender;@RequestMapping("/batchTask/{num}")public String batchAdd(@PathVariable("num") int num) {for (int i = 0; i < num; i++) {Message message = Utils.getRandomMessage();kafkaSender.sendMsg("message", JSON.toJSONString(message));}return "OK";}
}
4.2.4.2 发送消息

使用postman 发送消息到kafka,消息地址:http://localhost:8010/taxi/batchTask/10,消息数据如下

file

显示OK说明消息已经发送到了kafka中

4.2.5 数据选择
4.2.51 kafka数据查看

在load页面选择kafka,进行数据摄取模式选择

file

4.2.5.2 选择数据源

在这里输入ZK的地址以及需要选择数据的topic

116.62.213.90:10903,116.62.213.90:10904

file

4.2.5.3 加载数据

点击apply应用配置,设置加载数据源

file

4.2.6 数据源规范配置
4.2.6.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​ 因为我们的时间列有两个创建时间以及打车时间,我们配置时间列为trvelDate

file

4.2.6.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们增加一个状态的虚拟列,来显示状态的中文名称我们定义 0:测试数据, 1:发起打车,2:排队中,3:司机接单,4:乘客上车,5:完成打车

我们使用case_simple来实现判断功能,更多判断功能参考

case_simple(status,0,'测试数据',1,'发起打车',2,'排队中',3,'司机接单',4,'完成打车','状态错误')

在这里我们新建了一个status_text的虚拟列来展示需要中文显示的列

file

配置年龄默认值,如果为空我们设置为25

nvl(age,25)

file

配置性别设置,我们需要设置为男女,0:男,1:女,如果为null,我们设置为男

case_simple(nvl(sex,0),0,'男',1,'女','男')

file

4.2.6.3 设置过滤器

这里可以设置过滤器,对于某些数据不展示,这里我们使用区间过滤器选择显示status>=1的数据,具体表达式可用参考

 {"type" : "bound","dimension" : "status","ordering": "numeric","lower": "1",}

因为我们把数据是0的测试数据不显示了,所以只显示了一条数据为1的数据

file

4.2.6.4 配置schema

Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

file

4.2.6.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们打车一般按照小时来算的,我们设置为分区为``hour

file

4.2.6.6 配置拉取方式

这里设置kafka的拉取方式,主要设置偏移量的一些配置

​ 在 Tune 步骤中,将 Use earliest offset 设置为 True 非常重要,因为我们需要从流的开始位置消费数据。 其他没有任何需要更改的地方,进入到 Publish

file

4.5.7 提交任务
4.2.7.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为taxi-messagefile

点击下一步就可以查看我们的数据规范

file

​ 这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.2.7.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

file

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.2.7.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

file

4.2.7.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

file

4.2.7.5 动态添加数据

发送一条数据到kafka

file

druid 查询数据,发现新的数据已经进来了

file

4.2.8 清理数据
4.2.8.1 关闭集群
# 进入impl安装目录
cd /usr/local/imply/imply-2021.05-1
# 关闭集群
./bin/service --down

file

4.2.8.2 等待关闭服务

通过进程查看,查看服务是否已经关闭

 ps -ef|grep druid

file

4.2.8.3 清理数据

通过删除druid软件包下的var目录的内容来重置集群状态

ll
rm -rf var

file

4.2.8.4 重新启动集群
 nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &
4.2.8.5 查看数据源

登录后查看数据源,我们发现已经被重置了

file

专注Java技术干货分享,欢迎志同道合的小伙伴,一起交流学习

这篇关于Apache Druid 数据摄取---本地数据和kafka流式数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360501

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统