Flink全链路延迟的测量方式和实现原理

2024-09-06 17:08

本文主要是介绍Flink全链路延迟的测量方式和实现原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标”

回复”面试“获取更多惊喜

0b061881aabb8a9e7f59ce098182eeca.png

本文已经加入「大数据成神之路PDF版」中提供下载。
你可以关注公众号,后台回复:PDF 即可获取。

一、背景

Flink Job端到端延迟是一个重要的指标,用来衡量Flink任务的整体性能和响应延迟(大部分流式应用,要求低延迟特性)。

通过流处理引擎竞品对比,我们发现大部分流计算引擎产品,都在告警监控页面,集成了全链路时延指标展示(直方图)。

一些低延时的处理场景,例如用于登陆、用户下单规则检测,实时预测场景,需要一个可度量的Metric指标,来实时观测、监控集群全链路时延情况。

二、源码分析来源

1、本文的源码分析基于Flink社区issue FLINK-3660,以及issue对应的pr源码pull-2386,另外,个人也新增了实现源码的说明。

2、其pr源码中只涉及到了部分全链路时延实现代码,因此,我在文章中总结了:

  • Source到Sink处理Latency Marker源码

  • LatencyMarksEmitter 提交时延标记类

  • LatencyStats(时延直方图Metric实现)源码

时延测量–整体架构图

三、腾讯Oceanus监控指标参考

如下图,红色框线对应的数据延时,即我们描述的指标

75b0268c7e04fbf799757be61a0ea4be.png b88093c73a73b4d18608fd0fc8dc1ffa.png
四、Flink LatencyMarker实现思路
  1. 实现方案变迁

在webinterface中,加入流式job的端到端延迟是一个重要特性。因此,Flink社区最初的想法是在每个记录的source上附加一个摄取时间(ingestion-time)时间戳。

然而,这为不使用monitor feature(监控功能)的用户,带来了额外开销(每个元素+每个元素上的System.currentTimeMilis()需要8个字节)。

因此,Flink社区最后决定,通过定期发送特殊事件来实现此功能,类似于通过拓扑发送水印watermark。

  1. 实现原理

这些特殊事件(LatencyMarker)在source上以可配置发送间隔,并由任务Task转发。Sink最后接收到LatencyMarks后,将比较LatencyMarker的时间戳与当前系统时间,以确定延迟。

LatencyMarker不会增加作业的延迟,但是LatencyMarker与常规记录类似,可以被delay阻塞(例如反压情况),因此LatencyMarker的延迟与Record延迟近似。

  1. 节点间时钟偏移及准确性

当前方案期望所有任务管理器TaskManager上的时钟是同步的。否则,测量的延迟也包括TaskManager时钟之间的偏移。

后续,我们可以尝试通过使用JobManager作为计时服务中心(central timing service)来缓解这个问题。taskmanager将定期查询JM的当前时间,以确定其时钟的偏移量。

这个偏移量仍然包括TM和JM之间的网络延迟,但是仍然比较好的测量时延。

五、Flink LatencyMarker实现源码

本章节对应到pr源码pull-2386的实现,这里简要说明。

dda64f736732fd3a70e333e8117f577d.png
  1. 实现基础类及下发标记

Flink源码中,引入了一个新的StreamElement,称为LatencyMarker。

与水印类似,LatencyMarker按配置的间隔从源发出。这个时间间隔的默认值是0毫秒,即不触发(配置项在ExecutionConfig#latencyTrackingInterval,名称metrics.latency.interval),例如可以配置成2000毫秒触发一次LatencyMarker发送。

LatencyMarker不能“多于”常规元素。这确保了测量的延迟接近于常规流元素的端到端延迟。

常规操作符Operator(不包括那些参与迭代的Operator)如果不是sink,就会转发延迟标记LatencyMarker。

  1. 多输出通道—随机下发标记

具有多个输出channel的Operator,随机选择一个channel通道,将LatencyMarker发送给它。这可以确保每个LatencyMarker标记在系统中只存在一次,并且重新分区步骤不会导致传输的LatencyMarker数量激增。

public class RecordWriterOutput{@Overridepublic void emitLatencyMarker(LatencyMarker latencyMarker) {serializationDelegate.setInstance(latencyMarker);try {// 内部实现了随机选择通道recordWriter.randomEmit(serializationDelegate);}catch (Exception e) {throw new RuntimeException(e.getMessage(), e);}}
}

上述RecordWriterOutput#emitLatencyMarker()会被StreamSource、AbstractStreamOperator调用,分别实现source和中间operator的延迟标记下发。

如果操作符Operator是Sink,它将维护每个已知source实例的最后128个LatencyMarker信息。

  1. Metric展示

每个已知source的最小/最大/平均值/p50/p95/p99时延,在sink的LatencyStats对象中,进行汇总(如果没有任何输出的Operator,就是是sink)。

本pr只涉及全链路延迟统计的实现,Flink已有一整套Metric显示体系,全链路时延Metric展示交给Flink框架本身)。

此外,目前还没有确保系统时钟同步的机制,因此如果硬件时钟不正确,则延迟测量将不准确。

六、时延粒度Granularity说明
  1. 时延粒度–概念说明

任意一个中间Operator或Sink,可以通过配置metrics.latency.granularity项,调整与Source间统计的粒度(Singe、Operator、Subtask):

A、统计的时候,可以选择source源id、source源subtask index进行组合,调整统计粒度。

B、统计的时候,当前Operator及当前Operator subtask index总是参与粒度名称的生成,固定的。

  1. 三种时延跟踪策略及其源码定义

Single - 跟踪延迟,无需区分:源+源子任务:

(例如双流Join的两个source,这里都默认为一个数据源了)

SINGLE {String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {// 只有自己的operatorId和operatorSubtaskIndex参与Metric名称生成// LatencyMarker带有的id(源)不参与Metric名称生成return String.valueOf(operatorId) + operatorSubtaskIndex;}}

Operator - 跟踪延迟,区分源,但不区分源的子任务:

OPERATOR {String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {// LatencyMarker带有的id(源)中id参与计算return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;}}

Subtask - 跟踪延迟,区分源+源子任务:

SUBTASK {String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;}}

根据上述不同的名称key,将直方图对象放入Map中,Map定义:

Map<String, DescriptiveStatisticsHistogram> latencyStats = new HashMap<>()
伪代码(创建直方图):
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);伪代码(更新直方图):
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime())
  1. Single、Operator、Subtask时延策略在Web Metric中的体现

上述Single、Operator 、Subtask不同测试,生成的Metric名称和group就会产生变化,Web Metric中名称相应改变

一个Subtask时延粒度的Metric路径:

Job_<source_id><source_subtask_index><operator_id>_<operator_subtask_index> .latency
七、总结说明
  1. LatencyMarker不参与window、MiniBatch的缓存计时,直接被中间Operator下发。

  2. Metric路径:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency(根据时延配置粒度Granularity,路径会有变化,参考本文第六章节)

  3. 每个中间Operator、以及Sink都会统计自己与Source节点的链路延迟,我们在监控页面,一般展示Source至Sink链路延迟。

  4. 延迟粒度细分到Task,可以用来排查哪台机器的Task时延偏高,进行对比和运维排查。

  5. 从实现原理来看,发送时延标记间隔配置大一些(例如20秒一次),一般不会影响系统处理业务数据的性能(所有的StreamSource Task都按间隔发送时延标记,中间节点有多个输出通道的,随机选择一个通道下发,不会复制多份数据出来)。

参考原文:https://blog.csdn.net/LS_ice/article/details/103295774


《大数据成神之路》正在全面PDF化。

你只需要关注并在后台回复「PDF」就可以看到阿里云盘下载链接了!

另外我把发表过的文章按照体系全部整理好了。现在你可以在后台方便的进行查找:

3f3705feb9d6a49e415911fdcb6ab5fb.png649d24ea0c59f06dd549b717db689621.png

电子版把他们分类做成了下面这个样子,并且放在了阿里云盘提供下载。

30eb9e426d44d1e93fbae4adf7e7c1fa.png

我们点开一个文件夹后:

3d9b17ba137b355073b42e51677a4b2e.png

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

Hi,我是王知无,一个大数据领域的原创作者。 

放心关注我,获取更多行业的一手消息。

37e86f51cbcb6caa2a4c35e798c4510e.png

571c935c02dfe324f52e12cdcaac6d0c.png

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

这篇关于Flink全链路延迟的测量方式和实现原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142607

相关文章

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

golang版本升级如何实现

《golang版本升级如何实现》:本文主要介绍golang版本升级如何实现问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录golanwww.chinasem.cng版本升级linux上golang版本升级删除golang旧版本安装golang最新版本总结gola

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

IDEA中新建/切换Git分支的实现步骤

《IDEA中新建/切换Git分支的实现步骤》本文主要介绍了IDEA中新建/切换Git分支的实现步骤,通过菜单创建新分支并选择是否切换,创建后在Git详情或右键Checkout中切换分支,感兴趣的可以了... 前提:项目已被Git托管1、点击上方栏Git->NewBrancjsh...2、输入新的分支的

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方