Debezium日常分享系列之:Debezium and TimescaleDB

2024-01-16 18:28

本文主要是介绍Debezium日常分享系列之:Debezium and TimescaleDB,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Debezium日常分享系列之:Debezium and TimescaleDB

  • 一、TimescaleDB
  • 二、完整案例
  • 三、Hypertables
  • 四、Continuous aggregates
  • 五、Compression
  • 六、结论

一、TimescaleDB

TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。

TimescaleDB 提供了三个基本构建块/概念:

  • Hypertables
  • Continuous aggregates
  • Compression

描述实例定义的元数据(目录)和原始数据通常存储在 _timescaledb_internal_schema 中。TimescaleDb SMT 连接到数据库并读取和处理元数据。然后,从数据库读取的原始消息会使用存储在 Kafka Connect 标头中的元数据进行丰富,从而创建物理数据和 TimescaleDB 逻辑结构之间的关系。

二、完整案例

Debezium 示例存储库包含基于 Docker Compose 的部署,该部署提供了完整的环境来演示 TimescaleDB 集成。

第一步,开始部署

$ docker-compose -f docker-compose-timescaledb.yaml up --build

该命令将启动 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 数据库。

启动的数据库已准备好以下数据库对象:

  • 将温度和湿度测量值表示为时间序列数据的超稳定条件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')
  • 测量数据的单一记录
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)

PostgreSQL 出版物用于将时间序列数据发布到复制槽中,因为演示使用 pgoutput 解码插件

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

下一步需要注册 Debezium PostgreSQL 连接器以捕获数据库中的更改

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

注册请求文件与常规文件不同,增加了这些行

{"name": "inventory-connector","config": {
..."schema.include.list": "_timescaledb_internal","transforms": "timescaledb","transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb","transforms.timescaledb.database.hostname": "timescaledb","transforms.timescaledb.database.port": "5432","transforms.timescaledb.database.user": "postgres","transforms.timescaledb.database.password": "postgres","transforms.timescaledb.database.dbname": "postgres"}
}

三、Hypertables

连接器将捕获内部 TimescaleDB 架构以及包含原始数据的物理表,并且将应用 TimescaleDb SMT 来丰富消息并根据逻辑名称将它们路由到正确命名的主题。 SMT 配置选项包含连接到数据库所需的信息。在这种情况下,条件超表将物理存储在 _timescaledb_internal._hyper_1_1_chunk 中,并且当由 SMT 处理时,它将重新路由到根据固定配置的前缀 timescaledb 和逻辑名称 public.conditions 命名的 timescaledb.public.conditions 主题符合超表名称。

让我们在表中添加更多测量值

 docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

并读取捕获的主题消息(在命令中启用打印密钥和标题)

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions

这些消息包含两个标头 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它们描述了逻辑超表名称与从中捕获它们的物理源表之间的映射。

四、Continuous aggregates

连续聚合对存储在超表中的数据提供自动统计计算。聚合被定义为物化视图,由其自己的超表支持,而超表又由一组物理表支持。重新计算聚合后(手动或自动),新值将存储在超表中,可以从中捕获和流式传输这些值。连接器捕获物理表中的新值,SMT 通过将物理目标重新映射回聚合逻辑名称来再次解决路由问题。还添加了带有原始超表和物理表名称的 Kafka Connect 标头。

让我们创建一个名为conditions_summary的连续聚合,用于计算每个位置和时间间隔的平均、最低和最高温度

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) ASSELECTlocation,time_bucket(INTERVAL '1 hour', time) AS bucket,AVG(temperature),MAX(temperature),MIN(temperature)FROM conditionsGROUP BY location, bucket;

并阅读捕获的主题消息

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions_summary

这些消息包含两个标头 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公开哪个支持超表用于存储聚合,以及两个附加标头 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公开存储聚合的物理表。

`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.

如果添加新的测量并触发聚合重新计算,则更新的聚合将发送到主题

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

看起来像

{"schema":{
...},"payload":{"before":null,"after":{"location":"Ostrava","bucket":"2024-01-09T13:00:00.000000Z","avg":10.0,"max":10.0,"min":10.0},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1704806938840,"snapshot":"false","db":"postgres","sequence":"[\"29727872\",\"29728440\"]","schema":"public","table":"conditions_summary","txId":764,"lsn":29728440,"xmin":null},"op":"c","ts_ms":1704806939163,"transaction":null}
}

因此,该主题包含针对两个不同位置计算的两条或多条消息。

五、Compression

TimescaleDB SMT 不会增强压缩数据块(物理表记录),而只是将其作为存储在超表中的副产品。压缩后的数据被捕获并存储在 Kafka 主题中。通常,带有压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

让我们为超表启用压缩并压缩它

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');show_chunks
----------------------------------------_timescaledb_internal._hyper_1_1_chunk
(1 row)postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');

消息写入 timescaledb._timescaledb_internal._compressed_hypertable_3。

停止服务

docker-compose -f docker-compose-timescaledb.yaml down

六、结论

在这篇文章中,我们演示了从 TimescaleDB 时间序列数据库捕获数据以及通过 TimescaleDb SMT 对其进行处理。我们已经展示了如何根据作为数据源的超表和连续聚合来路由和丰富消息。

深入了解Debezium请阅读博主专栏:

  • Debezium专栏

这篇关于Debezium日常分享系列之:Debezium and TimescaleDB的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/613500

相关文章

Java嵌套for循环优化方案分享

《Java嵌套for循环优化方案分享》介绍了Java中嵌套for循环的优化方法,包括减少循环次数、合并循环、使用更高效的数据结构、并行处理、预处理和缓存、算法优化、尽量减少对象创建以及本地变量优化,通... 目录Java 嵌套 for 循环优化方案1. 减少循环次数2. 合并循环3. 使用更高效的数据结构4

Python中常用的四种取整方式分享

《Python中常用的四种取整方式分享》在数据处理和数值计算中,取整操作是非常常见的需求,Python提供了多种取整方式,本文为大家整理了四种常用的方法,希望对大家有所帮助... 目录引言向零取整(Truncate)向下取整(Floor)向上取整(Ceil)四舍五入(Round)四种取整方式的对比综合示例应

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

linux进程D状态的解决思路分享

《linux进程D状态的解决思路分享》在Linux系统中,进程在内核模式下等待I/O完成时会进入不间断睡眠状态(D状态),这种状态下,进程无法通过普通方式被杀死,本文通过实验模拟了这种状态,并分析了如... 目录1. 问题描述2. 问题分析3. 实验模拟3.1 使用losetup创建一个卷作为pv的磁盘3.

MySQL8.2.0安装教程分享

《MySQL8.2.0安装教程分享》这篇文章详细介绍了如何在Windows系统上安装MySQL数据库软件,包括下载、安装、配置和设置环境变量的步骤... 目录mysql的安装图文1.python访问网址2javascript.点击3.进入Downloads向下滑动4.选择Community Server5.

CentOS系统Maven安装教程分享

《CentOS系统Maven安装教程分享》本文介绍了如何在CentOS系统中安装Maven,并提供了一个简单的实际应用案例,安装Maven需要先安装Java和设置环境变量,Maven可以自动管理项目的... 目录准备工作下载并安装Maven常见问题及解决方法实际应用案例总结Maven是一个流行的项目管理工具

10个Python自动化办公的脚本分享

《10个Python自动化办公的脚本分享》在日常办公中,我们常常会被繁琐、重复的任务占据大量时间,本文为大家分享了10个实用的Python自动化办公案例及源码,希望对大家有所帮助... 目录1. 批量处理 Excel 文件2. 自动发送邮件3. 批量重命名文件4. 数据清洗5. 生成 PPT6. 自动化测试

10个Python Excel自动化脚本分享

《10个PythonExcel自动化脚本分享》在数据处理和分析的过程中,Excel文件是我们日常工作中常见的格式,本文将分享10个实用的Excel自动化脚本,希望可以帮助大家更轻松地掌握这些技能... 目录1. Excel单元格批量填充2. 设置行高与列宽3. 根据条件删除行4. 创建新的Excel工作表5

Redis多种内存淘汰策略及配置技巧分享

《Redis多种内存淘汰策略及配置技巧分享》本文介绍了Redis内存满时的淘汰机制,包括内存淘汰机制的概念,Redis提供的8种淘汰策略(如noeviction、volatile-lru等)及其适用场... 目录前言一、什么是 Redis 的内存淘汰机制?二、Redis 内存淘汰策略1. pythonnoe

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck