本文主要是介绍FlinkX断点续传原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
FlinkX断点续传原理
声明: 前半部分官方文档+贴实现代码
前提条件
同步任务要支持断点续传,对数据源有一些强制性的要求:
- 数据源(这里特指关系数据库)中必须包含一个升序的字段,比如主键或者日期类型的字段,同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据,如果这个字段的值不是升序的,那么任务恢复时过滤的数据就是错误的,最终导致数据的缺失或重复;
- 数据源必须支持数据过滤,如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复;
- 目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持;
配置
restore用于配置同步任务类型(离线同步、实时采集)和断点续传功能。具体配置如下所示:
"restore" : {"isStream" : false,"isRestore" : false,"restoreColumnName" : "","restoreColumnIndex" : 0,"maxRowNumForCheckpoint" : 10000
}
名称 | 说明 | 是否必填 | 默认值 | 参数类型 |
---|---|---|---|---|
isStream | 是否为实时采集任务 | 否 | false | Boolean |
isRestore | 是否开启断点续传 | 否 | false | Boolean |
restoreColumnName | 断点续传字段名称 | 开启断点续传后必填 | 无 | String |
restoreColumnIndex | 断点续传字段索引ID | 开启断点续传后必填 | -1 | int |
maxRowNumForCheckpoint | 触发checkpoint数据条数 | 否 | 10000 | int |
任务运行的详细过程
我们用一个具体的任务详细介绍一下整个过程,任务详情如下:
数据源 | mysql表,假设表名data_test,表中包含主键字段id |
---|---|
目标数据源 | hdfs文件系统,假设写入路径为 /data_test |
并发数 | 2 |
checkpoint配置 | 时间间隔为60s,checkpoint的StateBackend为FsStateBackend,路径为 /flinkx/checkpoint |
jobId | 用来构造数据文件的名称,假设为 abc123 |
1) 读取数据
读取数据时首先要构造数据分片,构造数据分片就是根据通道索引和checkpoint记录的位置构造查询sql,sql模板如下:
select * from data_test
where id mod ${channel_num}=${channel_index}
and id > ${offset}
如果是第一次运行,或者上一次任务失败时还没有触发checkpoint,那么offset就不存在,根据offset和通道可以确定具体的查询sql:
offset存在时
第一个通道:
select * from data_test
where id mod 2=0
and id > ${offset_0};第二个通道
select * from data_test
where id mod 2=1
and id > ${offset_1};
offset不存在时
第一个通道:
select * from data_test
where id mod 2=0;第二个通道
select * from data_test
where id mod 2=1;
数据分片构造好之后,每个通道就根据自己的数据分片去读数据了。
2)写数据
写数据前会先做几个操作:
-
检测 /data_test 目录是否存在,如果目录不存在,则创建这个目录,如果目录存在,进行2操作;
-
判断是不是以覆盖模式写数据,如果是,则删除 /data_test目录,然后再创建目录,如果不是,则进行3操作;
-
检测 /data_test/.data 目录是否存在,如果存在就先删除,再创建,确保没有其它任务因异常失败遗留的脏数据文件;
数据写入hdfs是单条写入的,不支持批量写入。数据会先写入/data_test/.data/目录下,数据文件的命名格式为: channelIndex.jobId.fileIndex,包含通道索引,jobId,文件索引三个部分,文件最掐灭。
这篇关于FlinkX断点续传原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!