Apache Druid 数据摄取---本地数据和kafka流式数据

2023-11-07 01:59

本文主要是介绍Apache Druid 数据摄取---本地数据和kafka流式数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Durid概述

Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。

Apache Druid是一个高性能的实时分析型数据库。

上篇文章,我们了解了Druid的加载方式,

咱么主要说两种,一种是加载本地数据,一种是通过kafka加载流式数据。

数据摄取

4.1 加载本地文件

我们导入演示案例种的演示文件

4.1.1.1 数据选择

通过UI选择local disk

file

并选择Connect data

file

4.1.1.2 演示数据查看

演示数据在quickstart/tutorial目录下的wikiticker-2015-09-12-sampled.json.gz文件 file

4.1.1.3 选择数据源

因为我们是通过imply安装的,在Base directory输入绝对路径/usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial,File filter输入wikiticker-2015-09-12-sampled.json.gz,并选择apply应用配置,我们数据已经加载进来了

file

Base directoryFile filter 分开是因为可能需要同时从多个文件中摄取数据。

4.1.1.4 加载数据

数据定位后,您可以点击"Next: Parse data"来进入下一步。

file

数据加载器将尝试自动为数据确定正确的解析器。在这种情况下,它将成功确定json。可以随意使用不同的解析器选项来预览Druid如何解析您的数据。

4.1.2 数据源规范配置
4.1.2.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​ Druid的体系结构需要一个主时间列(内部存储为名为_time的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value) 。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。

这里可以选择时间列,以及时间的显示方式

file

4.1.2.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们没有设置,直接跳过

file

4.1.2.3 设置过滤器

这里可以设置过滤器,对于某些数据可以不进行显示,这里我们也跳过

file

4.1.2.4 配置schema

Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

file

4.1.2.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们数据量非常小,这里我们按照DAY进行分段

file

4.1.3 提交任务
4.1.3.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为druid-sampled

file

点击下一步就可以查看我们的数据规范

file

​ 这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.1.3.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

file

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.1.3.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

file

4.1.3.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

file

4.2 kafka加载流式数据
4.2.1 安装Kafka

这里我们使用docker-compose的方式启动kafka

4.2.1.1 编辑资源清单
vi docker-compose.yml
version: '2'
services:zookeeper:image: zookeepercontainer_name: zookeeperports: - 2181:2181kafka:image: wurstmeister/kafka       ## 镜像volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190   ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       ## 卡夫卡运行是基于zookeeper的KAFKA_ADVERTISED_PORT: 9092KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000
4.2.2.2 启动容器
docker-compose up -ddocker-compose ps

file

4.2.3 验证kafka

启动kafka后需要验证kafka是否可用

4.2.3.1 登录容器

登录容器并进入指定目录

#进入容器
docker exec -it kafka_kafka_1 bash#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下
cd /opt/kafka_2.13-2.7.0/bin/

file

4.2.3.2 发送消息

运行客户端发送消息,注意这里的连接地址需要写我们配置的宿主机地址

#运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test

发送的数据如下

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

file

4.2.3.3 消费消息

运行消费者消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning

file

有数据打印说明我们kafka安装是没有问题的

4.2.4 发送数据到kafka
4.2.4.1 编写代码

编写代码发送消息到kafka中

@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到kafka** @param topic   主题* @param message 内容体*/public void sendMsg(String topic, String message) {kafkaTemplate.send(topic, message);}
}
@RestController
@RequestMapping("/taxi")
public class KafkaController {@Autowiredprivate KafkaSender kafkaSender;@RequestMapping("/batchTask/{num}")public String batchAdd(@PathVariable("num") int num) {for (int i = 0; i < num; i++) {Message message = Utils.getRandomMessage();kafkaSender.sendMsg("message", JSON.toJSONString(message));}return "OK";}
}
4.2.4.2 发送消息

使用postman 发送消息到kafka,消息地址:http://localhost:8010/taxi/batchTask/10,消息数据如下

file

显示OK说明消息已经发送到了kafka中

4.2.5 数据选择
4.2.51 kafka数据查看

在load页面选择kafka,进行数据摄取模式选择

file

4.2.5.2 选择数据源

在这里输入ZK的地址以及需要选择数据的topic

116.62.213.90:10903,116.62.213.90:10904

file

4.2.5.3 加载数据

点击apply应用配置,设置加载数据源

file

4.2.6 数据源规范配置
4.2.6.1 设置时间列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主时间列。

​ 因为我们的时间列有两个创建时间以及打车时间,我们配置时间列为trvelDate

file

4.2.6.2 设置转换器

在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们增加一个状态的虚拟列,来显示状态的中文名称我们定义 0:测试数据, 1:发起打车,2:排队中,3:司机接单,4:乘客上车,5:完成打车

我们使用case_simple来实现判断功能,更多判断功能参考

case_simple(status,0,'测试数据',1,'发起打车',2,'排队中',3,'司机接单',4,'完成打车','状态错误')

在这里我们新建了一个status_text的虚拟列来展示需要中文显示的列

file

配置年龄默认值,如果为空我们设置为25

nvl(age,25)

file

配置性别设置,我们需要设置为男女,0:男,1:女,如果为null,我们设置为男

case_simple(nvl(sex,0),0,'男',1,'女','男')

file

4.2.6.3 设置过滤器

这里可以设置过滤器,对于某些数据不展示,这里我们使用区间过滤器选择显示status>=1的数据,具体表达式可用参考

 {"type" : "bound","dimension" : "status","ordering": "numeric","lower": "1",}

因为我们把数据是0的测试数据不显示了,所以只显示了一条数据为1的数据

file

4.2.6.4 配置schema

Configure schema 步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。

file

4.2.6.5 配置Partition

一旦对schema满意后,点击 Next 后进入 Partition 步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们打车一般按照小时来算的,我们设置为分区为``hour

file

4.2.6.6 配置拉取方式

这里设置kafka的拉取方式,主要设置偏移量的一些配置

​ 在 Tune 步骤中,将 Use earliest offset 设置为 True 非常重要,因为我们需要从流的开始位置消费数据。 其他没有任何需要更改的地方,进入到 Publish

file

4.5.7 提交任务
4.2.7.1 发布数据

点击完成 Tune 步骤,进入到 Publish 步,在这里我们可以给我们的数据源命名,这里我们就命名为taxi-messagefile

点击下一步就可以查看我们的数据规范

file

​ 这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。

4.2.7.2 提交任务

对摄取规范感到满意后,请单击 Submit,然后将创建一个数据摄取任务。

您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。

file

当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。

4.2.7.3 查看数据源

从标题导航到 Datasources 视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到 Query 视图以对数据源运行SQL查询。

file

4.2.7.4 查询数据

可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了

file

4.2.7.5 动态添加数据

发送一条数据到kafka

file

druid 查询数据,发现新的数据已经进来了

file

4.2.8 清理数据
4.2.8.1 关闭集群
# 进入impl安装目录
cd /usr/local/imply/imply-2021.05-1
# 关闭集群
./bin/service --down

file

4.2.8.2 等待关闭服务

通过进程查看,查看服务是否已经关闭

 ps -ef|grep druid

file

4.2.8.3 清理数据

通过删除druid软件包下的var目录的内容来重置集群状态

ll
rm -rf var

file

4.2.8.4 重新启动集群
 nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &
4.2.8.5 查看数据源

登录后查看数据源,我们发现已经被重置了

file

专注Java技术干货分享,欢迎志同道合的小伙伴,一起交流学习

这篇关于Apache Druid 数据摄取---本地数据和kafka流式数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360501

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

使用Spring Cache本地缓存示例代码

《使用SpringCache本地缓存示例代码》缓存是提高应用程序性能的重要手段,通过将频繁访问的数据存储在内存中,可以减少数据库访问次数,从而加速数据读取,:本文主要介绍使用SpringCac... 目录一、Spring Cache简介核心特点:二、基础配置1. 添加依赖2. 启用缓存3. 缓存配置方案方案

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十