Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink

本文主要是介绍Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1. 准备工作

生成数据

创建数据表

2. 创建数据表

创建数据源表

创建数据目标表

3. 计算

WITH子句


1. 准备工作

生成数据

source kafka json 数据格式 :

topic  case_kafka_mysql:

{"ts": "20201011","id": 8,"price_amt":211}

topic  flink_test_2:

{"id": 8,"coupon_price_amt":100}

注意:针对双流中的每条记录都发触发

topic: case_kafka_mysql

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql>{"ts": "20201011","id": 8,"price_amt":211}

topic: flink_test_2

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2>{"id": 8,"coupon_price_amt":100}

创建数据表

mysql 建表语句

CREATE TABLE `sync_test_2` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;CREATE TABLE `sync_test_22` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`coupon_ratio` double DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

2. 创建数据表

创建数据源表
create table flink_test_2_1 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');create table flink_test_2_2 (id BIGINT,coupon_price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'flink_test_2','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-2','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');

关键配置的说明:

json.fail-on-missing-field:在json缺失字段时是否报错

json.ignore-parse-errors:在解析json失败时是否报错

一般无法保证json格式,所以以上两个配置是比较重要的。

创建数据目标表
CREATE TABLE sync_test_2 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');CREATE TABLE sync_test_22 (ts string,coupon_ration bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');

3. 计算

一个作业中写入一个Sink或多个Sink

说明 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;;
END;      --写入多个Sink时,必填。

WITH子句

WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。

改写上述查询:

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
with orders_with_coupon AS (SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id
)INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END;      --写入多个Sink时,必填。

这篇关于Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/461648

相关文章

Ubuntu中远程连接Mysql数据库的详细图文教程

《Ubuntu中远程连接Mysql数据库的详细图文教程》Ubuntu是一个以桌面应用为主的Linux发行版操作系统,这篇文章主要为大家详细介绍了Ubuntu中远程连接Mysql数据库的详细图文教程,有... 目录1、版本2、检查有没有mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.

基于SpringBoot+Mybatis实现Mysql分表

《基于SpringBoot+Mybatis实现Mysql分表》这篇文章主要为大家详细介绍了基于SpringBoot+Mybatis实现Mysql分表的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录基本思路定义注解创建ThreadLocal创建拦截器业务处理基本思路1.根据创建时间字段按年进

Python3.6连接MySQL的详细步骤

《Python3.6连接MySQL的详细步骤》在现代Web开发和数据处理中,Python与数据库的交互是必不可少的一部分,MySQL作为最流行的开源关系型数据库管理系统之一,与Python的结合可以实... 目录环境准备安装python 3.6安装mysql安装pymysql库连接到MySQL建立连接执行S

Java编译生成多个.class文件的原理和作用

《Java编译生成多个.class文件的原理和作用》作为一名经验丰富的开发者,在Java项目中执行编译后,可能会发现一个.java源文件有时会产生多个.class文件,从技术实现层面详细剖析这一现象... 目录一、内部类机制与.class文件生成成员内部类(常规内部类)局部内部类(方法内部类)匿名内部类二、

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

MySQL错误代码2058和2059的解决办法

《MySQL错误代码2058和2059的解决办法》:本文主要介绍MySQL错误代码2058和2059的解决办法,2058和2059的错误码核心都是你用的客户端工具和mysql版本的密码插件不匹配,... 目录1. 前置理解2.报错现象3.解决办法(敲重点!!!)1. php前置理解2058和2059的错误