flink反压及解决思路和实操

2024-02-08 04:04

本文主要是介绍flink反压及解决思路和实操,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 反压原因

反压其实就是 task 处理不过来,算子的 sub-task 需要处理的数据量 > 能够处理的数据量,比如:

当前某个 sub-task 只能处理 1w qps 的数据,但实际上到来 2w  qps 的数据,但是实际只能处理 1w 条,从而反压

常见原因有:

  1. 数据倾斜:数据分布不均,个别task 处理数据过多

  2. 算子性能问题:可能某个节点逻辑很复杂,比如sink节点很慢,lookup join 热查询慢

  3. 流量陡增,比如大促时流量激增,或者使用了数据炸开的函数

2. 反压的危害

  1. 任务处理性能出现瓶颈:以消费 Kafka 为例,大概率会出现消费 Kafka Lag

  2. Checkpoint 时间长或者失败:因为某些反压会导致 barrier 需要花很长时间才能对齐,任务稳定性差

  3. 整个任务完全卡住。比如在 TUMBLE 窗口算子的任务中,反压后可能会导致下游算子的 input pool 和上游算子的 output pool 满了,这时候如果下游窗口的 watermark 一直对不齐,窗口触发不了计算的话,下游算子就永远无法触发窗口计算了,整个任务卡住

3. 定位反压节点

查看WebUI

作业图的 UI 展示,会通过不同颜色和数值代表繁忙和反压的程度          可以通过BackPressure查看 subtask 反压情况

还可以查看Flink 任务的 Metrics 

我这个是并行度是 4 ,所以会有 0、1、2、3 代表是哪个 subTask(task 下每个并行task),其中看到的比较多的是这两个,outPutUsage 代表发送端 Buffer 的使用率,inPutusage 代表的接收端 Buffer 的使用率

然后就很好定位了,基本上常出现反压的就那么几个算子

还不行就设置pipeline.operator-chaining: false,禁用 operator chains ,这时候一个算子就是一个 task ,在根据定位到具体算子

4. 排查反压原因

我们生产环境中,会遇到负载高峰、CheckPoint、作业重启引起的数据积压而导致反压,这种情况反压如果是暂时的,我们可以忽略它

除了定位反压节点,还需要排查原因

4.1 数据倾斜

我们可以用 Web UI 查看该节点每个 SubTask 的 Record Send 和 Record Received 来看是否数据倾斜,也可以通过 Checkpoint 每个 Subtask 的 state 的 size 大小

4.2 火焰图

在代码提交时设置开启火焰图,然后可以在 Web UI 里面查看

rest.flamegraph.enabled: true #默认 false

纵向是调用链,从下往上,顶部就是正在执行的函数

不是用颜色代表的,而是横向长度,代表出现次数或者说执行时长,某个函数过宽,出现了平顶,那这个函数可能有性能问题

4.3 分析 GC

也可能是 TaskManager 的内存引起的 GC 问题,也会导致反压,我们一般使用 G1 回收机制,有可能是 TaskManager JVM 各区内存分配不合理导致频繁的 Full GC

我们可以提交任务时设置打印 GC 日志然后查看Web UI GC 情况或者直接看日志

-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"

5. 常见处理方案

  1. 很多时候反压就是资源不足导致的,给任务加资源
  2. 如果是数据倾斜、算子性能问题之类,那就去解决这些问题
  3. 如果确实是流量过大消费不过来,就调大并行度(如果是kafka,需要同时调大kafka分区数)
  4. 限制数据源的消费数据速度。比如在事件时间窗口的应用中,可以自己设置在数据源处加一些限流措施,让每个数据源都能够够匀速消费数据,避免出现有的 Source 快,有的 Source 慢,导致窗口 input pool 打满,watermark 对不齐导致任务卡住
  5. 关闭 Checkpoint。关闭 Checkpoint 可以将 barrier 对齐这一步省略掉,促使任务能够快速回溯数据。然后等数据回溯完成之后,再将 Checkpoint 打开

这篇关于flink反压及解决思路和实操的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/689848

相关文章

Feign Client超时时间设置不生效的解决方法

《FeignClient超时时间设置不生效的解决方法》这篇文章主要为大家详细介绍了FeignClient超时时间设置不生效的原因与解决方法,具有一定的的参考价值,希望对大家有一定的帮助... 在使用Feign Client时,可以通过两种方式来设置超时时间:1.针对整个Feign Client设置超时时间

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

SpringBoot启动报错的11个高频问题排查与解决终极指南

《SpringBoot启动报错的11个高频问题排查与解决终极指南》这篇文章主要为大家详细介绍了SpringBoot启动报错的11个高频问题的排查与解决,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一... 目录1. 依赖冲突:NoSuchMethodError 的终极解法2. Bean注入失败:No qu

springboot报错Invalid bound statement (not found)的解决

《springboot报错Invalidboundstatement(notfound)的解决》本文主要介绍了springboot报错Invalidboundstatement(not... 目录一. 问题描述二.解决问题三. 添加配置项 四.其他的解决方案4.1 Mapper 接口与 XML 文件不匹配

Python中ModuleNotFoundError: No module named ‘timm’的错误解决

《Python中ModuleNotFoundError:Nomodulenamed‘timm’的错误解决》本文主要介绍了Python中ModuleNotFoundError:Nomodulen... 目录一、引言二、错误原因分析三、解决办法1.安装timm模块2. 检查python环境3. 解决安装路径问题

如何解决mysql出现Incorrect string value for column ‘表项‘ at row 1错误问题

《如何解决mysql出现Incorrectstringvalueforcolumn‘表项‘atrow1错误问题》:本文主要介绍如何解决mysql出现Incorrectstringv... 目录mysql出现Incorrect string value for column ‘表项‘ at row 1错误报错

如何解决Spring MVC中响应乱码问题

《如何解决SpringMVC中响应乱码问题》:本文主要介绍如何解决SpringMVC中响应乱码问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring MVC最新响应中乱码解决方式以前的解决办法这是比较通用的一种方法总结Spring MVC最新响应中乱码解

Java报NoClassDefFoundError异常的原因及解决

《Java报NoClassDefFoundError异常的原因及解决》在Java开发过程中,java.lang.NoClassDefFoundError是一个令人头疼的运行时错误,本文将深入探讨这一问... 目录一、问题分析二、报错原因三、解决思路四、常见场景及原因五、深入解决思路六、预http://www

pip无法安装osgeo失败的问题解决

《pip无法安装osgeo失败的问题解决》本文主要介绍了pip无法安装osgeo失败的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 进入官方提供的扩展包下载网站寻找版本适配的whl文件注意:要选择cp(python版本)和你py