【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决

本文主要是介绍【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景


Flink版本:1.13.5

一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。

正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。

实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。

源码分析


我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:

public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {boolean completed = false;SnapshotResult<StreamStateHandle> metaStateHandle = null;Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();HashMap miscFiles = new HashMap();boolean var15 = false;SnapshotResult var18;try {var15 = true;metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());}IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();SnapshotResult snapshotResult;if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);} else {snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);}completed = true;var18 = snapshotResult;var15 = false;} finally {if (var15) {if (!completed) {List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());statesToDiscard.add(metaStateHandle);statesToDiscard.addAll(miscFiles.values());statesToDiscard.addAll(sstFiles.values());this.cleanupIncompleteSnapshot(statesToDiscard);}}}

重点关注uploadSstFiles()方法的实现细节:

            Preconditions.checkState(this.localBackupDirectory.exists());Map<StateHandleID, Path> sstFilePaths = new HashMap();Map<StateHandleID, Path> miscFilePaths = new HashMap();Path[] files = this.localBackupDirectory.listDirectory();if (files != null) {this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));}

进入到createUploadFilePaths()方法:

        private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {Path[] var5 = files;int var6 = files.length;for(int var7 = 0; var7 < var6; ++var7) {Path filePath = var5[var7];String fileName = filePath.getFileName().toString();StateHandleID stateHandleID = new StateHandleID(fileName);if (!fileName.endsWith(".sst")) {miscFilePaths.put(stateHandleID, filePath);} else {boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);if (existsAlready) {sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());} else {sstFilePaths.put(stateHandleID, filePath);}}}}

  这里是问题的关键,我们可以归纳出主要逻辑:

1. 扫描rocksdb本地存储目录下的所有文件,获取到所有的sst文件和misc文件(除sst文件外的其他所有文件);

2. 将sst文件和历史checkpoint上传的sst文件做对比,将新增的sst文件路径记录下来;

3. 将misc文件的路径记录下来;

这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:

[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月  31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月  31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop   75420 3月  31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop   33545 3月  31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop   40177 3月  31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop   33661 3月  31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop   40494 3月  31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop   33846 3月  31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop      16 3月  30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop      37 3月  28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop   38967 3月  28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月  31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop   10407 3月  28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop   13126 3月  28 14:56 OPTIONS-000012

1. CURRENT、IDENTIFY、LOCK、OPTIONS-*, 这些文件基本是固定大小,不会有变化;

2. LOG文件, 这个文件是rocksdb的日志文件,默认情况下,flink设置的rocksdb的日志输出级别是HEAD级别,几乎不会有日志输出,但是如果你配置了state.backend.rocksdb.log.level,比如说配置为了INFO_LEVEL,那么这个LOG文件会持续输出并且不会被清理;

3. MANIFEST-*,这是rocksdb的事务日志,在任务恢复重放过程中会用到, 这个日志也会持续增长,达到阈值以后滚动生成新的并且清楚旧文件;

原因总结


在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。

解决办法


1. 在生产环境关闭Rocksdb日志(保持state.backend.rocksdb.log.level的默认配置即可);

2. 设置manifest文件的滚动阈值,我设置的是10485760byte;

问题解决。

这篇关于【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/170807

相关文章

Java循环创建对象内存溢出的解决方法

《Java循环创建对象内存溢出的解决方法》在Java中,如果在循环中不当地创建大量对象而不及时释放内存,很容易导致内存溢出(OutOfMemoryError),所以本文给大家介绍了Java循环创建对象... 目录问题1. 解决方案2. 示例代码2.1 原始版本(可能导致内存溢出)2.2 修改后的版本问题在

大数据小内存排序问题如何巧妙解决

《大数据小内存排序问题如何巧妙解决》文章介绍了大数据小内存排序的三种方法:数据库排序、分治法和位图法,数据库排序简单但速度慢,对设备要求高;分治法高效但实现复杂;位图法可读性差,但存储空间受限... 目录三种方法:方法概要数据库排序(http://www.chinasem.cn对数据库设备要求较高)分治法(常

Vue项目中Element UI组件未注册的问题原因及解决方法

《Vue项目中ElementUI组件未注册的问题原因及解决方法》在Vue项目中使用ElementUI组件库时,开发者可能会遇到一些常见问题,例如组件未正确注册导致的警告或错误,本文将详细探讨这些问题... 目录引言一、问题背景1.1 错误信息分析1.2 问题原因二、解决方法2.1 全局引入 Element

linux报错INFO:task xxxxxx:634 blocked for more than 120 seconds.三种解决方式

《linux报错INFO:taskxxxxxx:634blockedformorethan120seconds.三种解决方式》文章描述了一个Linux最小系统运行时出现的“hung_ta... 目录1.问题描述2.解决办法2.1 缩小文件系统缓存大小2.2 修改系统IO调度策略2.3 取消120秒时间限制3

关于@MapperScan和@ComponentScan的使用问题

《关于@MapperScan和@ComponentScan的使用问题》文章介绍了在使用`@MapperScan`和`@ComponentScan`时可能会遇到的包扫描冲突问题,并提供了解决方法,同时,... 目录@MapperScan和@ComponentScan的使用问题报错如下原因解决办法课外拓展总结@

MybatisGenerator文件生成不出对应文件的问题

《MybatisGenerator文件生成不出对应文件的问题》本文介绍了使用MybatisGenerator生成文件时遇到的问题及解决方法,主要步骤包括检查目标表是否存在、是否能连接到数据库、配置生成... 目录MyBATisGenerator 文件生成不出对应文件先在项目结构里引入“targetProje

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

numpy求解线性代数相关问题

《numpy求解线性代数相关问题》本文主要介绍了numpy求解线性代数相关问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 在numpy中有numpy.array类型和numpy.mat类型,前者是数组类型,后者是矩阵类型。数组

C#中图片如何自适应pictureBox大小

《C#中图片如何自适应pictureBox大小》文章描述了如何在C#中实现图片自适应pictureBox大小,并展示修改前后的效果,修改步骤包括两步,作者分享了个人经验,希望对大家有所帮助... 目录C#图片自适应pictureBox大小编程修改步骤总结C#图片自适应pictureBox大小上图中“z轴