Debezium日常分享系列之:Debezium and TimescaleDB

2024-01-16 18:28

本文主要是介绍Debezium日常分享系列之:Debezium and TimescaleDB,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Debezium日常分享系列之:Debezium and TimescaleDB

  • 一、TimescaleDB
  • 二、完整案例
  • 三、Hypertables
  • 四、Continuous aggregates
  • 五、Compression
  • 六、结论

一、TimescaleDB

TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。

TimescaleDB 提供了三个基本构建块/概念:

  • Hypertables
  • Continuous aggregates
  • Compression

描述实例定义的元数据(目录)和原始数据通常存储在 _timescaledb_internal_schema 中。TimescaleDb SMT 连接到数据库并读取和处理元数据。然后,从数据库读取的原始消息会使用存储在 Kafka Connect 标头中的元数据进行丰富,从而创建物理数据和 TimescaleDB 逻辑结构之间的关系。

二、完整案例

Debezium 示例存储库包含基于 Docker Compose 的部署,该部署提供了完整的环境来演示 TimescaleDB 集成。

第一步,开始部署

$ docker-compose -f docker-compose-timescaledb.yaml up --build

该命令将启动 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 数据库。

启动的数据库已准备好以下数据库对象:

  • 将温度和湿度测量值表示为时间序列数据的超稳定条件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')
  • 测量数据的单一记录
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)

PostgreSQL 出版物用于将时间序列数据发布到复制槽中,因为演示使用 pgoutput 解码插件

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

下一步需要注册 Debezium PostgreSQL 连接器以捕获数据库中的更改

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

注册请求文件与常规文件不同,增加了这些行

{"name": "inventory-connector","config": {
..."schema.include.list": "_timescaledb_internal","transforms": "timescaledb","transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb","transforms.timescaledb.database.hostname": "timescaledb","transforms.timescaledb.database.port": "5432","transforms.timescaledb.database.user": "postgres","transforms.timescaledb.database.password": "postgres","transforms.timescaledb.database.dbname": "postgres"}
}

三、Hypertables

连接器将捕获内部 TimescaleDB 架构以及包含原始数据的物理表,并且将应用 TimescaleDb SMT 来丰富消息并根据逻辑名称将它们路由到正确命名的主题。 SMT 配置选项包含连接到数据库所需的信息。在这种情况下,条件超表将物理存储在 _timescaledb_internal._hyper_1_1_chunk 中,并且当由 SMT 处理时,它将重新路由到根据固定配置的前缀 timescaledb 和逻辑名称 public.conditions 命名的 timescaledb.public.conditions 主题符合超表名称。

让我们在表中添加更多测量值

 docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

并读取捕获的主题消息(在命令中启用打印密钥和标题)

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions

这些消息包含两个标头 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它们描述了逻辑超表名称与从中捕获它们的物理源表之间的映射。

四、Continuous aggregates

连续聚合对存储在超表中的数据提供自动统计计算。聚合被定义为物化视图,由其自己的超表支持,而超表又由一组物理表支持。重新计算聚合后(手动或自动),新值将存储在超表中,可以从中捕获和流式传输这些值。连接器捕获物理表中的新值,SMT 通过将物理目标重新映射回聚合逻辑名称来再次解决路由问题。还添加了带有原始超表和物理表名称的 Kafka Connect 标头。

让我们创建一个名为conditions_summary的连续聚合,用于计算每个位置和时间间隔的平均、最低和最高温度

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) ASSELECTlocation,time_bucket(INTERVAL '1 hour', time) AS bucket,AVG(temperature),MAX(temperature),MIN(temperature)FROM conditionsGROUP BY location, bucket;

并阅读捕获的主题消息

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions_summary

这些消息包含两个标头 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公开哪个支持超表用于存储聚合,以及两个附加标头 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公开存储聚合的物理表。

`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.

如果添加新的测量并触发聚合重新计算,则更新的聚合将发送到主题

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

看起来像

{"schema":{
...},"payload":{"before":null,"after":{"location":"Ostrava","bucket":"2024-01-09T13:00:00.000000Z","avg":10.0,"max":10.0,"min":10.0},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1704806938840,"snapshot":"false","db":"postgres","sequence":"[\"29727872\",\"29728440\"]","schema":"public","table":"conditions_summary","txId":764,"lsn":29728440,"xmin":null},"op":"c","ts_ms":1704806939163,"transaction":null}
}

因此,该主题包含针对两个不同位置计算的两条或多条消息。

五、Compression

TimescaleDB SMT 不会增强压缩数据块(物理表记录),而只是将其作为存储在超表中的副产品。压缩后的数据被捕获并存储在 Kafka 主题中。通常,带有压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

让我们为超表启用压缩并压缩它

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');show_chunks
----------------------------------------_timescaledb_internal._hyper_1_1_chunk
(1 row)postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');

消息写入 timescaledb._timescaledb_internal._compressed_hypertable_3。

停止服务

docker-compose -f docker-compose-timescaledb.yaml down

六、结论

在这篇文章中,我们演示了从 TimescaleDB 时间序列数据库捕获数据以及通过 TimescaleDb SMT 对其进行处理。我们已经展示了如何根据作为数据源的超表和连续聚合来路由和丰富消息。

深入了解Debezium请阅读博主专栏:

  • Debezium专栏

这篇关于Debezium日常分享系列之:Debezium and TimescaleDB的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/613500

相关文章

Redis多种内存淘汰策略及配置技巧分享

《Redis多种内存淘汰策略及配置技巧分享》本文介绍了Redis内存满时的淘汰机制,包括内存淘汰机制的概念,Redis提供的8种淘汰策略(如noeviction、volatile-lru等)及其适用场... 目录前言一、什么是 Redis 的内存淘汰机制?二、Redis 内存淘汰策略1. pythonnoe

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

将Python应用部署到生产环境的小技巧分享

《将Python应用部署到生产环境的小技巧分享》文章主要讲述了在将Python应用程序部署到生产环境之前,需要进行的准备工作和最佳实践,包括心态调整、代码审查、测试覆盖率提升、配置文件优化、日志记录完... 目录部署前夜:从开发到生产的心理准备与检查清单环境搭建:打造稳固的应用运行平台自动化流水线:让部署像

C#读取本地网络配置信息全攻略分享

《C#读取本地网络配置信息全攻略分享》在当今数字化时代,网络已深度融入我们生活与工作的方方面面,对于软件开发而言,掌握本地计算机的网络配置信息显得尤为关键,而在C#编程的世界里,我们又该如何巧妙地读取... 目录一、引言二、C# 读取本地网络配置信息的基础准备2.1 引入关键命名空间2.2 理解核心类与方法

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Python中列表的高级索引技巧分享

《Python中列表的高级索引技巧分享》列表是Python中最常用的数据结构之一,它允许你存储多个元素,并且可以通过索引来访问这些元素,本文将带你深入了解Python列表的高级索引技巧,希望对... 目录1.基本索引2.切片3.负数索引切片4.步长5.多维列表6.列表解析7.切片赋值8.删除元素9.反转列表

Python中处理NaN值的技巧分享

《Python中处理NaN值的技巧分享》在数据科学和数据分析领域,NaN(NotaNumber)是一个常见的概念,它表示一个缺失或未定义的数值,在Python中,尤其是在使用pandas库处理数据时,... 目录NaN 值的来源和影响使用 pandas 的 isna()和 isnull()函数直接比较 Na

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

【专题】2024飞行汽车技术全景报告合集PDF分享(附原数据表)

原文链接: https://tecdat.cn/?p=37628 6月16日,小鹏汇天旅航者X2在北京大兴国际机场临空经济区完成首飞,这也是小鹏汇天的产品在京津冀地区进行的首次飞行。小鹏汇天方面还表示,公司准备量产,并计划今年四季度开启预售小鹏汇天分体式飞行汽车,探索分体式飞行汽车城际通勤。阅读原文,获取专题报告合集全文,解锁文末271份飞行汽车相关行业研究报告。 据悉,业内人士对飞行汽车行业