FlinkSQL时间更新问题

2023-12-20 10:38
文章标签 问题 更新 时间 flinksql

本文主要是介绍FlinkSQL时间更新问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近项目中使用FlinkSQL来做数据统计,遇到一些问题,小结一下。

第一个问题:聚合好的正确数据写入数据库后不正确。

场景:因为是做数据聚合,会upsert(更新或写入)数据,为了保证效率,批量每10s中在数据库中写一次数据,异步写入,每次最多更新500条。

结果:日志打印出最终的统计结果正确,但写入数据库的值不正确。

原因:异步写入,无法保证写入顺序,如果一批数据中有 对同一条记录进行更新的 一条以上的数据,无法保证两条记录的先后执行顺序,导致数据写入数据库不正确。而且会重复更新,对数据库造成压力。

解决办法:每条统计结果增加时间戳,每次把一批数据通过关键字(看自己业务逻辑确定关键字奥)去重,保留时间最大的那条记录。过滤后,在进行更新操作。但因为业务量大,只增加时间戳有时区分不了哪条记录是最新的,因为时间戳相同,所以采用【时间戳+递增字段值】做标记。解决了问题。

 

第二个问题:项目上线正常运行,但第二天发现数据没有更新

场景:因为没有使用窗口,所以统计当天数据的时间范围是自己限制的。至于为什么没有用窗口,是因为外层查询和子查询中都有group by操作,Flink的回撤流不能很好的处理,使用会报错。所以用Java代码生成了一个datekey(像这样20210603)用来框定时间。

原因:FlinkSQL在项目启动时加载一次,时间是作为变量写进去的,是一个固定的值,固定的值。

解决办法:使用FlinkSQL时间函数来动态获取当前时间,sql改为下面这样,问题解决。

whereCAST(dateKey as VARCHAR) = DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')

这里需要注意,要使用本地TIMESTAMP (LOCALTIMESTAMP),不然时间会差八小时,会导致框选的时间不对哦。(时差问题在Flink 1.13版本中有了解决方案,提供了一种新的时间类型,返回long类型值,格式化后以当前服务器的时区为准)

其他问题,这个是因为算子进行计算是有异常产生,log日志加上就能看到哪里问题了。

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e26_1597048467840_3275_01_000003  timed out.at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive(Actor.scala:517)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2021-05-20 17:58:38.226 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
2021-05-20 17:58:38.229 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - 58 tasks should be restarted to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1. 

最后,愿所有的bug都能被解决

 

这篇关于FlinkSQL时间更新问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515730

相关文章

Kotlin Map映射转换问题小结

《KotlinMap映射转换问题小结》文章介绍了Kotlin集合转换的多种方法,包括map(一对一转换)、mapIndexed(带索引)、mapNotNull(过滤null)、mapKeys/map... 目录Kotlin 集合转换:map、mapIndexed、mapNotNull、mapKeys、map

nginx中端口无权限的问题解决

《nginx中端口无权限的问题解决》当Nginx日志报错bind()to80failed(13:Permissiondenied)时,这通常是由于权限不足导致Nginx无法绑定到80端口,下面就来... 目录一、问题原因分析二、解决方案1. 以 root 权限运行 Nginx(不推荐)2. 为 Nginx

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

Windows环境下解决Matplotlib中文字体显示问题的详细教程

《Windows环境下解决Matplotlib中文字体显示问题的详细教程》本文详细介绍了在Windows下解决Matplotlib中文显示问题的方法,包括安装字体、更新缓存、配置文件设置及编码調整,并... 目录引言问题分析解决方案详解1. 检查系统已安装字体2. 手动添加中文字体(以SimHei为例)步骤

SpringSecurity整合redission序列化问题小结(最新整理)

《SpringSecurity整合redission序列化问题小结(最新整理)》文章详解SpringSecurity整合Redisson时的序列化问题,指出需排除官方Jackson依赖,通过自定义反序... 目录1. 前言2. Redission配置2.1 RedissonProperties2.2 Red

nginx 负载均衡配置及如何解决重复登录问题

《nginx负载均衡配置及如何解决重复登录问题》文章详解Nginx源码安装与Docker部署,介绍四层/七层代理区别及负载均衡策略,通过ip_hash解决重复登录问题,对nginx负载均衡配置及如何... 目录一:源码安装:1.配置编译参数2.编译3.编译安装 二,四层代理和七层代理区别1.二者混合使用举例

go中的时间处理过程

《go中的时间处理过程》:本文主要介绍go中的时间处理过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 获取当前时间2 获取当前时间戳3 获取当前时间的字符串格式4 相互转化4.1 时间戳转时间字符串 (int64 > string)4.2 时间字符串转时间

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操