本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Spark三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量
一.累加器(accumulator)
问题引入:
当使用foreach来对rdd求和会发现求和数据为0
val rdd = sc.makeRDD(List(1,2,3,4))var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)
输出:sum = 0
在Spark中声明SparkContext的类称为Driver,所以变量sum在Driver中;而任务Task(即分区数据的运算)的执行是在Executor中进行,即sum = sum + num在Executor节点执行;
问题的关键点在于:Executor只是做了运算,但并没有将sum运算后的值返回Driver中,也就是说Driver中的sum变量至始至终都保持初始值为0;如下图所示:
此时便可以考虑使用累加器解决上述问题
1.系统累加器
val rdd = sc.makeRDD(List(1,2,3,4))
val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorrdd.foreach(num => {// 使用累加器sumAcc.add(num)})// 获取累加器的值println(sumAcc.value)
输出:10
图解:
使用累加器的一些问题:
- 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
val rdd = sc.makeRDD(List(1,2,3,4))val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})println(sumAcc.value)
输出:0
- 多加:累加器为全局共享变量,多次调用行动算子就会多次执行
val sumAcc = sc.longAccumulator("sum")val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})mapRDD.collect()mapRDD.collect()println(sumAcc.value)
输出:20
2.自定义累加器
- 继承 AccumulatorV2,并设定泛型
- 重写累加器的抽象方法
示例:自定义数据累加器WordCount
package org.xyl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutableobject RDD_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc").set("spark.testing.memory", "2147480000")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset(): Unit = {wcMap.clear()}// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
}
输出:Map(spark -> 1, hello -> 2)
二.广播变量(broadcast variable)
问题引入:
当在Executor端用到了Driver变量,比如使用map()函数,在每个Executor中有多少个task就有多少个Driver端变量副本。
广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。
Spark会自动广播每个stage任务需要的通用数据。这些被广播的数据以序列化的形式缓存起来,然后在任务运行前进行反序列化。也就是说,在以下两种情况下显示的创建广播变量才有用:
1)当任务跨多个stage并且需要同样的数据时;
2)当以反序列化的形式来缓存数据时。
【闭包】:是一个函数,这个函数的执行结果由外部自由变量(不是函数的局部变量也不是函数的参数,也称字面量)决定——函数执行时,才捕获相关的自由变量(获取自由变量的当前值),从而形成闭合的函数。
spark执行一个Stage时,会为待执行函数(function,也称为【算子】)建立闭包(捕获函数引用的所有数据集),形成该Stage所有task所需信息的二进制形式,然后把这个闭包发送到集群的每个Executor上。
示例:
注:在创建广播变量时,广播变量的值必须是本地的可序列化的值,不能是RDD,因为RDD是不存数据的。但可以将RDD的结果广播出去。
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark06_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case (w, c) => {// 访问广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)sc.stop()}
}
输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))
注意:
累加器只能在driver端定义,driver端读取,不能在Executor端读取。
广播变量只能在driver端定义,在Executor端读取,Executor不能修改。
这篇关于Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!