本文主要是介绍Flink在大规模状态数据集下的checkpoint调优,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Flink程序运行一段时间就会报这个错误,定位好多天都没有定位到。checkpoint时间是5秒,20秒都不行。 Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://HDFSaaaa/flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391) ... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 (inode 937800469): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_1949016825_84, pendingcreates: 10] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3520) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3487) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:787) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:537) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045) at org.apache.hadoop.ipc.Client.call(Client.java:1476) at org.apache.hadoop.ipc.Client.call(Client.java:1413) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) at com.sun.proxy.$Proxy17.complete(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462) at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy18.complete(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2506) at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2482) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2447) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
同样在网上搜索,也有同样的问题,比如: 任务一开始正常,跑一两天后就会checkpoint超时,收不到Latest Acknowledgement,然后用同样的包重启又可以正常跑几天如此反复,一直找不到原因。
设置项如下: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置失败后一直重启 env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.milliseconds(1000), Time.minutes(5))); env.disableOperatorChaining(); env.enableCheckpointing(1000 * 60 * 15, CheckpointingMode.AT_LEAST_ONCE); env.getCheckpointConfig().setFailOnCheckpointingErrors(true); //业务比较复杂设置超时时间1个小时。 env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 60); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 10); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
前言 众所周知,Flink内部为了实现它的高可用性,实现了一套强大的checkpoint机制,还能保证作用的Exactly Once的快速恢复。对此,围绕checkpoint过程本身做了很多的工作。在官方文档中,也为用户解释了checkpoint的部分原理以及checkpoint在实际生产中(尤其是大规模状态集下)的checkpoint调优参数。笔者结合官方文档,给大家做个总结,也算是对Flink checkpoint机理的一个学习。 StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
Checkpoint的资源设置 当我们对越多的状态数据集做checkpoint时,需要消耗越多的资源。 因为Flink在checkpoint时是首先在每个task上做数据checkpoint,然后在外部存储中做checkpoint持久化。 在这里的一个优化思路是: 在总状态数据固定的情况下,当每个task平均所checkpoint的数据越少,那么相应地checkpoint的总时间也会变短。 所以我们可以为每个task设置更多的并行度(即分配更多的资源)来加速checkpoint的执行过程。 state.checkpoints.num-retained
,上面展示了每次checkpoint时RocksDB示例中存储的状态以及文件引用关系等。 对于checkpoint CP1,本地RocksDB目录包含两个磁盘文件(sstable),它基于checkpoint的name来创建目录。当完成checkpoint,将在共享注册表(shared state registry)中创建两个实体并将其count置为1.在共享注册表中存储的Key是由操作、子任务以及原始存储名称组成,同时注册表维护了一个Key到实际文件存储路径的Map。
对于checkpoint CP2,RocksDB已经创建了两个新的sstable文件,老的两个文件也存在。在CP2阶段,新的两个生成新文件,老的两个引用原来的存储。当checkpoint结束,所有引用文件的count加1。
对于checkpoint CP3,RocksDB的compaction将sstable-(1),sstable-(2)以及sstable-(3)合并为sstable-(1,2,3),同时删除了原始文件。合并后的文件包含原始文件的所有信息,并删除了重复的实体。除了该合并文件,sstable-(4)还存在,同时有一个sstable-(5)创建出来。Flink将新的sstable-(1,2,3)和sstable-(5)存储到底层,sstable-(4)引用CP2中的,并对相应引用次数count加1.老的CP1的checkpoint现在可以被删除,由于其retained已达到2,作为删除的一部分,Flink将所有CP1中的引用文件count减1.
对于checkpoint CP4,RocksDB合并sstable-(4)、sstable-(5)以及新的sstable-(6)成sstable-(4,5,6)。Flink将该新的sstable存储,并引用sstable-(1,2,3),并将sstable-(1,2,3)的count加1,删除CP2中retained到2的。由于sstable-(1), sstable-(2), 和sstable-(3)降到了0,Flink将其从底层删除。
— THE END —
这篇关于Flink在大规模状态数据集下的checkpoint调优的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!