【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决

本文主要是介绍【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景


Flink版本:1.13.5

一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。

正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。

实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。

源码分析


我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:

public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {boolean completed = false;SnapshotResult<StreamStateHandle> metaStateHandle = null;Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();HashMap miscFiles = new HashMap();boolean var15 = false;SnapshotResult var18;try {var15 = true;metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());}IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();SnapshotResult snapshotResult;if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);} else {snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);}completed = true;var18 = snapshotResult;var15 = false;} finally {if (var15) {if (!completed) {List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());statesToDiscard.add(metaStateHandle);statesToDiscard.addAll(miscFiles.values());statesToDiscard.addAll(sstFiles.values());this.cleanupIncompleteSnapshot(statesToDiscard);}}}

重点关注uploadSstFiles()方法的实现细节:

            Preconditions.checkState(this.localBackupDirectory.exists());Map<StateHandleID, Path> sstFilePaths = new HashMap();Map<StateHandleID, Path> miscFilePaths = new HashMap();Path[] files = this.localBackupDirectory.listDirectory();if (files != null) {this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));}

进入到createUploadFilePaths()方法:

        private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {Path[] var5 = files;int var6 = files.length;for(int var7 = 0; var7 < var6; ++var7) {Path filePath = var5[var7];String fileName = filePath.getFileName().toString();StateHandleID stateHandleID = new StateHandleID(fileName);if (!fileName.endsWith(".sst")) {miscFilePaths.put(stateHandleID, filePath);} else {boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);if (existsAlready) {sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());} else {sstFilePaths.put(stateHandleID, filePath);}}}}

  这里是问题的关键,我们可以归纳出主要逻辑:

1. 扫描rocksdb本地存储目录下的所有文件,获取到所有的sst文件和misc文件(除sst文件外的其他所有文件);

2. 将sst文件和历史checkpoint上传的sst文件做对比,将新增的sst文件路径记录下来;

3. 将misc文件的路径记录下来;

这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:

[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月  31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月  31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop   75420 3月  31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop   33545 3月  31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop   40177 3月  31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop   33661 3月  31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop   40494 3月  31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop   33846 3月  31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop      16 3月  30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop      37 3月  28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop   38967 3月  28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月  31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop   10407 3月  28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop   13126 3月  28 14:56 OPTIONS-000012

1. CURRENT、IDENTIFY、LOCK、OPTIONS-*, 这些文件基本是固定大小,不会有变化;

2. LOG文件, 这个文件是rocksdb的日志文件,默认情况下,flink设置的rocksdb的日志输出级别是HEAD级别,几乎不会有日志输出,但是如果你配置了state.backend.rocksdb.log.level,比如说配置为了INFO_LEVEL,那么这个LOG文件会持续输出并且不会被清理;

3. MANIFEST-*,这是rocksdb的事务日志,在任务恢复重放过程中会用到, 这个日志也会持续增长,达到阈值以后滚动生成新的并且清楚旧文件;

原因总结


在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。

解决办法


1. 在生产环境关闭Rocksdb日志(保持state.backend.rocksdb.log.level的默认配置即可);

2. 设置manifest文件的滚动阈值,我设置的是10485760byte;

问题解决。

这篇关于【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/170807

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

好题——hdu2522(小数问题:求1/n的第一个循环节)

好喜欢这题,第一次做小数问题,一开始真心没思路,然后参考了网上的一些资料。 知识点***********************************无限不循环小数即无理数,不能写作两整数之比*****************************(一开始没想到,小学没学好) 此题1/n肯定是一个有限循环小数,了解这些后就能做此题了。 按照除法的机制,用一个函数表示出来就可以了,代码如下

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

购买磨轮平衡机时应该注意什么问题和技巧

在购买磨轮平衡机时,您应该注意以下几个关键点: 平衡精度 平衡精度是衡量平衡机性能的核心指标,直接影响到不平衡量的检测与校准的准确性,从而决定磨轮的振动和噪声水平。高精度的平衡机能显著减少振动和噪声,提高磨削加工的精度。 转速范围 宽广的转速范围意味着平衡机能够处理更多种类的磨轮,适应不同的工作条件和规格要求。 振动监测能力 振动监测能力是评估平衡机性能的重要因素。通过传感器实时监

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

缓存雪崩问题

缓存雪崩是缓存中大量key失效后当高并发到来时导致大量请求到数据库,瞬间耗尽数据库资源,导致数据库无法使用。 解决方案: 1、使用锁进行控制 2、对同一类型信息的key设置不同的过期时间 3、缓存预热 1. 什么是缓存雪崩 缓存雪崩是指在短时间内,大量缓存数据同时失效,导致所有请求直接涌向数据库,瞬间增加数据库的负载压力,可能导致数据库性能下降甚至崩溃。这种情况往往发生在缓存中大量 k

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)