Blink SQL之创建消息队列Kafka源表

2023-11-29 15:32

本文主要是介绍Blink SQL之创建消息队列Kafka源表,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建消息队列Kafka源表

注意事项

  • 仅适用于Blink 2.0及以上版本。
  • 仅适用于独享模式。
  • Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。
  • 二进制数据不支持本地调试,语法检查没有问题请进行线上调试。

从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。

DDL定义

Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。

create table kafka_stream(   --必须和Kafka源表中的5个字段的顺序和类型保持一致。messageKey VARBINARY,`message`    VARBINARY,topic      VARCHAR,`partition`  INT,`offset`     BIGINT        
) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...
);

WITH参数

  • 通用配置
参数注释说明是否必选备注
typeKafka对应版本Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。
topic读取的单个topic
topicPattern读取一批topic的表达式Topic用竖线(|)分隔。例如:topic1|topic2|topic3
startupMode启动位点启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。
partitionDiscoveryIntervalMS定时检查是否有新分区产生Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。
extraConfig额外的KafkaConsumer配置项目不在可选配置项中,但是期望额外增加的配置。
  • Kafka08配置

    • Kafka08必选配置

      参数注释说明是否必选
      group.id消费组ID
      zookeeper.connectzk链接地址
    • 可选配置Key

      • consumer.id
      • socket.timeout.ms
      • fetch.message.max.bytes
      • num.consumer.fetchers
      • auto.commit.enable
      • auto.commit.interval.ms
      • queued.max.message.chunks
      • rebalance.max.retries
      • fetch.min.bytes
      • fetch.wait.max.ms
      • rebalance.backoff.ms
      • refresh.leader.backoff.ms
      • auto.offset.reset
      • consumer.timeout.ms
      • exclude.internal.topics
      • partition.assignment.strategy
      • client.id
      • zookeeper.session.timeout.ms
      • zookeeper.connection.timeout.ms
      • zookeeper.sync.time.ms
      • offsets.storage
      • offsets.channel.backoff.ms
      • offsets.channel.socket.timeout.ms
      • offsets.commit.max.retries
      • dual.commit.enabled
      • partition.assignment.strategy
      • socket.receive.buffer.bytes
      • fetch.min.bytes
  • Kafka09/Kafka010/Kafka011配置

    • Kafka09/Kafka010/Kafka011必选配置

      参数注释说明
      group.id消费组ID
      bootstrap.serversKafka集群地址
    • Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。

      • Kafka09
      • Kafka010
      • Kafka011

    当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocolsasl.mechanismsasl.jaas.config3个参数,示例如下。

    create table kafka_stream(messageKey varbinary,`message` varbinary,topic varchar,`partition` int,`offset` bigint
    ) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...,`security.protocol`='SASL_PLAINTEXT',`sasl.mechanism`='PLAIN',`sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="<yourUserName>" password="<yourPassword>";'
    );
    

Kafka版本对应关系

typeKafka版本
Kafka080.8.22
Kafka090.9.0.1
Kafka0100.10.2.1
Kafka0110.11.0.2及以上

Kafka消息解析示例

  • 场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。

    Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。

    {"name":"Alice","age":13,"grade":"A"
    }                
    
    • 方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

      Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

      CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
      ) WITH (type = 'kafka010'   --请参见Kafka版本对应关系。
      );CREATE TABLE rds_sink (`name`       VARCHAR,age         VARCHAR,grade       VARCHAR
      ) WITH(type='rds'
      );CREATE VIEW input_view AS SELECT CAST(`message` as VARCHAR ) as `message`
      FROM kafka_src;INSERT INTO rds_sink
      SELECT JSON_VALUE(`message`,'$.name'),JSON_VALUE(`message`,'$.age'),JSON_VALUE(`message`,'$.grade')
      FROM input_view;
      
    • 方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink

      针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。

      • SQL

        -- 定义解析Kafka message的UDTF。
        CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。
        CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
        ) WITH (type = 

这篇关于Blink SQL之创建消息队列Kafka源表的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/433364

相关文章

SQL表间关联查询实例详解

《SQL表间关联查询实例详解》本文主要讲解SQL语句中常用的表间关联查询方式,包括:左连接(leftjoin)、右连接(rightjoin)、全连接(fulljoin)、内连接(innerjoin)、... 目录简介样例准备左外连接右外连接全外连接内连接交叉连接自然连接简介本文主要讲解SQL语句中常用的表

SQL server配置管理器找不到如何打开它

《SQLserver配置管理器找不到如何打开它》最近遇到了SQLserver配置管理器打不开的问题,尝试在开始菜单栏搜SQLServerManager无果,于是将自己找到的方法总结分享给大家,对SQ... 目录方法一:桌面图标进入方法二:运行窗口进入方法三:查找文件路径方法四:检查 SQL Server 安

MySQL 中的 LIMIT 语句及基本用法

《MySQL中的LIMIT语句及基本用法》LIMIT语句用于限制查询返回的行数,常用于分页查询或取部分数据,提高查询效率,:本文主要介绍MySQL中的LIMIT语句,需要的朋友可以参考下... 目录mysql 中的 LIMIT 语句1. LIMIT 语法2. LIMIT 基本用法(1) 获取前 N 行数据(

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

MySQL中动态生成SQL语句去掉所有字段的空格的操作方法

《MySQL中动态生成SQL语句去掉所有字段的空格的操作方法》在数据库管理过程中,我们常常会遇到需要对表中字段进行清洗和整理的情况,本文将详细介绍如何在MySQL中动态生成SQL语句来去掉所有字段的空... 目录在mysql中动态生成SQL语句去掉所有字段的空格准备工作原理分析动态生成SQL语句在MySQL

MySQL中FIND_IN_SET函数与INSTR函数用法解析

《MySQL中FIND_IN_SET函数与INSTR函数用法解析》:本文主要介绍MySQL中FIND_IN_SET函数与INSTR函数用法解析,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一... 目录一、功能定义与语法1、FIND_IN_SET函数2、INSTR函数二、本质区别对比三、实际场景案例分