数据变更捕获 (CDC):PostgreSQL 与 ClickHouse - 第一部分

2024-04-25 19:12

本文主要是介绍数据变更捕获 (CDC):PostgreSQL 与 ClickHouse - 第一部分,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

图片

本文字数:13442;估计阅读时间:34 分钟

审校:庄晓东(魏庄)

本文在公众号【ClickHouseInc】首发

简介

在之前的文章中,我们已经讨论了OLTP数据库(例如Postgres)和OLAP数据库(例如ClickHouse)之间的区别,以及用户可能希望将分析工作负载转移到后者的原因。

本文为您提供了在Postgres和ClickHouse中实现变更数据捕获(Change Data Capture,简称CDC)的入门指南。CDC是指在两个数据库之间保持表同步的过程。本系列博客的解决方案仅使用ClickHouse的本地功能,无需额外的组件,除了Debezium和Kafka。

这篇初始文章介绍了建立Postgres到ClickHouse CDC管道的概念和构建模块。本系列的下一篇文章将组合这些概念,并生成一个可工作的管道。虽然后一篇博客文章可能可以独立阅读,但我们建议您阅读本文以了解概念和约束,以避免可能出现的问题。

在我们的示例中,我们使用了ClickHouse Cloud集群中的开发实例以及Postgres的AWS Aurora实例。然而,这些示例应该可以在等效大小的自管理集群上重现。或者,立即启动您的ClickHouse Cloud集群,获得$300的信用额度。让我们来担心基础设施,并开始查询吧!

方案

对于静态或不可变数据,且仅进行附加操作的情况,CDC 可能相对简单。在这种情况下,通常可以使用原生的 ClickHouse 函数(如 postgresql 函数)在实例之间移动数据,使用时间戳或单调递增的 ID 定期识别需要从 OLTP 数据库读取并插入到 ClickHouse 中的行。

然而,对于那些面临更复杂的更新和删除等工作负载的表,用户需要在其源 Postgres 数据库中识别和跟踪所有可能的更改,以便几乎实时地将它们应用到 ClickHouse 中。

解决这个问题的方案通常是针对源和目的地的,利用源数据库的特性来识别变化,并确保根据目的地的最佳使用模式尽可能高效地反映这些变化。下面我们描述了几种解决方案。

拉取模式

拉取式变更数据捕获(Pull-based Change Data Capture)是指目标数据库从源数据库中主动拉取变更,通常是通过记录表列的变更来实现。这种方式减轻了源数据库的负载,并需要从目标数据库实现一种轮询方法。

对于数据不可变且仅附加操作的情况,可以考虑使用定期运行的 cron-scheduled postgresql 函数来在 Postgres 和 ClickHouse 之间移动数据,作为拉取式 CDC 的实现方式。

用户可以选择添加消息传递系统,以确保即使目标系统不可用,变更的传递也是健壮的。由于变更是周期性批量应用的,这种方法主要受到源和目标之间变更可见性之间的延迟影响。此外,这种方法需要访问源数据库的网络,并且无法控制数据何时以及如何被拉取。在生产关键系统中,管理员通常不愿提供访问权限,因此这种方式通常是不可取的。

图片

推送 vs. 实时事件流

在基于推送的变更数据捕获管道中,源系统捕获表的列的变更,并将这些变更推送到目标系统,然后在目标系统中应用于当前数据。在任何时间点上,这两个系统应该几乎相同,变更几乎实时应用。这种方法假定变更可以可靠地传递 - 要么通过精确一次交付语义,要么通过至少一次交付和目标系统适当处理重复项的能力。虽然不一定必要,但通常会引入消息传递系统(如Kafka),以确保可靠的消息传递。

图片

以下,我们提出了一种基于推送的CDC方法,可以在Postgres和ClickHouse之间几乎实时地移动变更。为了实现这一目标,我们充分利用了Postgres的本地功能,它可以在存储级别识别变更,并将其转换为可消费的变更流。

跟踪 PostgreSQL 的变更

任何CDC解决方案都需要源数据库提供一种强大而可靠的方式来跟踪特定表在行和列级别的变更。PostgreSQL通过两个基本特性来暴露这些变更:

  • 写前日志(WAL)是数据库中所有更改的顺序日志。每当事务修改数据库时,更改首先被写入WAL,然后应用到实际的数据文件中。这个过程称为写前日志。WAL提供了数据库事务的持久性和崩溃恢复,并且对于PostgreSQL中的复制技术也至关重要。但是,WAL的原生格式不适合外部进程消费。

  • 逻辑解码将WAL的内容解码为连贯且易于理解的格式,如元组流或SQL语句。这通过使用复制槽来实现,它代表一个有序的更改流,通常用于在普通的Postgres复制中重播事件。重要的是,这些复制槽确保变更按照应用于源数据库的顺序传递。这个排序是基于内部LSN(日志序列号),LSN本身是指向WAL日志中的位置的指针。这个过程对于崩溃是健壮的,使用检查点来提升位置。关键是,任何消费者必须能够潜在地处理由于在检查点之间重新启动而传递的重复消息。

逻辑解码执行的解码过程以及复制槽中消息的后续格式,都可以通过插件进行控制。自PostgreSQL 10版本起,Postgres已经包含了一个标准的逻辑解码插件pgoutput,无需安装额外的库,也用于内部复制。有了这些变更消息流,我们现在需要一个能够读取并发送到ClickHouse的系统。为此,我们将使用开源工具Debezium。

Debezium

Debezium是一组用于捕获数据库变更的服务。它将数据库表中的所有行级变更记录为有序事件流,并将这些事件发送到下游系统进行消费。Debezium支持多种连接器,旨在生成与源数据库管理系统无关的消息格式,从而使事件的消费者能够使用相似的逻辑,无论事件的来源是什么。虽然我们尚未测试其他数据库连接器(如MySQL和MSSQL),但这意味着以下的ClickHouse配置应该是可重用的。

这种架构要求每个连接器利用每个源数据库中的适当变更捕获功能。对于Postgres,连接器利用了逻辑解码功能以及通过复制槽公开的消息。此外,自定义的Java代码结合使用JDBC驱动程序和流复制协议读取变更流并生成事件。在标准架构中,这些事件被发送到Kafka,以供下游接收器消费。

简而言之,Debezium将为Postgres中发生的每次插入、更新和删除生成一个行级变更事件。然后,这些事件必须应用到我们的ClickHouse实例中的等效表中,以确保我们的数据一致。需要注意的是,这里的表映射是一对一的关系。在最简单的情况下,每个表都部署了一个Debezium连接器,尽管也可以进行多表配置。

Kafka 和队列的角色

通常在源系统和目标系统之间实现消息传递系统,以便将变更事件缓冲并保留,直到它们被提交到目标系统。这个消息队列的作用是减轻Postgres的写前日志的压力,并解耦系统。这种设计可以避免在ClickHouse不可用时(例如由于网络连接问题)导致Postgres WAL日志增长而引起的潜在问题。

由于Debezium是使用Kafka Connect框架开发的,它天然支持Apache Kafka,因此成为我们CDC解决方案的明显消息系统选择 - 尤其是当结合Kafka与ClickHouse的丰富集成选项时。至于Kafka的部署和配置超出了本文的范围,但Debezium文档中提供了有关主题配置和命名的建议。

图片

Debezium还支持服务器模式,可以将事件发送到任何消息传递系统,如Kinesis或Google Pub Sub。只要Debezium和Postgres的配置保持相同,生成的消息使用相同的格式,用户应该能够使用相同的ClickHouse配置和CDC方法。

图片

ReplacingMergeTree

在上文中,我们描述了如何通过利用WAL日志、pgoutput插件和复制槽从Postgres中生成一系列的插入、更新和删除事件。

有了这个消息流可用,我们介绍了如何将这些变更应用到ClickHouse中。由于ClickHouse尚未针对删除和更新工作负载进行优化,我们使用了ReplacingMergeTree表引擎来高效处理这个变更流。作为其中的一部分,我们还讨论了使用这个表引擎的一些重要考虑因素,以及最近的发展和这种方法将在什么条件下达到最佳效果。

优化删除和更新

尽管OLTP数据库(如Postgres)被优化用于事务性更新和删除工作负载,但OLAP数据库为此类操作提供了较少的保证,并且优化了以批量方式插入不可变数据以获得显着更快的分析查询的目的。虽然ClickHouse通过变异提供了更新操作,以及一种轻量级的删除行的方式,但其面向列的结构意味着这些操作应该谨慎安排。这些操作是异步处理的,使用单个线程处理,并且需要(在更新的情况下)将数据重写到磁盘上。因此,它们不应该用于大量的小更改。

为了处理更新和删除行的数据流,并同时避免上述的使用模式,我们可以使用ClickHouse表引擎ReplacingMergeTree。

这个表引擎允许对行应用更新操作,而无需使用低效的ALTER或DELETE语句,它通过允许用户插入多个相同行的副本,并将其中一个标记为最新版本来实现。然后,一个后台进程会异步地移除相同行的旧版本,通过使用不可变的插入有效地模拟更新操作。

这依赖于表引擎能够识别重复行的能力。这是通过使用ORDER BY子句来确定唯一性来实现的,即,如果两行在ORDER BY中指定的列上具有相同的值,则它们被视为重复行。在定义表时指定的版本列允许在识别两行为重复时保留行的最新版本,即保留具有最高版本值的行。

此外,可以指定一个已删除的列。这可以包含0或1,其中值为1表示该行(及其副本)应被删除,否则使用零。

我们在下面的示例中说明了这个过程。在这里,行通过A列(表的ORDER BY)唯一标识。我们假设这些行已经作为两批插入,导致在磁盘上形成了两个数据部分。稍后,在一个异步后台进程中,这些部分被合并在一起。在这个过程中,发生了以下情况:

1. 由A列的值1标识的行既有一个版本为2的更新行,也有一个版本为3的删除行(并且已删除列的值为1)。因此,该键的所有行都被删除。

2. 由A列的值2标识的行既有一个版本为2的更新行。因此,具有价格列值为6的后者行被保留。

3. 由A列的值3标识的行既有一个版本为2的更新行。因此,具有价格列值为3的后者行被保留。

通过这个合并过程的结果,我们得到了代表最终状态的两行。

图片

如前所述,这个重复行移除过程发生在合并时,异步地在后台进行,并且最终一致性只是保证。或者,可以在查询时使用特殊的FINAL语法来调用,以确保结果准确 - 参见ClickHouse中的查询。

除了用于重复行移除之外,这个表引擎的属性还可以用来处理更新和删除工作负载。假设我们在ClickHouse中有以下简单的表和行。请注意,ORDER BY子句定义了行的唯一性,并设置为列键。我们还在引擎创建语句中定义了一个版本和已删除列:


CREATE TABLE test
(`key` String,`price` UInt64,`version` UInt64,`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
ORDER BY key

{"version":1,"deleted":0,"key":"A","price":100}

假设我们需要更新这一行的价格列(因为在Postgres中已更改)。为了实现这一点,我们插入以下行。请注意键值是相同的,但版本值已递增。


{"version":2,"deleted":0,"key":"A","price":200}

如果我们稍后希望删除此行,则再次插入一个具有相同键值的重复行,较高的版本以及已删除列的值为1。


{"version":3,"deleted":1,"key":"A","price":200}

这种更新和删除的方法允许我们明确避免使用低效的ALTER和DELETE命令,而是通过插入不可变行来模拟这些操作,并允许ClickHouse异步地协调这些更改。

删除行的限制

清除已删除行

仅当表级设置clean_deleted_rows设置为Always时,删除的行才会在合并时被删除。默认情况下,此值设置为Never,这意味着行永远不会被删除。截至ClickHouse版本23.5,当该值设置为Always时,此功能存在已知问题,可能导致错误的行被删除。

因此,目前我们建议使用Never值,如所示 - 这将导致已删除的行累积,这在低容量下可能是可以接受的。要强制删除已删除的行,用户可以定期在表上安排OPTIMIZE FINAL CLEANUP操作。


OPTIMIZE TABLE uk_price_paid FINAL CLEANUP

这是一个I/O密集型操作,应在空闲时段谨慎安排。

因此,在解决上述问题之前,我们建议将我们的CDC管道仅用于具有低到中等数量删除的表(少于10%)。

仅按顺序交付

如果支持删除并依赖于OPTIMIZE FINAL CLEANUPclean_deleted_rows=Always(当解决上述问题时)来删除行,则必须按顺序交付每个Postgres行的更改。更具体地说,对于配置的ORDER BY列的不同值集合的行,必须按Postgres中发生的顺序插入。

如果不满足此约束,则可能会错误地保留行,如果在后台合并或计划的OPTIMIZE FINAL CLEANUP执行后发生更新,则会执行删除操作。考虑以下事件序列:

图片

如上所述,先接收到了唯一键值A的插入事件,然后是一个删除事件。接着进行了OPTIMIZE FINAL CLEANUP或后台合并,导致这两行都被删除。接着接收到了一个版本较低的更新事件,这是由于插入顺序不正确。这样就保留了错误的行。这个问题很容易复现。请注意,如果clean_deleted_rows=Never,如当前建议的那样,则不会出现此问题,因为已删除的行会被保留。

为了解决这个问题,Debezium默认使用单个分区的Kafka主题,从而确保按顺序交付(以及仅使用一个Kafka任务)。虽然这对大多数工作负载来说已经足够,但更高的吞吐量可能需要多个分区。

如果需要多个分区,用户可以考虑探索主题分区路由,将ORDER BY列进行哈希处理,以确保同一行的所有更改事件都发送到同一分区,从而保证按顺序交付。虽然这不能保证所有事件的顺序交付,但它确保了对于特定的Postgres行和ORDER BY值集,更改按顺序交付 - 足以避免上述竞争条件。注意:我们尚未测试此多主题配置。

或者,用户可以避免使用OPTIMIZE FINAL CLEANUP,简单地允许删除行累积,或者确保它被谨慎执行,例如,暂停Postgres上的更改,允许Kafka队列清除,在对Postgres进行更改之前执行删除清理。用户还可以针对不再受更改影响的选择性分区发出OPTIMIZE FINAL CLEANUP

行的需求

因此,为了使我们的Postgres到ClickHouse变更数据捕获管道能够处理更新和删除,发送到ClickHouse的任何行都需要满足以下条件:

  • ClickHouse ORDER BY 子句中列的值必须唯一标识Postgres中的一行。

  • 当发送任何更改事件行(无论是更新还是删除)时,它必须包含ORDER BY 子句中列的相同值,以唯一标识行。这些值不能更改,应被视为不可变的。ORDER BY 子句通常包含Postgres中的主键列。用户还希望包含与查询访问模式对齐的列,以优化查询性能。更新不能更改这些列 - 请参阅选择主键。

  • 当发送更新行时,它必须包含表中所有列的值作为行的最终状态,以及一个删除列的值为0和高于之前行的版本值。

  • 当发送删除行时,它必须包含所有ORDER BY 子句的列以及删除列的值为1,以及版本列的值高于之前的列。

  • 插入必须包含所有列值,删除列的值为0,以及保证低于后续更新的版本号。

Postgres行的变更事件可以独立地发送到ClickHouse。如果用户允许删除的行累积,那么特定行的变更事件也可能以不同顺序发送。如果要移除删除事件,那么变更事件必须按照上述方式按照任何特定Postgres行的顺序发生(但不跨越Postgres行)。在设计从Kafka消费消息的多线程/进程方法时,必须考虑这些约束。

所有上述内容都需要一个版本号,该版本号满足确保它单调递增且反映特定行的Postgres事件顺序的要求。

在我们的下一篇文章中,我们将讨论如何转换Debezium的变更事件消息,以满足ReplacingMergeTree的上述要求。

选择主键

在上文中,我们强调了在使用ReplacingMergeTree时必须满足的一个重要额外约束:ORDER BY 子句的列值必须唯一标识Postgres行在变更中的唯一性。因此,我们建议将Postgres的主键包含在ClickHouse的ORDER BY 子句中。

ClickHouse的用户通常会根据频繁查询的列选择表的ORDER BY 子句,以优化查询性能。一般来说,这些列应根据您频繁的查询选择,并按基数递增的顺序列出。重要的是,ReplacingMergeTree 引入了一个额外的限制 - 这些列必须是不可变的,即只能添加不会在底层Postgres数据中更改的列到此子句中。虽然其他列可以改变,但对于唯一行标识,这些列需要保持一致性。

对于分析工作负载,Postgres的主键通常不会被频繁使用,因为用户很少执行点行查找(这是OLAP数据库不擅长的,与OLTP数据库不同)。鉴于我们建议按基数递增的顺序选择列,以及列在ORDER BY中的匹配通常更快,所以Postgres的主键应追加到ORDER BY的末尾(除非它具有分析价值)。如果在Postgres中多个列形成主键,则应将它们追加到ORDER BY中,考虑基数和查询值的可能性。用户还可以通过使用 MATERIALIZED 列的值串联生成唯一主键。

考虑以下用于英国房产价格数据集的模式。这里的id列代表Postgres中的唯一主键(仅用于示例目的)。postcode1postcode2addr1addr2列代表常用于分析查询的列。因此,我们将id列追加到ORDER BY 子句的末尾,这些列在之前。


CREATE TABLE default.uk_price_paid
(`id` UInt64,`price` UInt32,`date` Date,`postcode1` LowCardinality(String),`postcode2` LowCardinality(String),`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),`is_new` UInt8,`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),`addr1` String,`addr2` String,`street` LowCardinality(String),`locality` LowCardinality(String),`town` LowCardinality(String),`district` LowCardinality(String),`county` LowCardinality(String),`version` UInt64,`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)

ORDER BY 子句配置了数据在磁盘上的顺序。如果未指定,默认情况下,PRIMARY KEY 子句也会设置为相同的值。此子句配置了相关的稀疏主索引。

为了性能考虑,主索引会在内存中保留,使用标记进行间接引用以最小化大小。在 ReplacingMergeTree 中,使用 ORDER BY 键进行去重可能会导致主索引变得很长,增加内存使用量。如果这成为问题,用户可以直接指定 PRIMARY KEY,限制加载到内存中的列,同时保留 ORDER BY 以最大化压缩并强制唯一性。

通过这种方法,Postgres 主键列可以从 PRIMARY KEY 中省略 - 节省内存而不影响查询性能。我们也将此应用于上面的示例。

在 ClickHouse 中查询

在合并过程中,ReplacingMergeTree 使用 ORDER BY 值作为唯一标识符来识别重复行,然后只保留最高版本,或者如果最新版本指示删除,则删除所有重复项(在前面提到的问题解决之前)。然而,这只提供了最终的正确性 - 它不能保证行将被去重,并且您不应该依赖它。由于更新和删除行被考虑在查询中,因此查询可能会产生不正确的答案。

为了获得正确的答案,用户需要将后台合并与查询时去重和删除移除相结合。这可以使用 FINAL 运算符来实现。考虑以下使用英国房价数据集的示例。


postgres=> select count(*) FROM uk_price_paid;count
----------27735104
(1 row)postgres=> SELECT avg(price) FROM uk_price_paid;avg
---------------------214354.531780374791
(1 row)– no FINAL, incorrect result
SELECT count()
FROM uk_price_paid┌──count()─┐
│ 27735966 │
└──────────┘– FINAL, correct result
SELECT count()
FROM uk_price_paid
FINAL┌──count()─┐
│ 27735104 │
└──────────┘– no FINAL, incorrect result
SELECT avg(price)
FROM uk_price_paid┌─────────avg(price)─┐
│ 214353.94542901445 │
└────────────────────┘– FINAL, correct result with some precision
SELECT avg(price)
FROM uk_price_paid
FINAL┌────────avg(price)─┐
│ 214354.5317803748 │
└───────────────────┘

FINAL 的性能

尽管在 22.6 版本中确保了去重步骤是多线程的,但 FINAL 运算符会对查询产生性能开销。当查询未在主键列上进行过滤时,这种开销将会更加显著,因为需要读取更多数据并增加去重的开销。如果用户使用 WHERE 条件在关键列上进行过滤,则加载和传递给去重的数据将会减少。

如果 WHERE 条件不使用关键列,ClickHouse 目前在使用 FINAL 时不会利用 PREWHERE 优化。这种优化旨在减少对非过滤列的读取行数。当运行查询时,首先使用表的主键来识别需要读取的 granule。这样可以确定一组 granule,每个 granule 包含一定数量的行(默认为 8192 行)。然而,这些 granule 中的并非所有行都与主键上的筛选条件匹配 - 因为一个 granule 可能包含一系列值,或者因为 WHERE 条件未使用主键。为了识别正确的行,在 SELECT 列可以读取之前,因此需要进行额外的过滤。这是在读取的第二阶段使用 PREWHERE 来完成的,它允许在 WHERE 子句中进一步过滤主键和非主键列。ClickHouse 通常根据内部启发式方法将列移动到 PREWHERE 阶段。然而,当使用 FINAL 时,当前不会应用此优化。最近有关 PREWHERE 的改进的更多细节在这里。

图片

为了模拟这种优化,用户可以重写查询以使用子查询。例如,考虑以下查询,该查询查找伦敦房产的平均价格:


–establish correct answer in postgres
postgres=> SELECT avg(price)
FROM uk_price_paid WHERE town = 'LONDON';avg
---------------------474799.921480528985
(1 row)SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'┌─────────avg(price)─┐
│ 474797.35553246835 │
└────────────────────┘1 row in set. Elapsed: 0.033 sec. Processed 27.74 million rows, 39.93 MB (835.45 million rows/s., 1.20 GB/s.)EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'┌─explain──────────────────┐
│ SELECT avg(price)      │
│ FROM uk_price_paid     │
│ PREWHERE town = 'LONDON' │
└──────────────────────────┘3 rows in set. Elapsed: 0.002 sec.

虽然在没有 FINAL 的情况下,这个结果是不正确的,但我们可以看到 PREWHERE 优化被应用以帮助实现了 0.075 秒的执行时间。然而,使用 FINAL 的话,虽然可以返回正确的答案,但性能下降了 20 倍以上。


SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘1 row in set. Elapsed: 0.725 sec. Processed 29.65 million rows, 1.41 GB (40.88 million rows/s., 1.94 GB/s.)EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'┌─explain───────────────┐
│ SELECT avg(price)   │
│ FROM uk_price_paid  │
│ FINAL               │
│ WHERE town = 'LONDON' │
└───────────────────────┘4 rows in set. Elapsed: 0.004 sec.

我们可以通过在子查询中返回主键,并仅在外部查询中使用 FINAL 来部分模拟 PREWHERE,如下所示。虽然这不会像之前(不准确的)查询那样达到相同的性能,但在增加复杂性的情况下提供了一些改进:


SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE ((postcode1, postcode2) IN (SELECTpostcode1,postcode2FROM uk_price_paidWHERE town = 'LONDON'GROUP BYpostcode1,postcode2
)) AND (town = 'LONDON')┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘1 row in set. Elapsed: 0.287 sec. Processed 31.55 million rows, 230.30 MB (109.88 million rows/s., 802.08 MB/s.)

这种解决方法在内部查询返回主键值的子集较小时效果最佳。

利用分区

在 ClickHouse 中,数据的合并发生在分区级别。当使用 ReplacingMergeTree 时,

我们建议用户根据最佳实践对表进行分区,前提是用户可以确保这个分区键不会对某一行发生变化。这将确保属于同一行的更新将被发送到同一个 ClickHouse 分区。

假设情况是这样的,用户可以使用设置 do_not_merge_across_partitions_select_final=1 来提高 FINAL 查询性能。该设置使得在使用 = FINAL 时分区独立地合并和处理。考虑以下表,我们按年份对数据进行分区,并计算多个分区的平均价格。


CREATE TABLE default.uk_price_paid_year
(`id` UInt64,`price` UInt32,`date` Date,`postcode1` LowCardinality(String),`postcode2` LowCardinality(String),`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),`is_new` UInt8,`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),`addr1` String,`addr2` String,`street` LowCardinality(String),`locality` LowCardinality(String),`town` LowCardinality(String),`district` LowCardinality(String),`county` LowCardinality(String),`version` UInt64,`deleted` UInt8
)
ENGINE = ReplacingMergeTree( version, deleted) PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id) PARTITION BY toYear(date)INSERT INTO default.uk_price_paid_year SELECT * FROM default.uk_price_paid——query on original tableSELECT avg(price)
FROM uk_price_paid
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘1 row in set. Elapsed: 0.702 sec. Processed 29.65 million rows, 1.44 GB (42.23 million rows/s., 2.05 GB/s.)-– query on partitioned tableSELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘1 row in set. Elapsed: 0.492 sec. Processed 9.27 million rows, 443.09 MB (18.83 million rows/s., 900.24 MB/s.)-— performance with do_not_merge_across_partitions_select_final = 1SET do_not_merge_across_partitions_select_final = 1SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘1 row in set. Elapsed: 0.230 sec. Processed 7.62 million rows, 364.26 MB (33.12 million rows/s., 1.58 GB/s.)

如上所示,通过分区可以提高性能,因为我们的查询针对 10 个分区,并减少了所需读取的数据量。通过将查询时间的去重限制到分区内,性能进一步提高。

结论

在本博文中,我们探讨了一个基于拉取的 CDC 管道的构建基块,用于在 Postgres 和 ClickHouse 之间传输数据,使用了 Debezium。这包括了如何跟踪 Postgres 中的更改、Debezium 和 ReplacingMergeTree 的介绍。这些概念可以结合起来产生以下管道。

图片

在本系列的下一篇文章中,我们将为一个测试数据集构建一个可工作的管道,并突出重要的配置选项,以确保各个组件之间正确地交互。请继续关注!

征稿启示

面向社区长期正文,文章内容包括但不限于关于 ClickHouse 的技术研究、项目实践和创新做法等。建议行文风格干货输出&图文并茂。质量合格的文章将会发布在本公众号,优秀者也有机会推荐到 ClickHouse 官网。请将文章稿件的 WORD 版本发邮件至:Tracy.Wang@clickhouse.com

图片

​​联系我们

手机号:13910395701

邮箱:Tracy.Wang@clickhouse.com

满足您所有的在线分析列式数据库管理需求

这篇关于数据变更捕获 (CDC):PostgreSQL 与 ClickHouse - 第一部分的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/935551

相关文章

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

在Pandas中进行数据重命名的方法示例

《在Pandas中进行数据重命名的方法示例》Pandas作为Python中最流行的数据处理库,提供了强大的数据操作功能,其中数据重命名是常见且基础的操作之一,本文将通过简洁明了的讲解和丰富的代码示例,... 目录一、引言二、Pandas rename方法简介三、列名重命名3.1 使用字典进行列名重命名3.编

Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南

《Python使用Pandas库将Excel数据叠加生成新DataFrame的操作指南》在日常数据处理工作中,我们经常需要将不同Excel文档中的数据整合到一个新的DataFrame中,以便进行进一步... 目录一、准备工作二、读取Excel文件三、数据叠加四、处理重复数据(可选)五、保存新DataFram

使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)

《使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)》在现代软件开发中,处理JSON数据是一项非常常见的任务,无论是从API接口获取数据,还是将数据存储为JSON格式,解析... 目录1. 背景介绍1.1 jsON简介1.2 实际案例2. 准备工作2.1 环境搭建2.1.1 添加

MySQL中删除重复数据SQL的三种写法

《MySQL中删除重复数据SQL的三种写法》:本文主要介绍MySQL中删除重复数据SQL的三种写法,文中通过代码示例讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下... 目录方法一:使用 left join + 子查询删除重复数据(推荐)方法二:创建临时表(需分多步执行,逻辑清晰,但会