Blink SQL之创建消息队列Kafka源表

2023-11-29 15:32

本文主要是介绍Blink SQL之创建消息队列Kafka源表,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建消息队列Kafka源表

注意事项

  • 仅适用于Blink 2.0及以上版本。
  • 仅适用于独享模式。
  • Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。
  • 二进制数据不支持本地调试,语法检查没有问题请进行线上调试。

从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。

DDL定义

Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。

create table kafka_stream(   --必须和Kafka源表中的5个字段的顺序和类型保持一致。messageKey VARBINARY,`message`    VARBINARY,topic      VARCHAR,`partition`  INT,`offset`     BIGINT        
) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...
);

WITH参数

  • 通用配置
参数注释说明是否必选备注
typeKafka对应版本Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。
topic读取的单个topic
topicPattern读取一批topic的表达式Topic用竖线(|)分隔。例如:topic1|topic2|topic3
startupMode启动位点启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。
partitionDiscoveryIntervalMS定时检查是否有新分区产生Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。
extraConfig额外的KafkaConsumer配置项目不在可选配置项中,但是期望额外增加的配置。
  • Kafka08配置

    • Kafka08必选配置

      参数注释说明是否必选
      group.id消费组ID
      zookeeper.connectzk链接地址
    • 可选配置Key

      • consumer.id
      • socket.timeout.ms
      • fetch.message.max.bytes
      • num.consumer.fetchers
      • auto.commit.enable
      • auto.commit.interval.ms
      • queued.max.message.chunks
      • rebalance.max.retries
      • fetch.min.bytes
      • fetch.wait.max.ms
      • rebalance.backoff.ms
      • refresh.leader.backoff.ms
      • auto.offset.reset
      • consumer.timeout.ms
      • exclude.internal.topics
      • partition.assignment.strategy
      • client.id
      • zookeeper.session.timeout.ms
      • zookeeper.connection.timeout.ms
      • zookeeper.sync.time.ms
      • offsets.storage
      • offsets.channel.backoff.ms
      • offsets.channel.socket.timeout.ms
      • offsets.commit.max.retries
      • dual.commit.enabled
      • partition.assignment.strategy
      • socket.receive.buffer.bytes
      • fetch.min.bytes
  • Kafka09/Kafka010/Kafka011配置

    • Kafka09/Kafka010/Kafka011必选配置

      参数注释说明
      group.id消费组ID
      bootstrap.serversKafka集群地址
    • Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。

      • Kafka09
      • Kafka010
      • Kafka011

    当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocolsasl.mechanismsasl.jaas.config3个参数,示例如下。

    create table kafka_stream(messageKey varbinary,`message` varbinary,topic varchar,`partition` int,`offset` bigint
    ) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...,`security.protocol`='SASL_PLAINTEXT',`sasl.mechanism`='PLAIN',`sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="<yourUserName>" password="<yourPassword>";'
    );
    

Kafka版本对应关系

typeKafka版本
Kafka080.8.22
Kafka090.9.0.1
Kafka0100.10.2.1
Kafka0110.11.0.2及以上

Kafka消息解析示例

  • 场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。

    Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。

    {"name":"Alice","age":13,"grade":"A"
    }                
    
    • 方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

      Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

      CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
      ) WITH (type = 'kafka010'   --请参见Kafka版本对应关系。
      );CREATE TABLE rds_sink (`name`       VARCHAR,age         VARCHAR,grade       VARCHAR
      ) WITH(type='rds'
      );CREATE VIEW input_view AS SELECT CAST(`message` as VARCHAR ) as `message`
      FROM kafka_src;INSERT INTO rds_sink
      SELECT JSON_VALUE(`message`,'$.name'),JSON_VALUE(`message`,'$.age'),JSON_VALUE(`message`,'$.grade')
      FROM input_view;
      
    • 方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink

      针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。

      • SQL

        -- 定义解析Kafka message的UDTF。
        CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。
        CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
        ) WITH (type = 

这篇关于Blink SQL之创建消息队列Kafka源表的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/433364

相关文章

SQL注入漏洞扫描之sqlmap详解

《SQL注入漏洞扫描之sqlmap详解》SQLMap是一款自动执行SQL注入的审计工具,支持多种SQL注入技术,包括布尔型盲注、时间型盲注、报错型注入、联合查询注入和堆叠查询注入... 目录what支持类型how---less-1为例1.检测网站是否存在sql注入漏洞的注入点2.列举可用数据库3.列举数据库

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Mysql虚拟列的使用场景

《Mysql虚拟列的使用场景》MySQL虚拟列是一种在查询时动态生成的特殊列,它不占用存储空间,可以提高查询效率和数据处理便利性,本文给大家介绍Mysql虚拟列的相关知识,感兴趣的朋友一起看看吧... 目录1. 介绍mysql虚拟列1.1 定义和作用1.2 虚拟列与普通列的区别2. MySQL虚拟列的类型2

Window Server创建2台服务器的故障转移群集的图文教程

《WindowServer创建2台服务器的故障转移群集的图文教程》本文主要介绍了在WindowsServer系统上创建一个包含两台成员服务器的故障转移群集,文中通过图文示例介绍的非常详细,对大家的... 目录一、 准备条件二、在ServerB安装故障转移群集三、在ServerC安装故障转移群集,操作与Ser

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

Window Server2016 AD域的创建的方法步骤

《WindowServer2016AD域的创建的方法步骤》本文主要介绍了WindowServer2016AD域的创建的方法步骤,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、准备条件二、在ServerA服务器中常见AD域管理器:三、创建AD域,域地址为“test.ly”

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

MySQL中时区参数time_zone解读

《MySQL中时区参数time_zone解读》MySQL时区参数time_zone用于控制系统函数和字段的DEFAULTCURRENT_TIMESTAMP属性,修改时区可能会影响timestamp类型... 目录前言1.时区参数影响2.如何设置3.字段类型选择总结前言mysql 时区参数 time_zon

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

使用SQL语言查询多个Excel表格的操作方法

《使用SQL语言查询多个Excel表格的操作方法》本文介绍了如何使用SQL语言查询多个Excel表格,通过将所有Excel表格放入一个.xlsx文件中,并使用pandas和pandasql库进行读取和... 目录如何用SQL语言查询多个Excel表格如何使用sql查询excel内容1. 简介2. 实现思路3