累加器和广播变量

2024-01-31 13:58
文章标签 广播 变量 累加器

本文主要是介绍累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、简介

​ Spark 中的三大数据结构:RDD,累加器,广播变量。

​ 累加器和广播变量属于共享变量,累加器是只写变量,广播变量是只读变量。

​ 共享变量是指可以在 Excutor 上来更改(累加器) 和读取(广播变量) Driver 上的数据。

二、累加器

2.1 用途

​ 累加器的常见用途是在调试时对作业执行的过程中的事件进行计数。例如:统计 100 内的偶数的个数。

2.2 用法

  1. 通过调用 SparkContext 的 accumulator(initiaValue) 方法来创建累加器 ac
  2. 在 scala 中通过 += 来更改 ac(java 中通过 add 来修改)
  3. 使用 ac.value 来访问累加器的值
scala> val sourceRDD = sc.makeRDD(1 to 100, 3)
sourceRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24//1. 创建累加器
scala> val accumulator = sc.accumulator(0)
//注意:这里产生了一个警告,这是 spark 的容错性导致的
warning: there were two deprecation warnings; re-run with -deprecation for details
accumulator: org.apache.spark.Accumulator[Int] = 0
//2. 修改累加器的值
scala> val test = sourceRDD.map(x => {if(x % 2 == 0) accumulator += 1})
//3. 访问累加器
scala> println(accumulator.value)
50

2.3 累加器与容错性

​ 如果一个节点上的任务执行的速度过慢或者该节点产生了错误,那么 spark 会把该任务重新分配到其它节点上去运行,那么这样就会导致累加器被改变多次,与实际想要的结果不符,所以 spark 现在的解决策略在行动操作中使用的累加器,spark 只会把每个任务对各累加器的修改应用一次。因此,为了保证无论再失败还是重复计算的时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动算子中

scala> val sourceRDD = sc.makeRDD(1 to 100, 3)
sourceRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24scala> val accumulator = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
accumulator: org.apache.spark.Accumulator[Int] = 0
// 行动算子中使用累加器
scala> sourceRDD.foreach(x => (if(x % 2 == 0) accumulator += 1))scala> println(accumulator.value)
60

2.4 自定义累加器

import java.utilimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2object MyAccumulatorTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("myAccumulatorTest")val sc = new SparkContext(conf)//创建累加器对象val myAccumulator = new MyAccumulator()//注册累加器sc.register(myAccumulator)//创建数据源val wordSource: RDD[String] = sc.makeRDD(List("hadoop", "hive", "hbase", "spark", "kafka"))// action 中对累加器进行修改wordSource.foreach(word => myAccumulator.add(word))//访问累加器中的值println(myAccumulator.value)    // [hbase, hadoop, hive]sc.stop()}}//自定义累加器
class MyAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {val list = new util.ArrayList[String]()//检查是否初始化override def isZero: Boolean = {list.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new MyAccumulator}//重置override def reset(): Unit = {list.clear()}//增加累加器override def add(v: String): Unit = {if(v.contains("h")) {list.add(v)}}//两个累加器合并override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//返回累加器的值override def value: util.ArrayList[String] = {list}
}

三、广播变量

2.1 用途

​ 当多个 Executor 中的多个 Task 操作需要使用(读取)同一个很大变量时,如果我们采取常规方式把该变量发送到每一个 task 中,那么会极大地浪费性能,所以我们可以直接把该变量发送到每一个 Executor 上,Executor 上对应的 Task 可以共同访问该变量,这样就可以提高性能。

2.2 用法

  1. 通过 SparkContext 的 broadcast 方法创建广播变量
  2. 通过 value 来访问广播变量的值
//这里只做一个示范,就广播一个字符串,其实实际应用中广播的全是大数据,如巨大的矩阵
scala> val a = "a"
a: String = ascala> val broadValue = sc.broadcast(a)
broadValue: org.apache.spark.broadcast.Broadcast[String] = Broadcast(5)scala> val source = sc.makeRDD(1 to 10, 3)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24scala> val castRDD = source.map(x => x + broadValue.value)
castRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:27scala> castRDD.collect
res8: Array[String] = Array(1a, 2a, 3a, 4a, 5a, 6a, 7a, 8a, 9a, 10a)

这篇关于累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/664015

相关文章

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可

java如何调用kettle设置变量和参数

《java如何调用kettle设置变量和参数》文章简要介绍了如何在Java中调用Kettle,并重点讨论了变量和参数的区别,以及在Java代码中如何正确设置和使用这些变量,避免覆盖Kettle中已设置... 目录Java调用kettle设置变量和参数java代码中变量会覆盖kettle里面设置的变量总结ja

Perl 特殊变量详解

《Perl特殊变量详解》Perl语言中包含了许多特殊变量,这些变量在Perl程序的执行过程中扮演着重要的角色,:本文主要介绍Perl特殊变量,需要的朋友可以参考下... perl 特殊变量Perl 语言中包含了许多特殊变量,这些变量在 Perl 程序的执行过程中扮演着重要的角色。特殊变量通常用于存储程序的

变量与命名

引言         在前两个课时中,我们已经了解了 Python 程序的基本结构,学习了如何正确地使用缩进来组织代码,并且知道了注释的重要性。现在我们将进一步深入到 Python 编程的核心——变量与命名。变量是我们存储数据的主要方式,而合理的命名则有助于提高代码的可读性和可维护性。 变量的概念与使用         在 Python 中,变量是一种用来存储数据值的标识符。创建变量很简单,

JS_变量

二、JS的变量 JS中的变量具有如下特征 1 弱类型变量,可以统一声明成var 2 var声明的变量可以再次声明 3 变量可以使用不同的数据类型多次赋值 4 JS的语句可以以; 结尾,也可以不用;结尾 5 变量标识符严格区分大小写 6 标识符的命名规则参照JAVA 7 如果使用了 一个没有声明的变量,那么运行时会报uncaught ReferenceError: *** is not de

使用条件变量实现线程同步:C++实战指南

使用条件变量实现线程同步:C++实战指南 在多线程编程中,线程同步是确保程序正确性和稳定性的关键。条件变量(condition variable)是一种强大的同步原语,用于在线程之间进行协调,避免数据竞争和死锁。本文将详细介绍如何在C++中使用条件变量实现线程同步,并提供完整的代码示例和详细的解释。 什么是条件变量? 条件变量是一种同步机制,允许线程在某个条件满足之前进入等待状态,并在条件满

axure之变量

一、设置我们的第一个变量 1、点击axure上方设置一个全局变量a = 3 2、加入按钮、文本框元件点击按钮文档框展示变量值。 交互选择【单击时】【设置文本】再点击函数。 点击插入变量和函数直接选择刚刚定义的全局变量,也可以直接手动写入函数(注意写入格式。) 这样点击按钮时就直接展示刚刚设置的全局变量3了。 2、更改变量值 在新建交互里点击设置变量值。 将a变量设置成等于10. 将新

shell脚本中变量中字符串替换的测试 /和//的区别

test_char=abbbcbbbf echo "bf:test_char = " $test_char test_char=${test_char/bbb/ddd} echo "af:test_char = " $test_char 输出: bf:test_char =  abbbcbbbf af:test_char =  adddcbbbf 只匹配第一个

eclipse中相同变量显示变色设置

java文件的设置"Window"-"preferences"-"Java"-"Editor"-"Mark Occurrences"复选框勾选 js文件的设  置"Window"-"preferences"-"web"-"javascript"-"Mark Occurrences"复选框勾选 。

Python学习1--变量和简单数据

经过这一段时间的学习,将Python相关的知识点记录下来,好记性不如烂笔头嘛。 本文主要参考了《Python编程从入门到实践》以及唐宇迪老师的教程《Python快速入门视频课程》,然后在博主http://www.cnblogs.com/liubinsh/p/6937409.html的基础上总结而成,特此感谢! 第二章 变量和简单数据类型 什么是变量 这里的message就是变量,