说说Flink DataStream的八种物理分区逻辑

2024-09-06 21:18

本文主要是介绍说说Flink DataStream的八种物理分区逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

By 大数据技术与架构

场景描述:Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向。

Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向,以StreamPartitioner抽象类为中心的类图如下所示。

在Flink的Web UI界面中,各算子之间的分区器类型会在箭头上标注出来,如下所示。

StreamPartitioner继承自ChannelSelector接口。这里的Channel概念与Netty不同,只是Flink对于数据写入目的地的简单抽象,我们可以直接认为它就是下游算子的并发实例(即物理分区)。所有StreamPartitioner的子类都要实现selectChannel()方法,用来选择分区号。下面分别来看看Flink提供的8种StreamPartitioner的源码。

GlobalPartitioner
    // dataStream.global()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}

GlobalPartitioner只会将数据输出到下游算子的第一个实例,简单暴力。

ShufflePartitioner
    private Random random = new Random();// dataStream.shuffle()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return random.nextInt(numberOfChannels);}

ShufflePartitioner会将数据随机输出到下游算子的并发实例。由于java.util.Random生成的随机数符合均匀分布,故能够近似保证平均。

RebalancePartitioner
    private int nextChannelToSendTo;@Overridepublic void setup(int numberOfChannels) {super.setup(numberOfChannels);nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);}// dataStream.rebalance()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;return nextChannelToSendTo;}

RebalancePartitioner会先随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,所以常用来处理有倾斜的原数据流。

KeyGroupStreamPartitioner
    private final KeySelector<T, K> keySelector;private int maxParallelism;// dataStream.keyBy()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}

这就是keyBy()算子底层所采用的StreamPartitioner,可见是先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID。

BroadcastPartitioner
    // dataStream.broadcast()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");}@Overridepublic boolean isBroadcast() {return true;}

BroadcastPartitioner是广播流专用的分区器。由于广播流发挥作用必须靠DataStream.connect()方法与正常的数据流连接起来,所以实际上不需要BroadcastPartitioner来选择分区(广播数据总会投递给下游算子的所有并发),selectChannel()方法也就不必实现了。细节请参见Flink中BroadcastStream相关的源码,这里就不再列举了。

RescalePartitioner
    private int nextChannelToSendTo = -1;// dataStream.rescale()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}return nextChannelToSendTo;}

这个看起来也太简单了,并且与RebalancePartitioner的逻辑是相同的?实际上并不是。我们看看StreamingJobGraphGenerator类,它负责把Flink执行计划中的StreamGraph(逻辑执行计划)转换为JobGraph(优化的逻辑执行计划)。其connect()方法中有如下代码。

        if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.POINTWISE,resultPartitionType);} else {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.ALL_TO_ALL,resultPartitionType);

粗略地讲,如果分区逻辑是RescalePartitioner或ForwardPartitioner(下面会说),那么采用POINTWISE模式来连接上下游的顶点,对于其他分区逻辑,都用ALL_TO_ALL模式来连接。看下面两张图会比较容易理解。

也就是说,POINTWISE模式的RescalePartitioner在中间结果传送给下游节点时,会根据并行度的比值来轮询分配给下游算子实例的子集,对TaskManager来说本地性会比较好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局轮询分配,更加均衡,但是就会不可避免地在节点之间交换数据,如果数据量大的话,造成的网络流量会很可观。

ForwardPartitioner
   // dataStream.forward()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}

与GlobalPartitioner的实现相同。但通过上面对POINTWISE和ALL_TO_ALL连接模式的讲解,我们能够知道,它会将数据输出到本地运行的下游算子的第一个实例,而非全局。在上下游算子的并行度相同的情况下,默认就会采用ForwardPartitioner。反之,若上下游算子的并行度不同,默认会采用前述的RebalancePartitioner。

CustomPartitionerWrapper
    Partitioner<K> partitioner;KeySelector<T, K> keySelector;// dataStream.partitionCustom()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance(), e);}return partitioner.partition(key, numberOfChannels);}

这就是自定义的分区逻辑了,我们可以通过继承Partitioner接口自己实现,并传入partitionCustom()方法。举个简单的栗子,以key的长度做分区:

    sourceStream.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {return key.length() % numPartitions;}}, 0);

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于说说Flink DataStream的八种物理分区逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143140

相关文章

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

最新Spring Security实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)

《最新SpringSecurity实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)》本章节介绍了如何通过SpringSecurity实现从配置自定义登录页面、表单登录处理逻辑的配置,并简单模拟... 目录前言改造准备开始登录页改造自定义用户名密码登陆成功失败跳转问题自定义登出前后端分离适配方案结语前言

Java逻辑运算符之&&、|| 与&、 |的区别及应用

《Java逻辑运算符之&&、||与&、|的区别及应用》:本文主要介绍Java逻辑运算符之&&、||与&、|的区别及应用的相关资料,分别是&&、||与&、|,并探讨了它们在不同应用场景中... 目录前言一、基本概念与运算符介绍二、短路与与非短路与:&& 与 & 的区别1. &&:短路与(AND)2. &:非短

虚拟机与物理机的文件共享方式

《虚拟机与物理机的文件共享方式》文章介绍了如何在KaliLinux虚拟机中实现物理机文件夹的直接挂载,以便在虚拟机中方便地读取和使用物理机上的文件,通过设置和配置,可以实现临时挂载和永久挂载,并提供... 目录虚拟机与物理机的文件共享1 虚拟机设置2 验证Kali下分享文件夹功能是否启用3 创建挂载目录4

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

逻辑表达式,最小项

目录 得到此图的逻辑电路 1.画出它的真值表 2.根据真值表写出逻辑式 3.画逻辑图 逻辑函数的表示 逻辑表达式 最小项 定义 基本性质 最小项编号 最小项表达式   得到此图的逻辑电路 1.画出它的真值表 这是同或的逻辑式。 2.根据真值表写出逻辑式   3.画逻辑图   有两种画法,1是根据运算优先级非>与>或得到,第二种是采

UMI复现代码运行逻辑全流程(一)——eval_real.py(尚在更新)

一、文件夹功能解析 全文件夹如下 其中,核心文件作用为: diffusion_policy:扩散策略核心文件夹,包含了众多模型及基础库 example:标定及配置文件 scripts/scripts_real:测试脚本文件,区别在于前者倾向于单体运行,后者为整体运行 scripts_slam_pipeline:orb_slam3运行全部文件 umi:核心交互文件夹,作用在于构建真

Matter.js:Web开发者的2D物理引擎

Matter.js:Web开发者的2D物理引擎 前言 在现代网页开发中,交互性和动态效果是提升用户体验的关键因素。 Matter.js,一个专为网页设计的2D物理引擎,为开发者提供了一种简单而强大的方式,来实现复杂的物理交互效果。 无论是模拟重力、碰撞还是复杂的物体运动,Matter.js 都能轻松应对。 本文将带你深入了解 Matter.js ,并提供实际的代码示例,让你一窥其强大功能

JAVAEE初阶第七节(中)——物理原理与TCP_IP

系列文章目录 JAVAEE初阶第七节(中)——物理原理与TCP_IP 文章目录 系列文章目录JAVAEE初阶第七节(中)——物理原理与TCP_IP 一.应用层重点协议)1. DNS2 .NAT3. NAT IP转换过程 4 .NAPT5. NAT技术的缺陷6. HTTP/HTTPS7. 自定义协议 二. 传输层重点协议 1 .UDP协议 2.1.1 UDP协议端格式 2.1.2 UD