实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作

本文主要是介绍实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

系列文章目录

实践数据湖iceberg 第一课 入门
实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式
实践数据湖iceberg 第三课 在sqlclient中,以sql方式从kafka读数据到iceberg
实践数据湖iceberg 第四课 在sqlclient中,以sql方式从kafka读数据到iceberg(升级版本到flink1.12.7)
实践数据湖iceberg 第五课 hive catalog特点
实践数据湖iceberg 第六课 从kafka写入到iceberg失败问题 解决
实践数据湖iceberg 第七课 实时写入到iceberg
实践数据湖iceberg 第八课 hive与iceberg集成
实践数据湖iceberg 第九课 合并小文件
实践数据湖iceberg 第十课 快照删除
实践数据湖iceberg 第十一课 测试分区表完整流程(造数、建表、合并、删快照)
实践数据湖iceberg 第十二课 catalog是什么
实践数据湖iceberg 第十三课 metadata比数据文件大很多倍的问题
实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题)
实践数据湖iceberg 第十五课 spark安装与集成iceberg(jersey包冲突)
实践数据湖iceberg 第十六课 通过spark3打开iceberg的认知之门
实践数据湖iceberg 第十七课 hadoop2.7,spark3 on yarn运行iceberg配置
实践数据湖iceberg 第十八课 多种客户端与iceberg交互启动命令(常用命令)
实践数据湖iceberg 第十九课 flink count iceberg,无结果问题
实践数据湖iceberg 第二十课 flink + iceberg CDC场景(版本问题,测试失败)
实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败)
实践数据湖iceberg 第二十二课 flink1.13.5 + iceberg0.131 CDC(CRUD测试成功)
实践数据湖iceberg 第二十三课 flink-sql从checkpoint重启
实践数据湖iceberg 第二十四课 iceberg元数据详细解析
实践数据湖iceberg 第二十五课 后台运行flink sql 增删改的效果
实践数据湖iceberg 第二十六课 checkpoint设置方法
实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作
实践数据湖iceberg 第二十八课 把公有仓库上不存在的包部署到本地仓库
实践数据湖iceberg 第二十九课 如何优雅高效获取flink的jobId
实践数据湖iceberg 第三十课 mysql->iceberg,不同客户端有时区问题
实践数据湖iceberg 更多的内容目录

文章目录

  • 系列文章目录
  • 前言
  • 一、初始化
    • 1.1 代码
    • 1.2 启动命令
    • 1.3.引入库
    • 1.4 sink的iceberg表查询:
    • 1.5 页面查看,开启了checkpoint
  • 二、停止作业
    • 2.1 cancel作业
    • 2.2 写入2条数据
  • 三、 从checkpoint恢复
  • 总结


前言

程序化部署,测试flink cdc重启恢复
测试思路:1.程序停止时,进行checkpoint记录,记录checkpoint的位置 2.程序停止时,写入数据, 记录写入的数据, 测试重启后,能否从故障点开始恢复。
结论:能


一、初始化

1.1 代码

代码思路: 1. 定义source表, 2.定义sink表 3. 写入sink from source

public static void main(String[] args) throws Exception {FromTableToIcebergSqlTemple temple = new FromMysqlToIcebergSql();String fromSql = temple.createFromTableSql();String createToTableSql = temple.createIcebergTableSql();String createIcebergCatalog = temple.createIcebergCatalogSql();System.setProperty("HADOOP_USER_NAME", "root");//TODO 1.准备环境//1.1流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000L);env.setParallelism(1);//1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//1.3 建source表tableEnv.executeSql(fromSql);//1.4 建iceberg表tableEnv.executeSql(createIcebergCatalog);tableEnv.executeSql("use catalog "+ temple.icebergCatalogName());tableEnv.executeSql("use "+ temple.icebergDbName());tableEnv.executeSql(createToTableSql);//1.5 执行sqltableEnv.executeSql("use catalog default_catalog");tableEnv.executeSql("use default_database");tableEnv.executeSql(temple.sourceToIcebergSinkSql());//TODO 6.执行任务env.execute();}

1.2 启动命令

108060 YarnCoarseGrainedExecutorBackend
[root@hadoop101 apps]# [root@hadoop101 apps]# flink run -c com.jintemg.cdc.FlinkSqlCdcRunner -C file:///opt/software/flink1.13-iceberg0131/iceberg-flink-runtime-1.13-0.13.1.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-mysql-cdc-2.1.1.jar flink-iceberg-learning-1.0-SNAPSHOT.jar

1.3.引入库

清空表,写入3条数据

INSERT INTO `stock_basic` VALUES ('0', '000001.SZ', '000001', '平安银行', '深圳', '银行', '19910403', null);
INSERT INTO `stock_basic` VALUES ('1', '000002.SZ', '000002', '万科A', '深圳', '全国地产', '19910129', null);
INSERT INTO `stock_basic` VALUES ('2', '000004.SZ', '000004', '国华网安', '深圳', '软件服务', '19910114', '李映彤');

1.4 sink的iceberg表查询:

Time taken: 0.4 seconds, Fetched 3 row(s)
spark-sql (default)> select * from stock_basic_iceberg_sink;
22/04/07 16:06:45 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
i       ts_code symbol  name    area    industry        list_date       actural_controller
0       000001.SZ       000001  平安银行        深圳    银行    19910403        NULL
1       000002.SZ       000002  万科A   深圳    全国地产        19910129        NULL
2       000004.SZ       000004  国华网安        深圳    软件服务        19910114        李映彤
Time taken: 0.519 seconds, Fetched 3 row(s)

1.5 页面查看,开启了checkpoint

在这里插入图片描述

checkpint地址
Path: hdfs:/flink/checkpoints/aa8a8b5313bee126401e30e8e03491de/chk-223

二、停止作业

2.1 cancel作业

2.2 写入2条数据

写入2条数据

INSERT INTO `stock_basic` VALUES ('3', '000005.SZ', '000005', 'ST星源', '深圳', '环境保护', '19901210', '郑列列,丁芃');
INSERT INTO `stock_basic` VALUES ('4', '000006.SZ', '000006', '深振业A', '深圳', '区域地产', '19920427', '深圳市人民政府国有资产监督管理委员会');

三、 从checkpoint恢复

整体思路:检查是否从id=3开始同步,把3,4同步过来,0,1,2没有同步.

从checkpoint恢复命令:

[root@hadoop101 apps]#  flink run -s hdfs:///flink/checkpoints/aa8a8b5313bee126401e30e8e03491de/chk-224  -c com.jintemg.cdc.FlinkSqlCdcRunner  -C file:///opt/software/flink1.13-iceberg0131/iceberg-flink-runtime-1.13-0.13.1.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar  -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-mysql-cdc-2.1.1.jar  flink-iceberg-learning-1.0-SNAPSHOT.jar

结果: 到iceberg查,发现从上次中断消费开始继续

spark-sql (default)> select * from stock_basic_iceberg_sink;
22/04/07 16:58:55 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
i       ts_code symbol  name    area    industry        list_date       actural_controller
0       000001.SZ       000001  平安银行        深圳    银行    19910403        NULL
1       000002.SZ       000002  万科A   深圳    全国地产        19910129        NULL
2       000004.SZ       000004  国华网安        深圳    软件服务        19910114        李映彤
3       000005.SZ       000005  ST星源  深圳    环境保护        19901210        郑列列,丁芃
4       000006.SZ       000006  深振业A 深圳    区域地产        19920427        深圳市人民政府国有资产监督管理委员会

总结

发现从cdc是能从checkpoint恢复,程序正常运行。
现在有下一个问题:如何在程序中获取 本任务的checkpoint位置? 本任务的jobId?

这篇关于实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/642889

相关文章

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

SpringBoot集成LiteFlow工作流引擎的完整指南

《SpringBoot集成LiteFlow工作流引擎的完整指南》LiteFlow作为一款国产轻量级规则引擎/流程引擎,以其零学习成本、高可扩展性和极致性能成为微服务架构下的理想选择,本文将详细讲解Sp... 目录一、LiteFlow核心优势二、SpringBoot集成实战三、高级特性应用1. 异步并行执行2

linux重启命令有哪些? 7个实用的Linux系统重启命令汇总

《linux重启命令有哪些?7个实用的Linux系统重启命令汇总》Linux系统提供了多种重启命令,常用的包括shutdown-r、reboot、init6等,不同命令适用于不同场景,本文将详细... 在管理和维护 linux 服务器时,完成系统更新、故障排查或日常维护后,重启系统往往是必不可少的步骤。本文

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模