Mark:Spark RDD之Partition

2024-04-06 20:08
文章标签 spark rdd partition mark

本文主要是介绍Mark:Spark RDD之Partition,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概要

Spark RDD主要由Dependency、Partition、Partitioner组成,Partition是其中之一。一份待处理的原始数据会被按照相应的逻辑(例如jdbc和hdfs的split逻辑)切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度,所以理解Partition是了解spark背后运行原理的第一步。

Partition定义

 
查看spark源码,trait Partition的定义很简单,序列号index和hashCode方法。Partition和RDD是伴生的,即每一种RDD都有其对应的Partition实现,所以,分析Partition主要是分析其子类。我们关注两个常用的子类,JdbcPartition和HadoopPartition。此外,RDD源码中有5个方法,代表其组成,如下: 

第二个方法,getPartitions是数据源如何被切分的逻辑,返回值正是Partition,第一个方法compute是消费切割后的Partition的方法,所以学习Partition,要结合getPartitions和compute方法。

  • JdbcPartition例子 
    下面是Spark JdbcRDDSuite中一个例子 

    val sc = new SparkContext("local[1]", "test") 
    val rdd = new JdbcRDD( 
    sc, 
    () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, 
    // DATA类型为INTEGER 
    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 
    1, 100, 3, 
    (r: ResultSet) => { r.getInt(1) } ).count()
     

    查看JdbcPartition实现,相比Partition,主要多了lower和upper这两个字段。 

    查看JdbcRDD的getPartitions,按照如上图所示算法将1到100分为3份(partition数量),结果为(1,33)、(34,66)、(67,100),封装为JdbcPartition并返回,这样数据切分的部分就完成了。 

    查看JdbcRDD的compute方法,逻辑清晰,将Partition强转为JdbcPartition,获取连接并预处理sql,将 
    例子中的”SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?”问号分别用Partition的lower和upper替换(即getPartitions切分好的(1,33)、(34,66)、(67,100))并执行查询。至此,JdbcPartition如何发挥作用就分析完了。

  • HadoopPartition例子 
    举个简单例子

    val sc = new SparkContext("local[1]", "test")
    sc.textFile("hdfs://your-file-path").count()
    • 1
    • 2


    相比Partition,HadoopPartition则多了InputSplit。 

    spark切分hdfs文件,调用的是Hadoop的API,对这块不熟的同学查看上面InputSplit的链接。 

    执行计算的逻辑也很简单,将Partition强转为HadoopPartition,HadoopPartition内有InputSplit对象。调用Hadoop API三个读取数据的相关对象,InputSplit、InputFormat和Reader,读取对应split的数据。这块需要你对Hadoop的掌握,另外我在下面会讲Hadoop split的策略。

决定partition数量的因素

Partition数量可以在初始化RDD时指定(如JdbcPartition例子),不指定的话(如HadoopPartition例子),则 
读取spark.default.parallelism配置,不同类型资源管理器取值不同,如下 

了解了默认的partition数量,再看一些具体API的partition行为

  • RDD初始化相关
Spark APIpartition数量
sc.parallelize(…)sc.defaultParallelism
sc.textFile(…)max(传参, block数)
val hbaseRDD = sc.newAPIHadoopRDD(…)max(传参, block数)
val jdbcRDD = new JdbcRDD(…)传参
  • 通用transformation
filter(),map(),flatMap(),distinct()和父RDD相同
rdd.union(otherRDD)rdd.partitions.size + otherRDD. partitions.size
rdd.intersection(otherRDD)max(rdd.partitions.size, otherRDD. partitions.size)
rdd.subtract(otherRDD)rdd.partitions.size
rdd.cartesian(otherRDD)rdd.partitions.size * otherRDD. partitions.size
  • Key-based Transformations
reduceByKey(),foldByKey(),combineByKey(), groupByKey()和父RDD相同
sortByKey()同上
mapValues(),flatMapValues()同上
cogroup(), join(), ,leftOuterJoin(), rightOuterJoin()所有父RDD按照其partition数降序排列,从partition数最大的RDD开始查找是否存在partitioner,存在则partition数由此partitioner确定,否则,所有RDD不存在partitioner,由spark.default.parallelism确定,若还没设置,最后partition数为所有RDD中partition数的最大值

上面的Partition行为我们从中挑一个细分析,就是sc.textFile(…, numPartitions)读取hdfs时的Partition数,上表给出的答案是numPartitions和block数较大者,如果不指定numPartitions,则numPartitions<=2, 分析这个问题,其实跟spark无关,要查看Hadoop源码FileInputFormat类中getSplits方法

  • 指定numPartitions 
     
    totalSize为待处理文件总大小,numSplits就是我们所指定的numPartitions,得到了平均的文件大小goalSize,接下来 
     
    比较计算得到的goalSize和block大小blockSize,取其中较小者,再和minSize(由属性mapreduce.input.fileinputformat.split.minsize确定,默认值为0,则minSize默认值为1)取较大的。 
    假设待处理文件大小fSize=512M(视为一个大文件,不考虑1.1系数),block大小bSize=128M,

    1. sc.textFile(…, 3) 
      根据上面的公式goalSize=512M/3 > bSize=128M 
      取其较小者bSize,则按照bSize切分,split数=512M/128=4,即partition数=4

    2. sc.textFile(…, 5) 
      根据上面的公式goalSize=512M/5 < bSize=128M 
      取其较小者goalSize,则按照goalSize切分,split数=512M/(512M/5)=5,即partition数=5

    可见指定numPartitions,小于block数时无效,大于则生效。

  • 不指定numPartitions 
     
    默认,传给FileInputFormat类getSplits方法的numSplits值是sc.defaultParallelism和2的较小值,所以spark.default.parallelism几乎是没用的,Partition数就是block数。那么为什么是这样的呢,感兴趣的同学看下这个讨论

Partition数量影响及调整

上面分析了决定Partition数量的因数,接下来就该考虑Partition数量的影响以及合适的值。

  • Partition数量的影响

    1. Partition数量太少 
      太少的影响显而易见,就是资源不能充分利用,例如local模式下,有16core,但是Partition数量仅为8的话,有一半的core没利用到。
    2. Partition数量太多 
      太多,资源利用没什么问题,但是导致task过多,task的序列化和传输的时间开销增大。

    那么多少的partition数是合适的呢,这里我们参考spark doc给出的建议,Typically you want 2-4 partitions for each CPU in your cluster。

  • Partition调整 
    1. repartition 
      reparation是coalesce(numPartitions, shuffle = true),repartition不仅会调整Partition数,也会将Partitioner修改为hashPartitioner,产生shuffle操作。
    2. coalesce 
      coalesce函数可以控制是否shuffle,但当shuffle为false时,只能减小Partition数,无法增大。

总结

Partition对应的是不同数据源的split逻辑,首先以JdbcPartition和HadoopPartition为例,介绍了Partition的组成,以及如何发挥作用,接下来分析了常见API的Partition行为,最后简单介绍了Partition数量的影响及调整。

参考: 
https://techmagie.wordpress.com/2015/12/19/understanding-spark-partitioning/ 
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html 
https://spark.apache.org/docs/latest/tuning.html 
https://www.mapr.com/developercentral/code/loading-hbase-tables-spark

注:图片中代码均为Spark、Hadoop源码,我稍作处理,如去掉log、metric等,使逻辑更清晰。

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011564172/article/details/53611109

这篇关于Mark:Spark RDD之Partition的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880718

相关文章

openfire+spark 在linux下安装,配置

文章转自:点击打开链接 相关软件下载 链接: https://pan.baidu.com/s/1boJs61h 密码: 2wd7 Openfire 在linux下安装和配置 + spark 在windows下配置 本机环境 系统:CentOS 6.7 64 位JDK 1.7 64 位MySQL 5.6 Openfir

任务5.1 初识Spark Streaming

实战概述:使用Spark Streaming进行词频统计 1. 项目背景与目标 背景: Spark Streaming是Apache Spark的流处理框架,用于构建可伸缩、高吞吐量的实时数据处理应用。目标: 实现一个实时词频统计系统,能够处理流式数据并统计文本中的单词出现频率。 2. 技术要点 Spark Streaming集成: 与Spark生态的其他组件如Spark SQL、ML

count(distinct ...) over (partition by...) 替换成mysql

你这个是用了 Oracle 的分析函数。 SQL Server 是不支持的。如果语句比较简单的。例如SELECT COUNT( distinct A) OVER ( partition by B) FROM C可以修改为:SELECT COUNT( distinct A) FROM CGROUP BY B但是如果你的逻辑很复杂的话,那就麻烦了。

Spark算子:RDD行动Action操作(3)–aggregate、fold、lookup

aggregate def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意se

Spark算子:RDDAction操作–first/count/reduce/collect/collectAsMap

first def first(): T first返回RDD中的第一个元素,不排序。 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at mak

Spark算子:RDD键值转换操作(4)–cogroup/join

cogroup 函数原型:最多可以组合4个RDD,可以通过partitioner和numsPartitions设置 def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) :RDD[(K, (Iterable[V],

Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally

groupByKey def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] 该函数用于将RDD[K,V]中每个K对应

Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues

partitionBy       def partitionBy(partitioner: Partitioner): RDD[(K, V)]       该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)rd

Spark算子:RDD基本转换操作(6)–zip、zipPartitions

zip       def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]        zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。 scala> var rdd1 = sc.makeRDD(1 to 10,2)

Spark算子:RDD基本转换操作(5)–mapPartitions/mapPartitionsWithIndex

mapPartitions def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]      该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代