本文主要是介绍Spark你需要知道这些,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
谈到 Spark,我们总是强调它比 Hadoop 更高效。为什么它可以更高效呢?是因为它优先使用内存存储?还是因为它拥有比 MapReduce 更简单高效的计算模型?
与 Hadoop 作业的区别
我们知道在 Hadoop 中,一个作业(Job)可以有一个或多个Task,Task 又可以分成 Map Task 和 Reduce Task。每个Task 分别在自己的进程中运行,Hadoop 中一个 Task 就是一个进程,其模型如下:
在 Spark 中,同样有作业(Job)的概念。一个 Application 和一个 SparkContext 相关联,每个Application 可以有一个或多个 Job并行运行。每个 Job 中包含多个 stage,stage 中?️包含多个 Task,多个Task 构成 Task set,Spark 中的task 可以理解为线程,其模型如下:
与 MapReduce 一个应用一次只能运行一个 Map 和 一个Reduce 不同,Spark 可以根据应用的复杂程度,分割成更多的 stage,这些 stage 组成一个DAG(有向无环图),spark 任务调度器根据 DAG 的依赖关系执行 stage。
执行流程
Spark 中有两个重要的进程:Driver 和 Executor 。Driver 构建 SparkContext ,初始化执行配置和输入数据。SparkContext 启动 DAGScheduler 构造执行的 DGA,切分成最小的执行单位。
Driver 向 Cluster Manager 请求计算资源,用于 DAG 的分布式计算。Cluster Manager 收到请求后,将 Driver 的主机地址等信息通知给集群的所有计算节点 Worker。
Worker 收到信息后,根据 Driver 的主机地址和 Driver 通信,并注册,然后根据自己的空闲资源向 Driver 通报自己可以领用的任务数。Driver 根据 DAG 开始向注册的 Worker 分配任务。你可以将这里的 Driver 类比 Yarn 中的 ApplicationMaster,都具有任务调度的能力。
Worker 收到任务后,启动 Executor 进程执行任务,Executor 拥有线程池,管理 Task 线程,其过程如下:
RDD(弹性分布式数据集 )
通常说 Spark 基于 RDD 模型,具有良好的通用性、容错性和并行处理数据的能力。为什么这样说?
RDD 是数据集的描述,不是数据集本身。它描述的是只读的、可分区的弹性分布式数据集。RDD的几个特点描述如下:
- RDD使用户能够显示将计算结果保存在内存中,控制数据的划分,并使用更丰富的操作集合来处理。
- 记录数据的变换,而不是数据本身保证容错。
- 懒操作,延迟计算,action的时候才操作。
- 瞬时性,用时才产生,用完就释放。
如何保证容错?RDD 记录数据变换而不是数据本身,当部分数据丢失时,RDD 拥有足够的信息得知这部分数据如何被计算到的,可以通过这些计算来重新得到丢失的数据。这种恢复数据的方法很快,无需大量数据复制操作。
Spark 针对 RDD 提供了两类操作:transformations 和 action。transformations 采用懒策略,仅在对相关 RDD 进行action 提交时才触发计算。Spark 中常见的 transformations 和 action 操作如下图所示:
问:groupByKey 和 reduceByKey的区别
reduceByKey又一个map-side combiner过程,能减少计算量。
RDD 依赖关系
谈到 RDD,我们不得不提 RDD 中的两种典型的依赖关系:宽依赖和窄依赖。我们知道每个 RDD 包含了 partition(分区)的集合,partition 是不可分割的。宽依赖:一个父RDD的partition会被多个子RDD的partition使用。窄依赖:一个父RDD的partition至多被子RDD的某个partition使用一次。
Spark 中 stage 的划分依据是 shuffle,每个stage 内部尽可能多的包含一组具有窄依赖关系的转换,并将它们流水线并行化。其划分逻辑如下图所示:
每个 partition 的计算就是一个 task,RDD 的依赖关系也可以理解成 task 的执行关系。
常见的宽依赖和窄依赖的算子
常见的宽依赖操作有:groupByKey、join(父RDD不是hash-partitioned)、partitionBy。
窄依赖操作有:map、filter、union、join(父RDD是hash-partitioned)、mapPartition、mapValues。
问:join是宽依赖还是窄依赖
如果JoinAPI之前调用的RDD API是宽依赖(存在shuffle),而且两个join的RDD的分区数量一致,join结果的RDD分区数量也一致,这时候的Join API是窄依赖,除此之外的,RDD的join API是宽依赖。
yarn cluster vs yarn client
cluster模式下,driver运行在AM中,负责向yarn申请资源,并监督作业运行情况,当用户提交完作业,关闭client,作业会继续在yarn上运行。
cluster模式不适合交互类型的作业。而client模式,AM仅向yarn请求executor,client会和请求的container通信来调度任务,即client不能离开。
调优
spark中的调优可以分为参数调优和开发调优。
参数调优
如何进行参数调优,你需要先了解这两点:
- spark中executor内存分配情况。executor的内存分为3块:
第一块:让task执行代码时,默认占executor总内存的20%;
第二块:task通过shuffle过程拉取上一个stage的task输出后,进行聚合等操作时使用,默认也是占20%;
第三块:让RDD持久化使用,默认占executor总内存的60%。 - spark中,task的执行速度和每个executor进程的CUP core数量有直接关系。一个CUP Core同一时间只能执行一个线程,每个executor进程上分配到的多个task,都是以task一条线程的方式,多线程并发执行。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么就可以比较快和高效地执行完这些task线程。
明白了这些你就很容易理解参数调优的几个关键点:
- num-executors:该作业总共需要多少executor进程执行,建议50-100。
- executor-memory:每个executor进程的内存,建议4G-8G。
- executor-cores:每个executor进程的CUP core数量,设置2-4个较合适。
- driver-memory:设置Driver进程的内存,一般1G够了。
- spark.default.parallelism:每个stage默认task数目,建议500-1000比较合适。
- spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的比例,默认0.6。若有比较多的持久化操作,可以设置高一点,否则内存会频繁gc导致运行缓慢。
- spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2,若持久化比较少,shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比。
开发调优:
spark计算的核心是RDD模型,开发调优当然是建立在RDD的基础之上,开发优化大致罗列如下:
- 避免创建重复的RDD,同一份数据应该创建一个RDD。
- 尽可能复用一个RDD,例如,一个RDD数据格式是key-value,另一个是单独value类型,这两个RDD的value部分完全一样,可以复用以达到减少算子执行次数。
- 对多次使用的RDD进行持久化处理,借助cache() 和persist() 方法
- 避免使用shuffle类算子,broadcast+map代替join操作,不会导致shuffle操作,单仅适合RDD数据量较少时使用。
- 使用map-side预聚合的shuffle操作,例如:用reduceByKey或aggregateByKey算子代替groupByKey算子。
- 使用kryo优化序列化性能,spark支持使用kryo序列化库,性能比Java序列化高10倍左右。
容错
spark容错依赖RDD模型以及建立在yarn的容错能力上的,包括如下几点:
- 如果此task失败,AM会重新分配task。
- 如果task依赖的上层partition数据已经失效了,会先将其依赖的partition计算任务再重算一遍。
- 宽依赖中被依赖partition,可以将数据保存HDFS,以便快速重构(checkpoint)。
- 可以指定保存一个RDD的数据至几点的cache中,如果内存不够,会LRU释放一部分,仍有重构的可能。
扩展组件
spark提供了便利的扩展组件,可以针对不同的开发场景,具体分类如下:
- spark core:基于RDD提供操作接口,利用DAG进行同一化的任务规划。
- spark SQL:通过把Hive的HQL转换为Spark DAG计算来实现。
- Spark streaming:spark的流式计算框架。
- MLIB:spark的机器学习库,包含常用的机器学习算法。
再谈
对比spark,相信你很容易理解MapReduce计算模型的几个缺点:
- 调度慢,启动map、reduce太耗时。
- 计算慢,每一步都要保存中间结果到磁盘。
- API 简单,只有map和reduce两个原语。
- 缺乏作业流描述,一项任务需要多伦MR。
那么spark由解决了哪些问题?
- 最大化利用内存cache。
- 中间结果方内存,加速迭代。坊间统计,内存计算下,spark比MR快100倍。
- 其结果集放内存,加速后续查询和处理,解决运行慢的问题。
这篇关于Spark你需要知道这些的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!