本文主要是介绍Blink SQL之创建消息队列Kafka源表,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
创建消息队列Kafka源表
注意事项
- 仅适用于Blink 2.0及以上版本。
- 仅适用于独享模式。
- Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。
- 二进制数据不支持本地调试,语法检查没有问题请进行线上调试。
从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。
DDL定义
Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。
create table kafka_stream( --必须和Kafka源表中的5个字段的顺序和类型保持一致。messageKey VARBINARY,`message` VARBINARY,topic VARCHAR,`partition` INT,`offset` BIGINT
) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...
);
WITH参数
- 通用配置
参数 | 注释说明 | 是否必选 | 备注 |
---|---|---|---|
type | Kafka对应版本 | 是 | Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。 |
topic | 读取的单个topic | 是 | 无 |
topicPattern | 读取一批topic的表达式 | 否 | Topic用竖线(| )分隔。例如:topic1|topic2|topic3 。 |
startupMode | 启动位点 | 否 | 启动位点取值如下: GROUP_OFFSETS(默认值):根据Group读取。 EARLIEST:从Kafka最早分区开始读取。 LATEST:从Kafka最新位点开始读取。 TIMESTAMP:从指定的时间点读取。 |
partitionDiscoveryIntervalMS | 定时检查是否有新分区产生 | 否 | Kafka 08版本:系统默认开启该功能。 Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。 |
extraConfig | 额外的KafkaConsumer配置项目 | 否 | 不在可选配置项中,但是期望额外增加的配置。 |
-
Kafka08配置
-
Kafka08必选配置
参数 注释说明 是否必选 group.id 消费组ID 是 zookeeper.connect zk链接地址 是 -
可选配置Key
- consumer.id
- socket.timeout.ms
- fetch.message.max.bytes
- num.consumer.fetchers
- auto.commit.enable
- auto.commit.interval.ms
- queued.max.message.chunks
- rebalance.max.retries
- fetch.min.bytes
- fetch.wait.max.ms
- rebalance.backoff.ms
- refresh.leader.backoff.ms
- auto.offset.reset
- consumer.timeout.ms
- exclude.internal.topics
- partition.assignment.strategy
- client.id
- zookeeper.session.timeout.ms
- zookeeper.connection.timeout.ms
- zookeeper.sync.time.ms
- offsets.storage
- offsets.channel.backoff.ms
- offsets.channel.socket.timeout.ms
- offsets.commit.max.retries
- dual.commit.enabled
- partition.assignment.strategy
- socket.receive.buffer.bytes
- fetch.min.bytes
-
-
Kafka09/Kafka010/Kafka011配置
-
Kafka09/Kafka010/Kafka011必选配置
参数 注释说明 group.id 消费组ID bootstrap.servers Kafka集群地址 -
Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。
- Kafka09
- Kafka010
- Kafka011
当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加
security.protocol
、sasl.mechanism
和sasl.jaas.config
3个参数,示例如下。create table kafka_stream(messageKey varbinary,`message` varbinary,topic varchar,`partition` int,`offset` bigint ) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...,`security.protocol`='SASL_PLAINTEXT',`sasl.mechanism`='PLAIN',`sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="<yourUserName>" password="<yourPassword>";' );
-
Kafka版本对应关系
type | Kafka版本 |
---|---|
Kafka08 | 0.8.22 |
Kafka09 | 0.9.0.1 |
Kafka010 | 0.10.2.1 |
Kafka011 | 0.11.0.2及以上 |
Kafka消息解析示例
-
场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。
Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。
{"name":"Alice","age":13,"grade":"A" }
-
方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK
Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。
CREATE TABLE kafka_src (messageKey VARBINARY,`message` VARBINARY,topic VARCHAR,`partition` INT,`offset` BIGINT ) WITH (type = 'kafka010' --请参见Kafka版本对应关系。 );CREATE TABLE rds_sink (`name` VARCHAR,age VARCHAR,grade VARCHAR ) WITH(type='rds' );CREATE VIEW input_view AS SELECT CAST(`message` as VARCHAR ) as `message` FROM kafka_src;INSERT INTO rds_sink SELECT JSON_VALUE(`message`,'$.name'),JSON_VALUE(`message`,'$.age'),JSON_VALUE(`message`,'$.grade') FROM input_view;
-
方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink
针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。
-
SQL
-- 定义解析Kafka message的UDTF。 CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。 CREATE TABLE kafka_src (messageKey VARBINARY,`message` VARBINARY,topic VARCHAR,`partition` INT,`offset` BIGINT ) WITH (type =
-
-
这篇关于Blink SQL之创建消息队列Kafka源表的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!