Flink 开发生产问题汇总,亲自解决的才是最宝贵的

2024-05-02 07:38

本文主要是介绍Flink 开发生产问题汇总,亲自解决的才是最宝贵的,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文收集了与粉丝沟通过程中常见的问题与解决方案,整理成文,供大家参考和查阅。

1、Checkpoint失败:Checkpoint expired before completing

原因是因为

checkpointConf.setCheckpointTimeout(8000L)

设置的太小了,默认是10min,这里只设置了8sec。

当一个Flink App背压的时候(例如由外部组件异常引起),Barrier会流动的非常缓慢,导致Checkpoint时长飙升。

2、资源隔离建议

在Flink中,资源的隔离是通过Slot进行的,也就是说多个Slot会运行在同一个JVM中,这种隔离很弱,尤其对于生产环境。Flink App上线之前要在一个单独的Flink集群上进行测试,否则一个不稳定、存在问题的Flink App上线,很可能影响整个Flink集群上的App。

3、资源不足导致 container 被 kill
`The assigned slot container_container编号 was removed.`

Flink App 抛出此类异常,通过查看日志,一般就是某一个 Flink App 内存占用大,导致 TaskManager(在 Yarn 上就是 Container )被Kill 掉。

但是并不是所有的情况都是这个原因,还需要进一步看 yarn 的日志( 查看 yarn 任务日志:yarn logs -applicationId  -appOwner),如果代码写的没问题,就确实是资源不够了,其实 1G Slot 跑多个Task( Slot Group Share )其实挺容易出现的。

因此有两种选择,可以根据具体情况,权衡选择一个。

  • 将该 Flink App 调度在 Per Slot 内存更大的集群上。

  • 通过 slotSharingGroup("xxx") ,减少 Slot 中共享 Task 的个数

4、启动报错,提示找不到 jersey 的类
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

解决办法进入 yarn中 把 lib 目中的一下两个问价拷贝到 flink 的 lib 中

hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar /hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar

5、Scala版本冲突
java.lang.NoSuchMethodError:scala.collection.immutable.HashSet$.empty()Lscala/collection/

解决办法,添加

import org.apache.flink.api.scala._
6、没有使用回撤流报错
Table is not an append一only table. Use the toRetractStream() in order to handle add and retract messages.

这个是因为动态表不是 append-only 模式的,需要用 toRetractStream ( 回撤流) 处理就好了.

tableEnv.toRetractStream[Person](result).print()
7、OOM 问题解决思路
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceededat java.util.Arrays.copyOfRange(Arrays.java:3664)at java.lang.String.<init>(String.java:207)at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177)
......at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

解决方案:

  1. 检查 slot 槽位够不够或者 slot 分配的数量有没有生效

  2. 程序起的并行是否都正常分配了(会有这样的情况出现,假如 5 个并行,但是只有 2 个在几点上生效了,另外 3 个没有数据流动)

  3. 检查flink程序有没有数据倾斜,可以通过 flink 的 ui 界面查看每个分区子节点处理的数据量

8、解析返回值类型失败报错
The return type of function could not be determined automatically
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(RemoteEnvironmentTest.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)

解决方案:产生这种现象的原因一般是使用 lambda 表达式没有明确返回值类型,或者使用特使的数据结构 flink 无法解析其类型,这时候我们需要在方法的后面添加返回值类型,比如字符串

input.flatMap((Integer number, Collector<String> out) -> {......
})
// 提供返回值类型
.returns(Types.STRING)
9、Hadoop jar 包冲突
Caused by: java.io.IOException: The given file system URI (hdfs:///data/checkpoint-data/abtest) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:135)at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)

解决:pom 文件中去掉和 hadoop 相关的依赖就好了

10、时钟不同步导致无法启动

启动Flink任务的时候报错

Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster

然后仔细看发现里面有这么一句

system times on machines may be out of sync

意思说是机器上的系统时间可能不同步。同步集群机器时间即可。

这篇关于Flink 开发生产问题汇总,亲自解决的才是最宝贵的的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953732

相关文章

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

SpringBoot启动报错的11个高频问题排查与解决终极指南

《SpringBoot启动报错的11个高频问题排查与解决终极指南》这篇文章主要为大家详细介绍了SpringBoot启动报错的11个高频问题的排查与解决,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一... 目录1. 依赖冲突:NoSuchMethodError 的终极解法2. Bean注入失败:No qu

springboot报错Invalid bound statement (not found)的解决

《springboot报错Invalidboundstatement(notfound)的解决》本文主要介绍了springboot报错Invalidboundstatement(not... 目录一. 问题描述二.解决问题三. 添加配置项 四.其他的解决方案4.1 Mapper 接口与 XML 文件不匹配

MySQL新增字段后Java实体未更新的潜在问题与解决方案

《MySQL新增字段后Java实体未更新的潜在问题与解决方案》在Java+MySQL的开发中,我们通常使用ORM框架来映射数据库表与Java对象,但有时候,数据库表结构变更(如新增字段)后,开发人员可... 目录引言1. 问题背景:数据库与 Java 实体不同步1.1 常见场景1.2 示例代码2. 不同操作

利用Python开发Markdown表格结构转换为Excel工具

《利用Python开发Markdown表格结构转换为Excel工具》在数据管理和文档编写过程中,我们经常使用Markdown来记录表格数据,但它没有Excel使用方便,所以本文将使用Python编写一... 目录1.完整代码2. 项目概述3. 代码解析3.1 依赖库3.2 GUI 设计3.3 解析 Mark

Python中ModuleNotFoundError: No module named ‘timm’的错误解决

《Python中ModuleNotFoundError:Nomodulenamed‘timm’的错误解决》本文主要介绍了Python中ModuleNotFoundError:Nomodulen... 目录一、引言二、错误原因分析三、解决办法1.安装timm模块2. 检查python环境3. 解决安装路径问题

如何解决mysql出现Incorrect string value for column ‘表项‘ at row 1错误问题

《如何解决mysql出现Incorrectstringvalueforcolumn‘表项‘atrow1错误问题》:本文主要介绍如何解决mysql出现Incorrectstringv... 目录mysql出现Incorrect string value for column ‘表项‘ at row 1错误报错

如何解决Spring MVC中响应乱码问题

《如何解决SpringMVC中响应乱码问题》:本文主要介绍如何解决SpringMVC中响应乱码问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring MVC最新响应中乱码解决方式以前的解决办法这是比较通用的一种方法总结Spring MVC最新响应中乱码解

Java报NoClassDefFoundError异常的原因及解决

《Java报NoClassDefFoundError异常的原因及解决》在Java开发过程中,java.lang.NoClassDefFoundError是一个令人头疼的运行时错误,本文将深入探讨这一问... 目录一、问题分析二、报错原因三、解决思路四、常见场景及原因五、深入解决思路六、预http://www