Flink AsyncFunction导致的Kafka数据不消费

2024-09-06 21:18

本文主要是介绍Flink AsyncFunction导致的Kafka数据不消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

问题描述

flinksql从kafka读取数据,异步函数加载Mysql数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。

  • 不读kafka数据:kafka读取线程像卡住一样,从kafka中读取不到数据,以为是网络原因,但是计算节点和工作节点在同一台机器中,于是排除网络原因。

  • 持续读kafka数据,但是异步函数不下发数据:以为是设置的异步超时间超时,默认是10s,增大超时时间后依然不下发。


Jstack 排查

打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。

原来是调用我们的timeout函数出现了阻塞。

   public void timeout(Row input, ResultFuture<Row> resultFuture) {StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;try {// 阻塞等待if (null == future.get()) {resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));}} catch (Exception e) {resultFuture.completeExceptionally(new Exception(e));}}
阻塞原因

在flink异步函数asyncInvoke中,只处理了正常逻辑。也就是匹配上调用resultFuture.complete(rowList);但是fillData里面进行数据类型转换时很容易发生异常,当发生异常时,resultFuture并没有结果输出,从而导致整个链路阻塞。

 List<Row> rowList = Lists.newArrayList();for (Object jsonArray : (List) val.getContent()) {Row row = fillData(input, jsonArray);rowList.add(row);}resultFuture.complete(rowList);
解决以及注意事项

fillData进行try-catch捕获发生异常时调用resultFuture.completeExceptionally(exception);

在flink异步函数中,resultFuture.complete()只会被调用一次,complete一个集合需要先在填充然后一次性发送,而不是通过遍历调用多次resultFuture.complete()

使用异步Future一定要记得有输出值。
堆栈信息重点关注有没有我们自己的逻辑 。

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Flink AsyncFunction导致的Kafka数据不消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143147

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内