SparkSQL的两种UDAF的讲解

2023-10-09 03:48
文章标签 讲解 两种 sparksql udaf

本文主要是介绍SparkSQL的两种UDAF的讲解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark的dataframe提供了通用的聚合方法,比如count(),countDistinct(),avg(),max(),min()等等。然而这些函数是针对dataframe设计的,当然sparksql也有类型安全的版本,java和scala语言接口都有,这些就适用于强类型Datasets。本文主要是讲解spark提供的两种聚合函数接口:

1, UserDefinedAggregateFunction

2,Aggregator

这两个接口基本上满足了,用户自定义聚合函数的需求。

UserDefinedAggregateFunction

类UserDefinedAggregateFunction,在文件udaf.scala里面。是实现用户自定义聚合函数UDAF的基础类,首先,我们先看看该类的基本信息

abstract class UserDefinedAggregateFunction extends Serializable {
StructType代表的是该聚合函数输入参数的类型。例如,一个UDAF实现需要两个输入参数,
类型分别是DoubleType和LongType,那么该StructType格式如下:
   new StructType()
   .add("doubleInput",DoubleType)
   .add("LongType",LongType)
那么该udaf就只会识别,这种类型的输入的数据。
 def inputSchema: StructType
  该StructType代表aggregation buffer的类型参数。例如,一个udaf的buffer有
  两个值,类型分别是DoubleType和LongType,那么其格式将会如下:

    new StructType()
     .add("doubleInput", DoubleType)
     .add("longInput", LongType)
    也只会适用于类型格式如上的数据
 
 def bufferSchema: StructType

   dataTypeda代表该UDAF的返回值类型
 def dataType: DataType

   如果该函数是确定性的,那么将会返回true,例如,给相同的输入,就会有相同
   的输出
 def deterministic: Boolean
   
   初始化聚合buffer,例如,给聚合buffer以0值
   在两个初始buffer调用聚合函数,其返回值应该是初始函数自身,例如
   merge(initialBuffer,initialBuffer)应该等于initialBuffer。
 def initialize(buffer: MutableAggregationBuffer): Unit

   利用输入输入去更新给定的聚合buffer,每个输入行都会调用一次该函数
 def update(buffer: MutableAggregationBuffer, input: Row): Unit

   合并两个聚合buffer,并且将更新的buffer返回给buffer1
   该函数在聚合并两个部分聚合数据集的时候调用
 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit

   计算该udaf在给定聚合buffer上的最终结果
 def evaluate(buffer: Row): Any


   使用给定的Column作为输入参数,来为当前UDAF创建一个Column
 @scala.annotation.varargs
 def apply(exprs: Column*): Column = {
val aggregateExpression =
AggregateExpression(
ScalaUDAF(exprs.map(_.expr), this),
       Complete,
       isDistinct = false)
Column(aggregateExpression)
}

   使用给定Column去重后的值作为参数来生成一个Column
 @scala.annotation.varargs
 def distinct(exprs: Column*): Column = {
val aggregateExpression =
AggregateExpression(
ScalaUDAF(exprs.map(_.expr), this),
       Complete,
       isDistinct = true)
Column(aggregateExpression)
}
}

/**
* A `Row` representing a mutable aggregation buffer.
*
* This is not meant to be extended outside of Spark.
*
* @since 1.5.0
*/
@InterfaceStability.Stable
abstract class MutableAggregationBuffer extends Row {

/** Update the ith value of this buffer. */
 def update(i: Int, value: Any): Unit
}

给出一个非类型安全的UDAF实现:


import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession


object UserDefinedUntypedAggregation {


 object MyAverage extends UserDefinedAggregateFunction {
//输入参数的数据类型
   def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//聚合 buffer的数据类型
   def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的数据类型
   def dataType: DataType = DoubleType
// 给定的相同的输入,就会有相同的输入。
   def deterministic: Boolean = true
   //初始化给定的聚合buffer
   def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
     buffer(1) = 0L
   }
// 用输入的row来更新聚合buffer
   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
     }
}
// 合并两个聚合buffer,并将合并后的值返回给 `buffer1`
   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算最终输出
   def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// $example off:untyped_custom_aggregation$

 def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark SQL user-defined DataFrames aggregation example")
.getOrCreate()

   //为了使用聚合函数,需要先注册
   spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
   // |   name|salary|
   // +-------+------+
   // |Michael|  3000|
   // |   Andy|  4500|
   // | Justin|  3500|
   // |  Berta|  4000|
   // +-------+------+

   val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
   // |average_salary|
   // +--------------+
   // |        3750.0|
   // +--------------+
   // $example off:untyped_custom_aggregation$

   spark.stop()
}

}


Aggregator

用户自定义聚合函数的基类,可以在Dataset中使用,取出一个组的数据,然后聚合。该类的源码

        举个栗子
*   val customSummer =  new Aggregator[Data, Int, Int] {
*     def zero: Int = 0
*     def reduce(b: Int, a: Data): Int = b + a.i
*     def merge(b1: Int, b2: Int): Int = b1 + b2
*     def finish(r: Int): Int = r
*   }.toColumn()
*
*   val ds: Dataset[Data] = ...
*   val aggregated = ds.select(customSummer)
* }}}
* @tparam IN The input type for the aggregation.
* @tparam BUF The type of the intermediate value of the reduction.
* @tparam OUT The type of the final output result.
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {

   该剧和函数的0值。需要满足对于任何输入b,那么b+zero=b
 def zero: BUF

   聚合两个值产生一个新的值,为了提升性能,该函数会修改b,然后直接返回b,而
   不适新生成一个b的对象。

 def reduce(b: BUF, a: IN): BUF
   合并两个中间值
 def merge(b1: BUF, b2: BUF): BUF

   转换reduce的输出
 def finish(reduction: BUF): OUT

   为中间值类型提供一个编码器
 def bufferEncoder: Encoder[BUF]

   为最终的输出结果提供一个编码器
 def outputEncoder: Encoder[OUT]
   
   将该聚合函数返回为一个TypedColumn,目的是为了能在Dataset中使用
 def toColumn: TypedColumn[IN, OUT] = {
implicit val bEncoder = bufferEncoder
implicit val cEncoder = outputEncoder

val expr =
AggregateExpression(
TypedAggregateExpression(this),
       Complete,
       isDistinct = false)

new TypedColumn[IN, OUT](expr, encoderFor[OUT])
}
}

该类的一个实现

// 
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
//

object UserDefinedTypedAggregation {

// 自定义输入类型和输出类型
 case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
// 0值
   def zero: Average = Average(0L, 0L)
// 合并,输入和buffer
   def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
     buffer
}
// 合并中间结果
   def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 转换reduce的输出类型
   def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 为中间值类型指定编码器
   def bufferEncoder: Encoder[Average] = Encoders.product
   // 为输出类型指定编码器。
   def outputEncoder: Encoder[Double] = Encoders.scalaDouble
 }
//

 def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark SQL user-defined Datasets aggregation example")
.getOrCreate()

import spark.implicits._

//
   val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
   // |   name|salary|
   // +-------+------+
   // |Michael|  3000|
   // |   Andy|  4500|
   // | Justin|  3500|
   // |  Berta|  4000|
   // +-------+------+

   // Convert the function to a `TypedColumn` and give it a name
   val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
   // |average_salary|
   // +--------------+
   // |        3750.0|
   // +--------------+
   // $example off:typed_custom_aggregation$

   spark.stop()
}

}

该例子整理自spark2.2.1源码,希望对大家有帮助,欢迎大家进入知识星球,想大神学习,进入高级大牛行列。

640?wx_fmt=jpeg


这篇关于SparkSQL的两种UDAF的讲解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/170109

相关文章

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

ispunct函数讲解 <ctype.h>头文件函数

目录 1.头文件函数 2.ispunct函数使用  小心!VS2022不可直接接触,否则..!没有这个必要,方源一把抓住VS2022,顷刻 炼化! 1.头文件函数 以上函数都需要包括头文件<ctype.h> ,其中包括 ispunct 函数 #include<ctype.h> 2.ispunct函数使用 简述: ispunct函数一种判断字符是否为标点符号的函

深度学习速通系列:深度学习算法讲解

深度学习算法是一系列基于人工神经网络的算法,它们通过模拟人脑处理信息的方式来学习和解决复杂问题。这些算法在图像识别、语音识别、自然语言处理、游戏等领域取得了显著的成就。以下是一些流行的深度学习算法及其基本原理: 1. 前馈神经网络(Feedforward Neural Networks, FNN) 原理:FNN 是最基本的神经网络结构,它由输入层、隐藏层和输出层组成。信息从输入层流向隐藏层,最

C#设计模式(1)——单例模式(讲解非常清楚)

一、引言 最近在学设计模式的一些内容,主要的参考书籍是《Head First 设计模式》,同时在学习过程中也查看了很多博客园中关于设计模式的一些文章的,在这里记录下我的一些学习笔记,一是为了帮助我更深入地理解设计模式,二同时可以给一些初学设计模式的朋友一些参考。首先我介绍的是设计模式中比较简单的一个模式——单例模式(因为这里只牵涉到一个类) 二、单例模式的介绍 说到单例模式,大家第一

android两种日志获取log4j

android   log4j 加载日志使用方法; 先上图: 有两种方式: 1:直接使用架包 加载(两个都要使用); 架包:android-logging-log4j-1.0.3.jar 、log4j-1.2.15.jar  (说明:也可以使用架包:log4j-1.2.17.jar)  2:对架包输入日志的二次封装使用; 1:直接使用 log4j 日志框架获取日志信息: A:配置 日志 文

[项目][CMP][直接向堆申请页为单位的大块内存]详细讲解

目录 1.系统调用 1.系统调用 Windows和Linux下如何直接向堆申请页为单位的大块内存: VirtualAllocbrk和mmap // 直接去堆上按页申请空间static inline void *SystemAlloc(size_t kpage){#ifdef _WIN32void *ptr = VirtualAlloc(0, kpage << 13,

c++11工厂子类实现自注册的两种方法

文章目录 一、产品类构建1. 猫基类与各品种猫子类2.狗基类与各品种狗子类 二、工厂类构建三、客户端使用switch-case实现调用不同工厂子类四、自注册方法一:公开注册函数显式注册五、自注册方法二:构造函数隐形注册总结 一、产品类构建 1. 猫基类与各品种猫子类 class Cat {public:virtual void Printer() = 0;};class

高斯平面直角坐标讲解,以及地理坐标转换高斯平面直角坐标

高斯平面直角坐标系(Gauss-Krüger 坐标系)是基于 高斯-克吕格投影 的一种常见的平面坐标系统,主要用于地理信息系统 (GIS)、测绘和工程等领域。该坐标系将地球表面的经纬度(地理坐标)通过一种投影方式转换为平面直角坐标,以便在二维平面中进行距离、面积和角度的计算。 一 投影原理 高斯平面直角坐标系使用的是 高斯-克吕格投影(Gauss-Krüger Projection),这是 横

车险该怎么买?行业人讲解车险

很多车主对汽车保险知识不了解,稀里糊涂的买了车辆保险,但是出险时发现很多不赔的,还有很多对自己来说没什么用的保险,花了不少钱,还没买到自己想要的,殊不知只要多了解点汽车保险知识就能轻松省下一大笔钱并且买到自己真正想要的,何乐而不为呢! 因为卖保险的或者4S店,都是按照常规情况给你推荐保险,具体用车情况,只有你自己最清楚,所以保险是个个性化定制的产品,需要什么买什么,不需要的就没必要购买了。 一般

VB和51单片机串口通信讲解(只针对VB部分)

标记:该篇文章全部搬自如下网址:http://www.crystalradio.cn/thread-321839-1-1.html,谢谢啦            里面关于中文接收的部分,大家可以好好学习下,题主也在研究中................... Commport;设置或返回串口号。 SettingS:以字符串的形式设置或返回串口通信参数。 Portopen:设置或返回串口