【Flink-Sql-Kafka-To-ClickHouse】Writing Kafka Data into ClickHouse with Flink SQL
- 1) Requirement Analysis
- 2) Implementation
- 3) Preparation
- 3.1. Kafka
- 3.2. ClickHouse
- 4) Flink SQL
- 5) Verification
1) Requirement Analysis
1. The data source is Kafka; the Kafka topic is registered as a dynamic temporary view (the source table).
2. The data is written into ClickHouse through a user-defined sink table.
3. Both the source and the sink use Flink-integrated connectors.
2) Implementation
Import the ClickHouse connector:
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.14.0</version>
</dependency>
If you run the job on a server, the connector jar must be placed in Flink's lib directory.
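On Flink 1.16 and later, the SQL client can alternatively load the jar per session with an ADD JAR statement instead of a cluster restart; the path below is a placeholder for wherever the connector jar actually lives:
-- hypothetical jar location; adjust to your deployment
ADD JAR '/opt/flink/lib/flink-connector-clickhouse-1.14.0.jar';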
3) Preparation
3.1. Kafka
1. Create the topic.
2. Prepare the test data:
{
  "id": 1,
  "eventId": "TEST123",
  "eventStDt": "2022-11-30 23:37:49",
  "bak6": "测试",
  "bak7": "https://test?user",
  "businessId": "17279811111111111111111111111111",
  "phone": "12345678910",
  "bak1": "1234",
  "bak2": "2022-12-01 00:00:00",
  "bak13": "17279811111111111111111111111111",
  "bak14": "APP",
  "bak11": "TEST"
}
3.2. ClickHouse
1. Create the tables (here we use the cluster mode that is common in production environments).
Note: in cluster mode the table must be created twice, once as the local table and once as the cluster (distributed) table.
- local
CREATE TABLE test.kafka2ck_test_local ON CLUSTER test_cluster
(
    `id` UInt32,
    `eventId` LowCardinality(Nullable(String)),
    `eventStDt` LowCardinality(Nullable(String)),
    `bak6` LowCardinality(Nullable(String)),
    `bak7` LowCardinality(Nullable(String)),
    `businessId` LowCardinality(Nullable(String)),
    `phone` LowCardinality(Nullable(String)),
    `bak1` LowCardinality(Nullable(String)),
    `bak2` LowCardinality(Nullable(String)),
    `bak13` LowCardinality(Nullable(String)),
    `bak14` LowCardinality(Nullable(String)),
    `bak11` LowCardinality(Nullable(String))
)
ENGINE = ReplicatedMergeTree
PARTITION BY id
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192;
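To confirm that the local table was in fact created on every node, a quick check from any single node can help; a sketch, assuming the test_cluster name from the DDL above:
-- lists every replica in the cluster that holds the local table
SELECT hostName(), database, name
FROM clusterAllReplicas('test_cluster', system.tables)
WHERE database = 'test' AND name = 'kafka2ck_test_local';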
- cluster
CREATE TABLE test.kafka2ck_test ON CLUSTER test_cluster
(
    `id` UInt32,
    `eventId` LowCardinality(Nullable(String)),
    `eventStDt` LowCardinality(Nullable(String)),
    `bak6` LowCardinality(Nullable(String)),
    `bak7` LowCardinality(Nullable(String)),
    `businessId` LowCardinality(Nullable(String)),
    `phone` LowCardinality(Nullable(String)),
    `bak1` LowCardinality(Nullable(String)),
    `bak2` LowCardinality(Nullable(String)),
    `bak13` LowCardinality(Nullable(String)),
    `bak14` LowCardinality(Nullable(String)),
    `bak11` LowCardinality(Nullable(String))
)
ENGINE = Distributed('test_cluster', 'test', 'kafka2ck_test_local', rand());
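Before wiring up Flink, it can be worth smoke-testing the table pair directly in clickhouse-client; a minimal sketch (the id 999 row is made-up throwaway data):
-- write one row through the Distributed table; rand() routes it to some shard
INSERT INTO test.kafka2ck_test (id, eventId) VALUES (999, 'SMOKE');
-- the Distributed table should return the row no matter which shard stored it
SELECT id, eventId FROM test.kafka2ck_test WHERE id = 999;
-- remove the throwaway row from all shards via a mutation on the local table
ALTER TABLE test.kafka2ck_test_local ON CLUSTER test_cluster DELETE WHERE id = 999;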
4) Flink SQL
- source
CREATE TABLE source_kafka_test (
    id INT,
    eventId STRING,
    eventStDt STRING,
    bak6 STRING,
    bak7 STRING,
    businessId STRING,
    phone STRING,
    bak1 STRING,
    bak2 STRING,
    bak13 STRING,
    bak14 STRING,
    bak11 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'format' = 'json',
    'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
    'properties.group.id' = 'test01',
    'scan.startup.mode' = 'earliest-offset',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka'
);
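Once the source table is registered, the Kafka connection can be sanity-checked from the Flink SQL client before the sink exists (assuming the topic already holds the test message from step 3.1):
-- should print the JSON records from the topic as rows
SELECT * FROM source_kafka_test LIMIT 10;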
- sink
CREATE TABLE sink_ck_test (
    id INT,
    eventId STRING,
    eventStDt STRING,
    bak6 STRING,
    bak7 STRING,
    businessId STRING,
    phone STRING,
    bak1 STRING,
    bak2 STRING,
    bak13 STRING,
    bak14 STRING,
    bak11 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://123.1.1.1:9090',
    'database-name' = 'test',
    'table-name' = 'kafka2ck_test_local',
    'username' = 'test',
    'password' = '123456',
    'sink.batch-size' = '100',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);
- insert
INSERT INTO sink_ck_test SELECT * FROM source_kafka_test;
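SELECT * relies on the source and sink declaring their columns in exactly the same order; if the schemas might drift, an explicit column list is the safer form (same tables as above):
INSERT INTO sink_ck_test (id, eventId, eventStDt, bak6, bak7, businessId, phone, bak1, bak2, bak13, bak14, bak11)
SELECT id, eventId, eventStDt, bak6, bak7, businessId, phone, bak1, bak2, bak13, bak14, bak11
FROM source_kafka_test;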
5) Verification
Write JSON test data matching the ClickHouse schema into the Kafka topic and check whether the data arrives in ClickHouse.
{
  "id": 1,
  "eventId": "TEST123",
  "eventStDt": "2022-11-30 23:37:49",
  "bak6": "测试",
  "bak7": "https://test?user",
  "businessId": "17279811111111111111111111111111",
  "phone": "12345678910",
  "bak1": "1234",
  "bak2": "2022-12-01 00:00:00",
  "bak13": "17279811111111111111111111111111",
  "bak14": "APP",
  "bak11": "TEST"
}
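On the ClickHouse side, querying the distributed table should show the record once the sink flushes (after 100 buffered rows or the 1000 ms flush interval configured above):
-- run in clickhouse-client; id 1 matches the test record
SELECT * FROM test.kafka2ck_test WHERE id = 1;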