累加器 - 分布式共享写变量

2024-02-14 19:04

本文主要是介绍累加器 - 分布式共享写变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 概念
      • 注意:
      • 应用

概念

  因为RDD是可分区的,每个分区在不同的节点上运行,如果想要对某个值进行全局累加,就需要将每个task中的值取到然后进行累加,而各个Executor之间是不能相互读取对方数据的,所以就没办法在task里面进行最终累加结果的输出,所以就需要一个全局统一的变量来处理。

用下面的代码举例:

@Test
def test(): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")val sc = new SparkContext(conf)var sum = 0val rdd = sc.parallelize(List(10,20,30,40,60))rdd.foreach(x=>{sum+=x})println(sum)
}

以上代码的输出结果是 0
分析:因为函数体外的代码是在Driver端执行的,函数体内的代码是在task里面执行的,而上述代码中创建sum变量是在Driver端创建的,函数体内的代码sum+=x是在task里面执行的,task里面对sum进行了累加,然后在Driver端打印sum的值,打印出来的还是Driver端sum的初始值0


  以下代码是用累加器实现的相同功能
@Test
def test(): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")val sc = new SparkContext(conf)// 创建累加器从sparkContext中val acc = sc.longAccumulator("sumAcc")val rdd = sc.parallelize(List(10,20,30,40,60))//    rdd.foreach(x=>{//      sum+=x//    })// 将要累加的变量放到累加器中rdd.foreach(x=>{acc.add(x)})// 打印累加器的值println(acc.value)}

上面代码的输出结果是160
分析:上述代码中从SparkContext中创建了longAccumulator累加器,起名为sumAcc,然后在task中执行累加的时候调用改累加器的add()方法将要累加的变量加入到累加器中,最后在driver端调用累加器的value方法取出累加器的值,所以就会得出160

总结
累加器的创建:val acc = sc.longAccumulator(“sumAcc”)
向累加器中添加数据:acc.add(x)
取出累加器中的数据:acc.value

注意:

  • Executor端不要获取累加器的值,那样取到的那个值不是累加器最终的累加结果,因为累加器是一个分布式共享的写变量
  • 使用累加器时(也就是向累加器中添加要累加的变量的时候)要将其放在行动算子中。因为在行动算子中这个累加器的值可以保证绝对可靠,如果在转换算子中使用累加器,假如这个spark应用程序有多个job,每个job执行的时候都会执行一遍转换算子,那么这个累加器就会被累加多次,这个值也就不准确了。所以累加器要在行动算子中使用。

应用

  累加器在某些场景下可以避免shuffle。spark中自带的累加器有三个longAccumulator()doubleAccumulator()collectionAccumulator[](),比较常用的是collectionAccumulator[]()

我们用累加器实现WordCount
下面是正常的WordCount代码:

  def main(args: Array[String]): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")val sc = new SparkContext(conf)val rdd1 = sc.textFile("datas/wc.txt")val rdd2 = rdd1.flatMap(_.split(" "))val rdd3 = rdd2.map((_, 1))rdd3.reduceByKey(_ + _).collect().foreach(println)Thread.sleep(100000000)}

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述

下面用collectionAccumulator[]()累加器实现,用累加器替换掉reduceByKey

def main(args: Array[String]): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")val sc = new SparkContext(conf)//    定义集合累加器,并声明累加器中存放的元素类型    可变的map,可变就是可以直接在原来的集合上修改不会返回新的集合val acc = sc.collectionAccumulator[mutable.Map[String,Int]]("accNum")val rdd1 = sc.textFile("datas/wc.txt")val rdd2 = rdd1.flatMap(_.split(" "))val rdd3 = rdd2.map((_, 1))//    rdd3.reduceByKey(_ + _).collect().foreach(println)// 遍历每个二元元组rdd3.foreachPartition(it=>{// 定义一个map用来存放一个分区的累加结果val resMap =  mutable.Map[String,Int]()// 遍历分区的每个元素(hadoop,1)it.foreach(y=>{// 从resMap中取这个元素的key看有没有,没有的话返回0val num = resMap.getOrElse(y._1, 0)// 将取到的数和元素的标记1累加val num2 = num+y._2// 写回到map中resMap.put(y._1,num2)})// 将每个分区的累加结果添加到累加器中acc.add(resMap)})//    println(rdd4.collect())// 在driver端取到累加器的结果,这个结果是Java类型的listval res = acc.value// 添加 scala.collection.JavaConverters._ 用里面的asScala将Java类型的list转为Scala类型import scala.collection.JavaConverters._val scalaList = res.asScala//    将list里面的map压掉,剩下()元组val flatten = scalaList.flatten//  以元组的key分组val grouped = flatten.groupBy(_._1)// 得出每个分组内的次数和val res_end = grouped.map(x => {// 将元组中的第二个次数来一次map然后sum,取出每个单词的频率val sum = x._2.map(y => y._2).sum(x._1, sum)})res_end.foreach(println)Thread.sleep(100000000)
}

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述
通过对比两次的web页面可以发现,使用reduceByKey会有一次shuffle,使用累加器替换掉reduceByKey实现相同的功能,没有产生shuffle,因此累加器在某些聚合场景下可以避免掉shuffle从而在一定程度上提高性能

这篇关于累加器 - 分布式共享写变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/709340

相关文章

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

使用Nginx来共享文件的详细教程

《使用Nginx来共享文件的详细教程》有时我们想共享电脑上的某些文件,一个比较方便的做法是,开一个HTTP服务,指向文件所在的目录,这次我们用nginx来实现这个需求,本文将通过代码示例一步步教你使用... 在本教程中,我们将向您展示如何使用开源 Web 服务器 Nginx 设置文件共享服务器步骤 0 —

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Redis分布式锁使用及说明

《Redis分布式锁使用及说明》本文总结了Redis和Zookeeper在高可用性和高一致性场景下的应用,并详细介绍了Redis的分布式锁实现方式,包括使用Lua脚本和续期机制,最后,提到了RedLo... 目录Redis分布式锁加锁方式怎么会解错锁?举个小案例吧解锁方式续期总结Redis分布式锁如果追求

java如何调用kettle设置变量和参数

《java如何调用kettle设置变量和参数》文章简要介绍了如何在Java中调用Kettle,并重点讨论了变量和参数的区别,以及在Java代码中如何正确设置和使用这些变量,避免覆盖Kettle中已设置... 目录Java调用kettle设置变量和参数java代码中变量会覆盖kettle里面设置的变量总结ja

Perl 特殊变量详解

《Perl特殊变量详解》Perl语言中包含了许多特殊变量,这些变量在Perl程序的执行过程中扮演着重要的角色,:本文主要介绍Perl特殊变量,需要的朋友可以参考下... perl 特殊变量Perl 语言中包含了许多特殊变量,这些变量在 Perl 程序的执行过程中扮演着重要的角色。特殊变量通常用于存储程序的

Python使用pysmb库访问Windows共享文件夹的详细教程

《Python使用pysmb库访问Windows共享文件夹的详细教程》本教程旨在帮助您使用pysmb库,通过SMB(ServerMessageBlock)协议,轻松连接到Windows共享文件夹,并列... 目录前置条件步骤一:导入必要的模块步骤二:配置连接参数步骤三:实例化SMB连接对象并尝试连接步骤四: