FlinkSQL时间更新问题

2023-12-20 10:38
文章标签 问题 更新 时间 flinksql

本文主要是介绍FlinkSQL时间更新问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近项目中使用FlinkSQL来做数据统计,遇到一些问题,小结一下。

第一个问题:聚合好的正确数据写入数据库后不正确。

场景:因为是做数据聚合,会upsert(更新或写入)数据,为了保证效率,批量每10s中在数据库中写一次数据,异步写入,每次最多更新500条。

结果:日志打印出最终的统计结果正确,但写入数据库的值不正确。

原因:异步写入,无法保证写入顺序,如果一批数据中有 对同一条记录进行更新的 一条以上的数据,无法保证两条记录的先后执行顺序,导致数据写入数据库不正确。而且会重复更新,对数据库造成压力。

解决办法:每条统计结果增加时间戳,每次把一批数据通过关键字(看自己业务逻辑确定关键字奥)去重,保留时间最大的那条记录。过滤后,在进行更新操作。但因为业务量大,只增加时间戳有时区分不了哪条记录是最新的,因为时间戳相同,所以采用【时间戳+递增字段值】做标记。解决了问题。

 

第二个问题:项目上线正常运行,但第二天发现数据没有更新

场景:因为没有使用窗口,所以统计当天数据的时间范围是自己限制的。至于为什么没有用窗口,是因为外层查询和子查询中都有group by操作,Flink的回撤流不能很好的处理,使用会报错。所以用Java代码生成了一个datekey(像这样20210603)用来框定时间。

原因:FlinkSQL在项目启动时加载一次,时间是作为变量写进去的,是一个固定的值,固定的值。

解决办法:使用FlinkSQL时间函数来动态获取当前时间,sql改为下面这样,问题解决。

whereCAST(dateKey as VARCHAR) = DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')

这里需要注意,要使用本地TIMESTAMP (LOCALTIMESTAMP),不然时间会差八小时,会导致框选的时间不对哦。(时差问题在Flink 1.13版本中有了解决方案,提供了一种新的时间类型,返回long类型值,格式化后以当前服务器的时区为准)

其他问题,这个是因为算子进行计算是有异常产生,log日志加上就能看到哪里问题了。

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e26_1597048467840_3275_01_000003  timed out.at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive(Actor.scala:517)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2021-05-20 17:58:38.226 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
2021-05-20 17:58:38.229 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - 58 tasks should be restarted to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1. 

最后,愿所有的bug都能被解决

 

这篇关于FlinkSQL时间更新问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515730

相关文章

关于@MapperScan和@ComponentScan的使用问题

《关于@MapperScan和@ComponentScan的使用问题》文章介绍了在使用`@MapperScan`和`@ComponentScan`时可能会遇到的包扫描冲突问题,并提供了解决方法,同时,... 目录@MapperScan和@ComponentScan的使用问题报错如下原因解决办法课外拓展总结@

MybatisGenerator文件生成不出对应文件的问题

《MybatisGenerator文件生成不出对应文件的问题》本文介绍了使用MybatisGenerator生成文件时遇到的问题及解决方法,主要步骤包括检查目标表是否存在、是否能连接到数据库、配置生成... 目录MyBATisGenerator 文件生成不出对应文件先在项目结构里引入“targetProje

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

numpy求解线性代数相关问题

《numpy求解线性代数相关问题》本文主要介绍了numpy求解线性代数相关问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 在numpy中有numpy.array类型和numpy.mat类型,前者是数组类型,后者是矩阵类型。数组

解决systemctl reload nginx重启Nginx服务报错:Job for nginx.service invalid问题

《解决systemctlreloadnginx重启Nginx服务报错:Jobfornginx.serviceinvalid问题》文章描述了通过`systemctlstatusnginx.se... 目录systemctl reload nginx重启Nginx服务报错:Job for nginx.javas

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

解决Cron定时任务中Pytest脚本无法发送邮件的问题

《解决Cron定时任务中Pytest脚本无法发送邮件的问题》文章探讨解决在Cron定时任务中运行Pytest脚本时邮件发送失败的问题,先优化环境变量,再检查Pytest邮件配置,接着配置文件确保SMT... 目录引言1. 环境变量优化:确保Cron任务可以正确执行解决方案:1.1. 创建一个脚本1.2. 修

Linux Mint Xia 22.1重磅发布: 重要更新一览

《LinuxMintXia22.1重磅发布:重要更新一览》Beta版LinuxMint“Xia”22.1发布,新版本基于Ubuntu24.04,内核版本为Linux6.8,这... linux Mint 22.1「Xia」正式发布啦!这次更新带来了诸多优化和改进,进一步巩固了 Mint 在 Linux 桌面