本文主要是介绍Flink cdc如何只进行增量同步,不同步历史数据(只读取binlog),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
环境
flink: 1.15.3
flink-connector-mysql-cdc: 2.4.0
场景:
mysql cdc到starrocks,
目前线上环境以及有老的任务在同步,现在升级了任务(旧checkpoints无法使用)旧表里面数据特别大,不方便重新同步
方案
思考:表里面的旧数据放着不动,只从binlog同步最新的数据。
解决
https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/mysql/
这是官网的解决办法,但是没有说参数设置在哪里。起初设置到debeziumProperties()里面是不生效的。
其实MySqlSourceBuilder是有一个方法特意指定startUP mode的
MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();if (StringUtils.isNotBlank(latestOffset)){// 从最新的binlog同步builder.startupOptions(StartupOptions.latest());}
这样就可以了,重启任务不会读取历史数据。
最后(注意!!!)
执行过后请停止任务从最新的checkpoints恢复,或者记录好checkpoints。否则下次仍然指定这种模式会丢失数据的,还有指定了这个模式不要写死,否则checkpoints会失效的,用参数的方式传入!
这篇关于Flink cdc如何只进行增量同步,不同步历史数据(只读取binlog)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!