Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink

本文主要是介绍Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1. 准备工作

生成数据

创建数据表

2. 创建数据表

创建数据源表

创建数据目标表

3. 计算

WITH子句


1. 准备工作

生成数据

source kafka json 数据格式 :

topic  case_kafka_mysql:

{"ts": "20201011","id": 8,"price_amt":211}

topic  flink_test_2:

{"id": 8,"coupon_price_amt":100}

注意:针对双流中的每条记录都发触发

topic: case_kafka_mysql

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql>{"ts": "20201011","id": 8,"price_amt":211}

topic: flink_test_2

docker exec -it 192d1369463a bashbash-5.1# cd /opt/kafka_2.12-2.5.0/binbash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2>{"id": 8,"coupon_price_amt":100}

创建数据表

mysql 建表语句

CREATE TABLE `sync_test_2` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;CREATE TABLE `sync_test_22` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`coupon_ratio` double DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

2. 创建数据表

创建数据源表
create table flink_test_2_1 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');create table flink_test_2_2 (id BIGINT,coupon_price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'flink_test_2','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test2-2','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');

关键配置的说明:

json.fail-on-missing-field:在json缺失字段时是否报错

json.ignore-parse-errors:在解析json失败时是否报错

一般无法保证json格式,所以以上两个配置是比较重要的。

创建数据目标表
CREATE TABLE sync_test_2 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');CREATE TABLE sync_test_22 (ts string,coupon_ration bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = 'Admin');

3. 计算

一个作业中写入一个Sink或多个Sink

说明 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY ts;;
END;      --写入多个Sink时,必填。

WITH子句

WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。

改写上述查询:

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
with orders_with_coupon AS (SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_2_1 as aLEFT JOIN flink_test_2_2 b on b.id = a.id
)INSERT INTO sync_test_2
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;INSERT INTO sync_test_22
SELECTts,coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END;      --写入多个Sink时,必填。

这篇关于Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/461648

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

安卓链接正常显示,ios#符被转义%23导致链接访问404

原因分析: url中含有特殊字符 中文未编码 都有可能导致URL转换失败,所以需要对url编码处理  如下: guard let allowUrl = webUrl.addingPercentEncoding(withAllowedCharacters: .urlQueryAllowed) else {return} 后面发现当url中有#号时,会被误伤转义为%23,导致链接无法访问

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

poj 1113 凸包+简单几何计算

题意: 给N个平面上的点,现在要在离点外L米处建城墙,使得城墙把所有点都包含进去且城墙的长度最短。 解析: 韬哥出的某次训练赛上A出的第一道计算几何,算是大水题吧。 用convexhull算法把凸包求出来,然后加加减减就A了。 计算见下图: 好久没玩画图了啊好开心。 代码: #include <iostream>#include <cstdio>#inclu

uva 1342 欧拉定理(计算几何模板)

题意: 给几个点,把这几个点用直线连起来,求这些直线把平面分成了几个。 解析: 欧拉定理: 顶点数 + 面数 - 边数= 2。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#inc