FlinkSQL时间更新问题

2023-12-20 10:38
文章标签 问题 更新 时间 flinksql

本文主要是介绍FlinkSQL时间更新问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近项目中使用FlinkSQL来做数据统计,遇到一些问题,小结一下。

第一个问题:聚合好的正确数据写入数据库后不正确。

场景:因为是做数据聚合,会upsert(更新或写入)数据,为了保证效率,批量每10s中在数据库中写一次数据,异步写入,每次最多更新500条。

结果:日志打印出最终的统计结果正确,但写入数据库的值不正确。

原因:异步写入,无法保证写入顺序,如果一批数据中有 对同一条记录进行更新的 一条以上的数据,无法保证两条记录的先后执行顺序,导致数据写入数据库不正确。而且会重复更新,对数据库造成压力。

解决办法:每条统计结果增加时间戳,每次把一批数据通过关键字(看自己业务逻辑确定关键字奥)去重,保留时间最大的那条记录。过滤后,在进行更新操作。但因为业务量大,只增加时间戳有时区分不了哪条记录是最新的,因为时间戳相同,所以采用【时间戳+递增字段值】做标记。解决了问题。

 

第二个问题:项目上线正常运行,但第二天发现数据没有更新

场景:因为没有使用窗口,所以统计当天数据的时间范围是自己限制的。至于为什么没有用窗口,是因为外层查询和子查询中都有group by操作,Flink的回撤流不能很好的处理,使用会报错。所以用Java代码生成了一个datekey(像这样20210603)用来框定时间。

原因:FlinkSQL在项目启动时加载一次,时间是作为变量写进去的,是一个固定的值,固定的值。

解决办法:使用FlinkSQL时间函数来动态获取当前时间,sql改为下面这样,问题解决。

whereCAST(dateKey as VARCHAR) = DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')

这里需要注意,要使用本地TIMESTAMP (LOCALTIMESTAMP),不然时间会差八小时,会导致框选的时间不对哦。(时差问题在Flink 1.13版本中有了解决方案,提供了一种新的时间类型,返回long类型值,格式化后以当前服务器的时区为准)

其他问题,这个是因为算子进行计算是有异常产生,log日志加上就能看到哪里问题了。

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e26_1597048467840_3275_01_000003  timed out.at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive(Actor.scala:517)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2021-05-20 17:58:38.226 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
2021-05-20 17:58:38.229 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - 58 tasks should be restarted to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1. 

最后,愿所有的bug都能被解决

 

这篇关于FlinkSQL时间更新问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515730

相关文章

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

MySQL 表空却 ibd 文件过大的问题及解决方法

《MySQL表空却ibd文件过大的问题及解决方法》本文给大家介绍MySQL表空却ibd文件过大的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录一、问题背景:表空却 “吃满” 磁盘的怪事二、问题复现:一步步编程还原异常场景1. 准备测试源表与数据

解决Nginx启动报错Job for nginx.service failed because the control process exited with error code问题

《解决Nginx启动报错Jobfornginx.servicefailedbecausethecontrolprocessexitedwitherrorcode问题》Nginx启... 目录一、报错如下二、解决原因三、解决方式总结一、报错如下Job for nginx.service failed bec

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

MySQL按时间维度对亿级数据表进行平滑分表

《MySQL按时间维度对亿级数据表进行平滑分表》本文将以一个真实的4亿数据表分表案例为基础,详细介绍如何在不影响线上业务的情况下,完成按时间维度分表的完整过程,感兴趣的小伙伴可以了解一下... 目录引言一、为什么我们需要分表1.1 单表数据量过大的问题1.2 分表方案选型二、分表前的准备工作2.1 数据评估