Flink在大规模状态数据集下的checkpoint调优

2024-09-06 21:48

本文主要是介绍Flink在大规模状态数据集下的checkpoint调优,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

今天接到一个同学的反馈问题,大概是:
 
Flink程序运行一段时间就会报这个错误,定位好多天都没有定位到。checkpoint时间是5秒,20秒都不行。	Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://HDFSaaaa/flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 in order to obtain the stream state handle	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)	at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)	at java.util.concurrent.FutureTask.run(FutureTask.java:266)	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)	... 7 more	
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /flink/PointWideTable_OffTest_Test2/1eb66edcfccce6124c3b2d6ae402ec39/chk-355/1005127c-cee3-4099-8b61-aef819d72404 (inode 937800469): File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_1949016825_84, pendingcreates: 10]	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3432)	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3520)	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3487)	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:787)	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:537)	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)	at java.security.AccessController.doPrivileged(Native Method)	at javax.security.auth.Subject.doAs(Subject.java:422)	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)	at org.apache.hadoop.ipc.Client.call(Client.java:1476)	at org.apache.hadoop.ipc.Client.call(Client.java:1413)	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)	at com.sun.proxy.$Proxy17.complete(Unknown Source)	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)	at java.lang.reflect.Method.invoke(Method.java:498)	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)	at com.sun.proxy.$Proxy18.complete(Unknown Source)	at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2506)	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2482)	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2447)	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
同样在网上搜索,也有同样的问题,比如:
 
任务一开始正常,跑一两天后就会checkpoint超时,收不到Latest Acknowledgement,然后用同样的包重启又可以正常跑几天如此反复,一直找不到原因。	
设置项如下:	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();	//设置失败后一直重启	env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.milliseconds(1000), Time.minutes(5)));	env.disableOperatorChaining(); 	env.enableCheckpointing(1000 * 60 * 15, CheckpointingMode.AT_LEAST_ONCE);	env.getCheckpointConfig().setFailOnCheckpointingErrors(true);	//业务比较复杂设置超时时间1个小时。	env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 60);	env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000  * 10);	env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
前言 众所周知,Flink内部为了实现它的高可用性,实现了一套强大的checkpoint机制,还能保证作用的Exactly Once的快速恢复。对此,围绕checkpoint过程本身做了很多的工作。在官方文档中,也为用户解释了checkpoint的部分原理以及checkpoint在实际生产中(尤其是大规模状态集下)的checkpoint调优参数。笔者结合官方文档,给大家做个总结,也算是对Flink checkpoint机理的一个学习。


Checkpoint快慢的性能指标 如果说我们想要对flink的checkpoint操作做调优,那么我们首先得有个衡量指标来展现当前checkpoint是否快慢。 在这里,官方提供了以下2个metric指标:


Checkpoint每次开始的时间。 观察每次checkpoint开始的时间是为了检测在每次前后checkpoint中间是否存在空闲时间间隔。 如果存在间隔时间,说明当前checkpoint都在合理时间内完成。


观察数据buffered的量。 这个buffered动作是为了等待其它较慢数据流的stream barriers而设计的。 这个偏向于checkpoint原理化的相关内容了。


但大体上,用户根据第一条就能够监测出应用的checkout快慢了。


相邻Checkpoint的间隔时间设置 我们假设一个使用场景,在极大规模状态数据集下,应用每次的checkpoint时长都超过系统设定的最大时间(也就是checkpoint间隔时长),那么会发生什么样的事情。


答案是应用会一直在做checkpoint,因为当应用发现它刚刚做完一次checkpoint后,又已经到了下次checkpoint的时间了,然后又开始新的checkpoint。 最后就会造成一个很坏的结果: 用户应用本身都没法跑了。


当然了,我们可能会说了,我们设置一下并行checkpoint数,或者说做增量checkpoint,不用每次做全量checkpoint。 每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。 然后恢复的时候做状态改动的重放。


但是这里,我们可以采用一种更加直接有效的方法,设置连续checkpoint的时间间隔。 形象地解释,就是强行在checkpoint间塞入空闲时间,如下图。


640?wx_fmt=bmp 涉及的相关配置设置如下:
 
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
Checkpoint的资源设置 当我们对越多的状态数据集做checkpoint时,需要消耗越多的资源。 因为Flink在checkpoint时是首先在每个task上做数据checkpoint,然后在外部存储中做checkpoint持久化。 在这里的一个优化思路是: 在总状态数据固定的情况下,当每个task平均所checkpoint的数据越少,那么相应地checkpoint的总时间也会变短。 所以我们可以为每个task设置更多的并行度(即分配更多的资源)来加速checkpoint的执行过程。


Checkpoint的task本地性恢复 为了大家未来对checkpoint的优化,我们有必要在runtime级别的checkpoint过程。 首先我们要明白一点,flink的checkpoint不是一个完全在master节点的过程,而是分散在每个task上执行,然后在做汇总持久化。 这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时还能够重新打散分布。


为了快速的状态恢复,每个task会同时写checkpoint数据到本地磁盘和远程分布式存储,也就是说,这是一份双拷贝。 只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。 此过程如下图所示: 640?wx_fmt=png


外部State的存储选择 上小节的方法其实还并没有从本质上解决大规模状态集下checkpoint慢的问题,只是说它降低了这个慢的风险和造成的影响。在这里我们反复强调的是一个大规模状态,我们理理思路,因为规模之大,所以我们才会慢。那如果我们能找到一种更快的存储状态的介质(或者策略),那么这个过程也是能够变快的。


所以在这里,我们可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不是仅限于存储于有限的内存空间里,或完全落地到磁盘上。这是我们在State Backend上做的一个选择。


可以使用RocksDB来作为增量checkpoint的存储,并在其中不是持续增大,可以进行定期合并清楚历史状态。


640?wx_fmt=other


该例子中,子任务的操作是一个keyed-state,一个checkpoint文件保存周期是可配置的,本例中是2,配置方式 state.checkpoints.num-retained ,上面展示了每次checkpoint时RocksDB示例中存储的状态以及文件引用关系等。
  • 对于checkpoint CP1,本地RocksDB目录包含两个磁盘文件(sstable),它基于checkpoint的name来创建目录。当完成checkpoint,将在共享注册表(shared state registry)中创建两个实体并将其count置为1.在共享注册表中存储的Key是由操作、子任务以及原始存储名称组成,同时注册表维护了一个Key到实际文件存储路径的Map。

  • 对于checkpoint CP2,RocksDB已经创建了两个新的sstable文件,老的两个文件也存在。在CP2阶段,新的两个生成新文件,老的两个引用原来的存储。当checkpoint结束,所有引用文件的count加1。

  • 对于checkpoint CP3,RocksDB的compaction将sstable-(1),sstable-(2)以及sstable-(3)合并为sstable-(1,2,3),同时删除了原始文件。合并后的文件包含原始文件的所有信息,并删除了重复的实体。除了该合并文件,sstable-(4)还存在,同时有一个sstable-(5)创建出来。Flink将新的sstable-(1,2,3)和sstable-(5)存储到底层,sstable-(4)引用CP2中的,并对相应引用次数count加1.老的CP1的checkpoint现在可以被删除,由于其retained已达到2,作为删除的一部分,Flink将所有CP1中的引用文件count减1.

  • 对于checkpoint CP4,RocksDB合并sstable-(4)、sstable-(5)以及新的sstable-(6)成sstable-(4,5,6)。Flink将该新的sstable存储,并引用sstable-(1,2,3),并将sstable-(1,2,3)的count加1,删除CP2中retained到2的。由于sstable-(1), sstable-(2), 和sstable-(3)降到了0,Flink将其从底层删除。


— THE END —

640?wx_fmt=jpeg

这篇关于Flink在大规模状态数据集下的checkpoint调优的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143209

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1565(状态压缩)

本人第一道ac的状态压缩dp,这题的数据非常水,很容易过 题意:在n*n的矩阵中选数字使得不存在任意两个数字相邻,求最大值 解题思路: 一、因为在1<<20中有很多状态是无效的,所以第一步是选择有效状态,存到cnt[]数组中 二、dp[i][j]表示到第i行的状态cnt[j]所能得到的最大值,状态转移方程dp[i][j] = max(dp[i][j],dp[i-1][k]) ,其中k满足c

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi