本文主要是介绍FlinkSQL时间更新问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
最近项目中使用FlinkSQL来做数据统计,遇到一些问题,小结一下。
第一个问题:聚合好的正确数据写入数据库后不正确。
场景:因为是做数据聚合,会upsert(更新或写入)数据,为了保证效率,批量每10s中在数据库中写一次数据,异步写入,每次最多更新500条。
结果:日志打印出最终的统计结果正确,但写入数据库的值不正确。
原因:异步写入,无法保证写入顺序,如果一批数据中有 对同一条记录进行更新的 一条以上的数据,无法保证两条记录的先后执行顺序,导致数据写入数据库不正确。而且会重复更新,对数据库造成压力。
解决办法:每条统计结果增加时间戳,每次把一批数据通过关键字(看自己业务逻辑确定关键字奥)去重,保留时间最大的那条记录。过滤后,在进行更新操作。但因为业务量大,只增加时间戳有时区分不了哪条记录是最新的,因为时间戳相同,所以采用【时间戳+递增字段值】做标记。解决了问题。
第二个问题:项目上线正常运行,但第二天发现数据没有更新
场景:因为没有使用窗口,所以统计当天数据的时间范围是自己限制的。至于为什么没有用窗口,是因为外层查询和子查询中都有group by操作,Flink的回撤流不能很好的处理,使用会报错。所以用Java代码生成了一个datekey(像这样20210603)用来框定时间。
原因:FlinkSQL在项目启动时加载一次,时间是作为变量写进去的,是一个固定的值,固定的值。
解决办法:使用FlinkSQL时间函数来动态获取当前时间,sql改为下面这样,问题解决。
whereCAST(dateKey as VARCHAR) = DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')
这里需要注意,要使用本地TIMESTAMP (LOCALTIMESTAMP),不然时间会差八小时,会导致框选的时间不对哦。(时差问题在Flink 1.13版本中有了解决方案,提供了一种新的时间类型,返回long类型值,格式化后以当前服务器的时区为准)
其他问题,这个是因为算子进行计算是有异常产生,log日志加上就能看到哪里问题了。
java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e26_1597048467840_3275_01_000003 timed out.at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive(Actor.scala:517)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2021-05-20 17:58:38.226 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
2021-05-20 17:58:38.229 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy - 58 tasks should be restarted to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
最后,愿所有的bug都能被解决
这篇关于FlinkSQL时间更新问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!