说说Flink DataStream的八种物理分区逻辑

2024-09-06 21:18

本文主要是介绍说说Flink DataStream的八种物理分区逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

By 大数据技术与架构

场景描述:Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向。

Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向,以StreamPartitioner抽象类为中心的类图如下所示。

在Flink的Web UI界面中,各算子之间的分区器类型会在箭头上标注出来,如下所示。

StreamPartitioner继承自ChannelSelector接口。这里的Channel概念与Netty不同,只是Flink对于数据写入目的地的简单抽象,我们可以直接认为它就是下游算子的并发实例(即物理分区)。所有StreamPartitioner的子类都要实现selectChannel()方法,用来选择分区号。下面分别来看看Flink提供的8种StreamPartitioner的源码。

GlobalPartitioner
    // dataStream.global()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}

GlobalPartitioner只会将数据输出到下游算子的第一个实例,简单暴力。

ShufflePartitioner
    private Random random = new Random();// dataStream.shuffle()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return random.nextInt(numberOfChannels);}

ShufflePartitioner会将数据随机输出到下游算子的并发实例。由于java.util.Random生成的随机数符合均匀分布,故能够近似保证平均。

RebalancePartitioner
    private int nextChannelToSendTo;@Overridepublic void setup(int numberOfChannels) {super.setup(numberOfChannels);nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);}// dataStream.rebalance()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;return nextChannelToSendTo;}

RebalancePartitioner会先随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,所以常用来处理有倾斜的原数据流。

KeyGroupStreamPartitioner
    private final KeySelector<T, K> keySelector;private int maxParallelism;// dataStream.keyBy()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}

这就是keyBy()算子底层所采用的StreamPartitioner,可见是先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID。

BroadcastPartitioner
    // dataStream.broadcast()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");}@Overridepublic boolean isBroadcast() {return true;}

BroadcastPartitioner是广播流专用的分区器。由于广播流发挥作用必须靠DataStream.connect()方法与正常的数据流连接起来,所以实际上不需要BroadcastPartitioner来选择分区(广播数据总会投递给下游算子的所有并发),selectChannel()方法也就不必实现了。细节请参见Flink中BroadcastStream相关的源码,这里就不再列举了。

RescalePartitioner
    private int nextChannelToSendTo = -1;// dataStream.rescale()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}return nextChannelToSendTo;}

这个看起来也太简单了,并且与RebalancePartitioner的逻辑是相同的?实际上并不是。我们看看StreamingJobGraphGenerator类,它负责把Flink执行计划中的StreamGraph(逻辑执行计划)转换为JobGraph(优化的逻辑执行计划)。其connect()方法中有如下代码。

        if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.POINTWISE,resultPartitionType);} else {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,DistributionPattern.ALL_TO_ALL,resultPartitionType);

粗略地讲,如果分区逻辑是RescalePartitioner或ForwardPartitioner(下面会说),那么采用POINTWISE模式来连接上下游的顶点,对于其他分区逻辑,都用ALL_TO_ALL模式来连接。看下面两张图会比较容易理解。

也就是说,POINTWISE模式的RescalePartitioner在中间结果传送给下游节点时,会根据并行度的比值来轮询分配给下游算子实例的子集,对TaskManager来说本地性会比较好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局轮询分配,更加均衡,但是就会不可避免地在节点之间交换数据,如果数据量大的话,造成的网络流量会很可观。

ForwardPartitioner
   // dataStream.forward()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}

与GlobalPartitioner的实现相同。但通过上面对POINTWISE和ALL_TO_ALL连接模式的讲解,我们能够知道,它会将数据输出到本地运行的下游算子的第一个实例,而非全局。在上下游算子的并行度相同的情况下,默认就会采用ForwardPartitioner。反之,若上下游算子的并行度不同,默认会采用前述的RebalancePartitioner。

CustomPartitionerWrapper
    Partitioner<K> partitioner;KeySelector<T, K> keySelector;// dataStream.partitionCustom()@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance(), e);}return partitioner.partition(key, numberOfChannels);}

这就是自定义的分区逻辑了,我们可以通过继承Partitioner接口自己实现,并传入partitionCustom()方法。举个简单的栗子,以key的长度做分区:

    sourceStream.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {return key.length() % numPartitions;}}, 0);

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于说说Flink DataStream的八种物理分区逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143140

相关文章

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

逻辑表达式,最小项

目录 得到此图的逻辑电路 1.画出它的真值表 2.根据真值表写出逻辑式 3.画逻辑图 逻辑函数的表示 逻辑表达式 最小项 定义 基本性质 最小项编号 最小项表达式   得到此图的逻辑电路 1.画出它的真值表 这是同或的逻辑式。 2.根据真值表写出逻辑式   3.画逻辑图   有两种画法,1是根据运算优先级非>与>或得到,第二种是采

UMI复现代码运行逻辑全流程(一)——eval_real.py(尚在更新)

一、文件夹功能解析 全文件夹如下 其中,核心文件作用为: diffusion_policy:扩散策略核心文件夹,包含了众多模型及基础库 example:标定及配置文件 scripts/scripts_real:测试脚本文件,区别在于前者倾向于单体运行,后者为整体运行 scripts_slam_pipeline:orb_slam3运行全部文件 umi:核心交互文件夹,作用在于构建真

Matter.js:Web开发者的2D物理引擎

Matter.js:Web开发者的2D物理引擎 前言 在现代网页开发中,交互性和动态效果是提升用户体验的关键因素。 Matter.js,一个专为网页设计的2D物理引擎,为开发者提供了一种简单而强大的方式,来实现复杂的物理交互效果。 无论是模拟重力、碰撞还是复杂的物体运动,Matter.js 都能轻松应对。 本文将带你深入了解 Matter.js ,并提供实际的代码示例,让你一窥其强大功能

JAVAEE初阶第七节(中)——物理原理与TCP_IP

系列文章目录 JAVAEE初阶第七节(中)——物理原理与TCP_IP 文章目录 系列文章目录JAVAEE初阶第七节(中)——物理原理与TCP_IP 一.应用层重点协议)1. DNS2 .NAT3. NAT IP转换过程 4 .NAPT5. NAT技术的缺陷6. HTTP/HTTPS7. 自定义协议 二. 传输层重点协议 1 .UDP协议 2.1.1 UDP协议端格式 2.1.2 UD

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

数据库系统 第41节 数据库分区简介

数据库分区是一种数据库设计技术,用于将大型表或索引的数据分布到不同的物理区域,以提高查询性能、优化数据管理、简化维护任务,并提高数据的可用性。下面我将详细介绍每种分区类型,并结合伪代码或概念性的源代码来说明其实现方式。 1. 范围分区 (Range Partitioning) 范围分区是根据某个列的值范围来划分数据。例如,可以按照日期或数值范围来分区。 示例场景:一个订单表,按年份分区。

Windows环境下SD卡多分区 隐藏分区 解决python裸读写扇区失败

SD卡分区 右键“我的电脑”->“管理”->“磁盘管理”; 如果SD卡有文件系统,点击"删除卷",重新做卡; 删除文件系统后如下图,点击“新建简单卷”; 在导航页“指定卷大小”,设置分区的大小,留出剩余的空间; python读写SD卡 这里需要注意的是,写SD卡需要加锁,否则写不成功 #带文件系统,需要锁定卷,否则无法写扇区@contextlib.contextmanagerd

基础物理-向量3

总结 标量和向量 标量,如温度,仅具有大小。它们通过一个带有单位的数字(例如 10°C)表示,并遵循算术和普通代数的规则。向量,如位移,既具有大小又具有方向(例如 5 米,向北),并遵循向量代数的规则。 几何法加向量 两个向量 a ⃗ \vec{a} a 和 b ⃗ \vec{b} b 可以通过几何法相加,即将它们按照共同的比例绘制,并首尾相接放置。连接第一个向量的尾部和第二个

【Java编程的逻辑】原子变量 CAS 显示锁

原子变量 在理解synchronized中有使用synchronized保证原子更新操作,但是使用synchronized成本太高了,需要先获取锁,最后还要释放锁,如果获取不到锁还需要等到。这些成本都是比较高的,对于这种情况,可以使用原子变量。 Java并发包中的基本原子变量类型有以下几种: AtomicBoolean:原子Boolean类型,常用来在程序中表示一个标志位 AtomicIn