SparkCore(11):RDD概念和创建RDD两种方法,以及RDD的Partitions以及并行度理解

本文主要是介绍SparkCore(11):RDD概念和创建RDD两种方法,以及RDD的Partitions以及并行度理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、RDD概念

1.概念

Resilient Distributed Datasets弹性分布式数据集,默认情况下:每一个block对应一个分区,一个分区会开启一个task来处理。

(a)Resilient:可以存在给定不同数目的分区、数据缓存的时候可以缓存一部分数据也可以缓存全部数据
(b)Distributed:分区可以分布到不同的executor执行(也就是不同的worker/NM上执行)
(c)Datasets:内部存储是数据

2.特性

(1)是一系列的分片,分区
(2)每个分片有一个方法来做计算
(3)rdd会有依赖其他rdd的操作,可以通过wordCountRDD.toDebugString来查看
(4)(可选项)如果rdd是二元组,就会存在分区器(默认是hashpartition)
(5)(可选项)最佳位置。数据在哪台机器上,任务就启在哪个机器上,数据在本地上,不用走网络。不过数据进行最后汇总的时候就要走网络。(hdfs file的block块)

二、RDD创建方法

1.外部数据

val path = "hdfs://192.168.31.3:8020/page_views.data"
val originalRdd: RDD[String] = sc.textFile(path)

2.内存中数据:基于序列化进行创建

scala> val seq = List(1,2,3,4,5,6,7)
seq: List[Int] = List(1, 2, 3, 4, 5, 6, 7)scala> val rdd2 = sc.parallelize(seq)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29

三、关键:Partitions以及并行度

1.RDD的partitions数目

(1)读取数据阶段,对于textFile来说,没有在方法中的指定分区数,则默认为min(defaultParallelism,2),而defaultParallelism对应的就是spark.default.parallelism。如果是从hdfs上面读取文件,其分区数为文件block数(128MB/block)
(2)在Map阶段partition数目保持不变。
(3)在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

 2.并行度

定义:一个job一次所能执行的task数目,即一个job对应的总的core资源个数

执行一个job的task的并行数 = job的Executor数目 * 每个Executor的core个数。
例如提交scalaProjectMaven.jar的spark任务

date=`date +"%Y%m%d%H%M"`
/opt/modules/spark-2.1.0-bin-2.7.3/bin/spark-submit \
--master yarn \
--deploy-mode client \
--name spark_shell_${date} \
--class wordcount \
--driver-memory   8G \
--driver-cores 4 \    
--executor-memory 4G \
--executor-cores 4 \
--num-executors 3 \
--conf spark.app.coalesce=1 \
/opt/datas/scalaProjectMaven.jar

提交job的并行数=3*4=12,即每一个批次执行12个task,对应12个partitions。

3.partitions和并行度关系

一个partition对应一个要做的task,一个executor的core执行一个task
Tasks(一个RDD的总task数)=该RDD的partitions
Doing(并行执行任务数)= job的Executor数目 * 每个Executor核心数
总共执行批次=Tasks/Doing  (如果不是整除,则加1)

例如:Tasks=50,Doing=30,则执行两次,第一次执行30个task,第二次执行20个task。

4.参考

(1)https://blog.csdn.net/yu0_zhang0/article/details/80454517

 

这篇关于SparkCore(11):RDD概念和创建RDD两种方法,以及RDD的Partitions以及并行度理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/998302

相关文章

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

python获取网页表格的多种方法汇总

《python获取网页表格的多种方法汇总》我们在网页上看到很多的表格,如果要获取里面的数据或者转化成其他格式,就需要将表格获取下来并进行整理,在Python中,获取网页表格的方法有多种,下面就跟随小编... 目录1. 使用Pandas的read_html2. 使用BeautifulSoup和pandas3.

Spring 中的循环引用问题解决方法

《Spring中的循环引用问题解决方法》:本文主要介绍Spring中的循环引用问题解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录什么是循环引用?循环依赖三级缓存解决循环依赖二级缓存三级缓存本章来聊聊Spring 中的循环引用问题该如何解决。这里聊

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

Windows 上如果忘记了 MySQL 密码 重置密码的两种方法

《Windows上如果忘记了MySQL密码重置密码的两种方法》:本文主要介绍Windows上如果忘记了MySQL密码重置密码的两种方法,本文通过两种方法结合实例代码给大家介绍的非常详细,感... 目录方法 1:以跳过权限验证模式启动 mysql 并重置密码方法 2:使用 my.ini 文件的临时配置在 Wi

MySQL重复数据处理的七种高效方法

《MySQL重复数据处理的七种高效方法》你是不是也曾遇到过这样的烦恼:明明系统测试时一切正常,上线后却频频出现重复数据,大批量导数据时,总有那么几条不听话的记录导致整个事务莫名回滚,今天,我就跟大家分... 目录1. 重复数据插入问题分析1.1 问题本质1.2 常见场景图2. 基础解决方案:使用异常捕获3.

最详细安装 PostgreSQL方法及常见问题解决

《最详细安装PostgreSQL方法及常见问题解决》:本文主要介绍最详细安装PostgreSQL方法及常见问题解决,介绍了在Windows系统上安装PostgreSQL及Linux系统上安装Po... 目录一、在 Windows 系统上安装 PostgreSQL1. 下载 PostgreSQL 安装包2.

SQL中redo log 刷⼊磁盘的常见方法

《SQL中redolog刷⼊磁盘的常见方法》本文主要介绍了SQL中redolog刷⼊磁盘的常见方法,将redolog刷入磁盘的方法确保了数据的持久性和一致性,下面就来具体介绍一下,感兴趣的可以了解... 目录Redo Log 刷入磁盘的方法Redo Log 刷入磁盘的过程代码示例(伪代码)在数据库系统中,r