Spark---累加器

2024-01-13 18:44
文章标签 spark 累加器

本文主要是介绍Spark---累加器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)var sum=0dataRdd.foreach(num=>sum+=num)println(sum)context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))val sum = context.longAccumulator("sum")dataRdd.foreach(num=>{//使用累加器sum.add(num)})//获取累加器的值println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqiimport bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** 使用累加器完成WordCount案例*/
object Spark_addDemo {def main(args: Array[String]): Unit = {//建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//创建累加器对象val wordCountAccumulator = new WordCountAccumulator//向Spark中进行注册context.register(wordCountAccumulator,"wordCountAccumulator")//实现累加dataRDD.foreach(word => {wordCountAccumulator.add(word)})//获取累加结果,打印在控制台上println(wordCountAccumulator.value)//关闭链接context.stop()}}class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{//定义一个map用于存储累加后的结果var map: mutable.Map[String, Long] =mutable.Map[String,Long]()//累加器是否为初始状态override def isZero: Boolean = {map.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new WordCountAccumulator()}//重置累加器override def reset(): Unit = {map.clear()}//向累加器添加数据INoverride def add(word: String): Unit = {// 查询 map 中是否存在相同的单词// 如果有相同的单词,那么单词的数量加 1// 如果没有相同的单词,那么在 map 中增加这个单词val newValue = map.getOrElse(word, 0L) + 1map.update(word,newValue)}//合并累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {var map1=this.mapvar map2=other.value//合并两个mapmap2.foreach({case (word,count)=>{val newValue = map1.getOrElse(word,0L)+countmap1.update(word,newValue)}})}//返回累加器的结果(OUT)override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

这篇关于Spark---累加器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/602432

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存

Spark源码分析之Spark Shell(上)

终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。 先来介绍一下Spark-shell是什么? Spark-shell是提供给用户即时交互的一个命令窗口,你可以在里面编写spark代码,然后根据你的命令立即进行

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看 编程指南了解更多的内容。 为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。 Spark Shell 交互 基本操作 Spark Shell提供给用