本文主要是介绍累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、简介
Spark 中的三大数据结构:RDD,累加器,广播变量。
累加器和广播变量属于共享变量,累加器是只写变量,广播变量是只读变量。
共享变量是指可以在 Excutor 上来更改(累加器) 和读取(广播变量) Driver 上的数据。
二、累加器
2.1 用途
累加器的常见用途是在调试时对作业执行的过程中的事件进行计数。例如:统计 100 内的偶数的个数。
2.2 用法
- 通过调用 SparkContext 的 accumulator(initiaValue) 方法来创建累加器 ac
- 在 scala 中通过 += 来更改 ac(java 中通过 add 来修改)
- 使用 ac.value 来访问累加器的值
scala> val sourceRDD = sc.makeRDD(1 to 100, 3)
sourceRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24//1. 创建累加器
scala> val accumulator = sc.accumulator(0)
//注意:这里产生了一个警告,这是 spark 的容错性导致的
warning: there were two deprecation warnings; re-run with -deprecation for details
accumulator: org.apache.spark.Accumulator[Int] = 0
//2. 修改累加器的值
scala> val test = sourceRDD.map(x => {if(x % 2 == 0) accumulator += 1})
//3. 访问累加器
scala> println(accumulator.value)
50
2.3 累加器与容错性
如果一个节点上的任务执行的速度过慢或者该节点产生了错误,那么 spark 会把该任务重新分配到其它节点上去运行,那么这样就会导致累加器被改变多次,与实际想要的结果不符,所以 spark 现在的解决策略在行动操作中使用的累加器,spark 只会把每个任务对各累加器的修改应用一次。因此,为了保证无论再失败还是重复计算的时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动算子中。
scala> val sourceRDD = sc.makeRDD(1 to 100, 3)
sourceRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24scala> val accumulator = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
accumulator: org.apache.spark.Accumulator[Int] = 0
// 行动算子中使用累加器
scala> sourceRDD.foreach(x => (if(x % 2 == 0) accumulator += 1))scala> println(accumulator.value)
60
2.4 自定义累加器
import java.utilimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2object MyAccumulatorTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("myAccumulatorTest")val sc = new SparkContext(conf)//创建累加器对象val myAccumulator = new MyAccumulator()//注册累加器sc.register(myAccumulator)//创建数据源val wordSource: RDD[String] = sc.makeRDD(List("hadoop", "hive", "hbase", "spark", "kafka"))// action 中对累加器进行修改wordSource.foreach(word => myAccumulator.add(word))//访问累加器中的值println(myAccumulator.value) // [hbase, hadoop, hive]sc.stop()}}//自定义累加器
class MyAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {val list = new util.ArrayList[String]()//检查是否初始化override def isZero: Boolean = {list.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new MyAccumulator}//重置override def reset(): Unit = {list.clear()}//增加累加器override def add(v: String): Unit = {if(v.contains("h")) {list.add(v)}}//两个累加器合并override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//返回累加器的值override def value: util.ArrayList[String] = {list}
}
三、广播变量
2.1 用途
当多个 Executor 中的多个 Task 操作需要使用(读取)同一个很大变量时,如果我们采取常规方式把该变量发送到每一个 task 中,那么会极大地浪费性能,所以我们可以直接把该变量发送到每一个 Executor 上,Executor 上对应的 Task 可以共同访问该变量,这样就可以提高性能。
2.2 用法
- 通过 SparkContext 的 broadcast 方法创建广播变量
- 通过 value 来访问广播变量的值
//这里只做一个示范,就广播一个字符串,其实实际应用中广播的全是大数据,如巨大的矩阵
scala> val a = "a"
a: String = ascala> val broadValue = sc.broadcast(a)
broadValue: org.apache.spark.broadcast.Broadcast[String] = Broadcast(5)scala> val source = sc.makeRDD(1 to 10, 3)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24scala> val castRDD = source.map(x => x + broadValue.value)
castRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:27scala> castRDD.collect
res8: Array[String] = Array(1a, 2a, 3a, 4a, 5a, 6a, 7a, 8a, 9a, 10a)
这篇关于累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!