本文主要是介绍Apache Druid 数据摄取---本地数据和kafka流式数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Durid概述
Apache Druid是一个集时间序列数据库、数据仓库和全文检索系统特点于一体的分析性数据平台。本文将带你简单了解Druid的特性,使用场景,技术特点和架构。这将有助于你选型数据存储方案,深入了解Druid存储,深入了解时间序列存储等。
Apache Druid是一个高性能的实时分析型数据库。
上篇文章,我们了解了Druid的加载方式,
咱么主要说两种,一种是加载本地数据,一种是通过kafka加载流式数据。
数据摄取
4.1 加载本地文件
我们导入演示案例种的演示文件
4.1.1.1 数据选择
通过UI选择
local disk
并选择
Connect data
4.1.1.2 演示数据查看
演示数据在
quickstart/tutorial
目录下的wikiticker-2015-09-12-sampled.json.gz
文件
4.1.1.3 选择数据源
因为我们是通过
imply
安装的,在Base directory
输入绝对路径/usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial
,File filter
输入wikiticker-2015-09-12-sampled.json.gz
,并选择apply
应用配置,我们数据已经加载进来了
Base directory
和File filter
分开是因为可能需要同时从多个文件中摄取数据。
4.1.1.4 加载数据
数据定位后,您可以点击"Next: Parse data"来进入下一步。
数据加载器将尝试自动为数据确定正确的解析器。在这种情况下,它将成功确定
json
。可以随意使用不同的解析器选项来预览Druid如何解析您的数据。
4.1.2 数据源规范配置
4.1.2.1 设置时间列
json
选择器被选中后,点击Next:Parse time
进入下一步来决定您的主时间列。
Druid的体系结构需要一个主时间列(内部存储为名为_time
的列)。如果您的数据中没有时间戳,请选择 固定值(Constant Value)
。在我们的示例中,数据加载器将确定原始数据中的时间列是唯一可用作主时间列的候选者。
这里可以选择时间列,以及时间的显示方式
4.1.2.2 设置转换器
在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们没有设置,直接跳过
4.1.2.3 设置过滤器
这里可以设置过滤器,对于某些数据可以不进行显示,这里我们也跳过
4.1.2.4 配置schema
在
Configure schema
步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。
4.1.2.5 配置Partition
一旦对schema满意后,点击
Next
后进入Partition
步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们数据量非常小,这里我们按照DAY
进行分段
4.1.3 提交任务
4.1.3.1 发布数据
点击完成
Tune
步骤,进入到Publish
步,在这里我们可以给我们的数据源命名,这里我们就命名为druid-sampled
,
点击下一步就可以查看我们的数据规范
这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。
4.1.3.2 提交任务
对摄取规范感到满意后,请单击
Submit
,然后将创建一个数据摄取任务。
您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。
当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。
4.1.3.3 查看数据源
从标题导航到
Datasources
视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到Query
视图以对数据源运行SQL查询。
4.1.3.4 查询数据
可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了
4.2 kafka加载流式数据
4.2.1 安装Kafka
这里我们使用
docker-compose
的方式启动kafka
4.2.1.1 编辑资源清单
vi docker-compose.yml
version: '2'
services:zookeeper:image: zookeepercontainer_name: zookeeperports: - 2181:2181kafka:image: wurstmeister/kafka ## 镜像volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190 ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ## 卡夫卡运行是基于zookeeper的KAFKA_ADVERTISED_PORT: 9092KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000
4.2.2.2 启动容器
docker-compose up -ddocker-compose ps
4.2.3 验证kafka
启动kafka后需要验证kafka是否可用
4.2.3.1 登录容器
登录容器并进入指定目录
#进入容器
docker exec -it kafka_kafka_1 bash#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下
cd /opt/kafka_2.13-2.7.0/bin/
4.2.3.2 发送消息
运行客户端发送消息,注意这里的连接地址需要写我们配置的宿主机地址
#运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test
发送的数据如下
{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}
4.2.3.3 消费消息
运行消费者消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning
有数据打印说明我们kafka安装是没有问题的
4.2.4 发送数据到kafka
4.2.4.1 编写代码
编写代码发送消息到kafka中
@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送消息到kafka** @param topic 主题* @param message 内容体*/public void sendMsg(String topic, String message) {kafkaTemplate.send(topic, message);}
}
@RestController
@RequestMapping("/taxi")
public class KafkaController {@Autowiredprivate KafkaSender kafkaSender;@RequestMapping("/batchTask/{num}")public String batchAdd(@PathVariable("num") int num) {for (int i = 0; i < num; i++) {Message message = Utils.getRandomMessage();kafkaSender.sendMsg("message", JSON.toJSONString(message));}return "OK";}
}
4.2.4.2 发送消息
使用postman 发送消息到kafka,消息地址:http://localhost:8010/taxi/batchTask/10,消息数据如下
显示OK说明消息已经发送到了kafka中
4.2.5 数据选择
4.2.51 kafka数据查看
在load页面选择kafka,进行数据摄取模式选择
4.2.5.2 选择数据源
在这里输入ZK的地址以及需要选择数据的
topic
116.62.213.90:10903,116.62.213.90:10904
4.2.5.3 加载数据
点击
apply
应用配置,设置加载数据源
4.2.6 数据源规范配置
4.2.6.1 设置时间列
json
选择器被选中后,点击Next:Parse time
进入下一步来决定您的主时间列。
因为我们的时间列有两个创建时间以及打车时间,我们配置时间列为trvelDate
4.2.6.2 设置转换器
在这里可以新增虚拟列,将一个列的数据转换成另一个虚拟列,这里我们增加一个状态的虚拟列,来显示状态的中文名称我们定义 0:测试数据, 1:发起打车,2:排队中,3:司机接单,4:乘客上车,5:完成打车
我们使用case_simple
来实现判断功能,更多判断功能参考
case_simple(status,0,'测试数据',1,'发起打车',2,'排队中',3,'司机接单',4,'完成打车','状态错误')
在这里我们新建了一个
status_text
的虚拟列来展示需要中文显示的列
配置年龄默认值,如果为空我们设置为25
nvl(age,25)
配置性别设置,我们需要设置为男女,0:男,1:女,如果为null,我们设置为男
case_simple(nvl(sex,0),0,'男',1,'女','男')
4.2.6.3 设置过滤器
这里可以设置过滤器,对于某些数据不展示,这里我们使用
区间过滤器
选择显示status>=1
的数据,具体表达式可用参考
{"type" : "bound","dimension" : "status","ordering": "numeric","lower": "1",}
因为我们把数据是0的测试数据不显示了,所以只显示了一条数据为1的数据
4.2.6.4 配置schema
在
Configure schema
步骤中,您可以配置将哪些维度和指标摄入到Druid中,这些正是数据在被Druid中摄取后出现的样子。 由于我们的数据集非常小,关掉rollup、确认更改。
4.2.6.5 配置Partition
一旦对schema满意后,点击
Next
后进入Partition
步骤,该步骤中可以调整数据如何划分为段文件的方式,因为我们打车一般按照小时来算的,我们设置为分区为``hour
4.2.6.6 配置拉取方式
这里设置kafka的拉取方式,主要设置偏移量的一些配置
在 Tune
步骤中,将 Use earliest offset
设置为 True
非常重要,因为我们需要从流的开始位置消费数据。 其他没有任何需要更改的地方,进入到 Publish
步
4.5.7 提交任务
4.2.7.1 发布数据
点击完成
Tune
步骤,进入到Publish
步,在这里我们可以给我们的数据源命名,这里我们就命名为taxi-message
,
点击下一步就可以查看我们的数据规范
这就是您构建的规范,为了查看更改将如何更新规范是可以随意返回之前的步骤中进行更改,同样,您也可以直接编辑规范,并在前面的步骤中看到它。
4.2.7.2 提交任务
对摄取规范感到满意后,请单击
Submit
,然后将创建一个数据摄取任务。
您可以进入任务视图,重点关注新创建的任务。任务视图设置为自动刷新,请等待任务成功。
当一项任务成功完成时,意味着它建立了一个或多个段,这些段现在将由Data服务器接收。
4.2.7.3 查看数据源
从标题导航到
Datasources
视图,一旦看到绿色(完全可用)圆圈,就可以查询数据源。此时,您可以转到Query
视图以对数据源运行SQL查询。
4.2.7.4 查询数据
可以转到查询页面进行数据查询,这里在sql窗口编写sql后点击运行就可以查询数据了
4.2.7.5 动态添加数据
发送一条数据到kafka
druid 查询数据,发现新的数据已经进来了
4.2.8 清理数据
4.2.8.1 关闭集群
# 进入impl安装目录
cd /usr/local/imply/imply-2021.05-1
# 关闭集群
./bin/service --down
4.2.8.2 等待关闭服务
通过进程查看,查看服务是否已经关闭
ps -ef|grep druid
4.2.8.3 清理数据
通过删除druid软件包下的
var
目录的内容来重置集群状态
ll
rm -rf var
4.2.8.4 重新启动集群
nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &
4.2.8.5 查看数据源
登录后查看数据源,我们发现已经被重置了
专注Java技术干货分享,欢迎志同道合的小伙伴,一起交流学习
这篇关于Apache Druid 数据摄取---本地数据和kafka流式数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!