104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()

本文主要是介绍104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

代码

ConcatLongStringUDF.java

GroupConcatDistinctUDAF.java

AreaTop3ProductSpark.java


本篇文章记录各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()。

代码

spark.product

ConcatLongStringUDF.java

package graduation.java.spark.product;import org.apache.spark.sql.api.java.UDF3;/*** FileName: ConcatLongStringUDF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:27* Description:* 将两个字段拼接起来(使用指定的分隔符)*/
public class ConcatLongStringUDF implements UDF3<Long,String,String,String> {private static final  long serialVersionUID = 1L;@Overridepublic String call(Long v1, String v2, String split) throws Exception {return String.valueOf(v1) + split+v2;}
}

GroupConcatDistinctUDAF.java

package graduation.java.spark.product;import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.util.Arrays;/*** FileName: GroupConcatDistinctUDAF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:39* Description:* 内部拼接去重函数(group_concat_distinct)*/
public class GroupConcatDistinctUDAF  extends UserDefinedAggregateFunction {//指定输入数据字段和类型private StructType inputScheam = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));//指定缓冲数据的字段和类型private StructType bufferSchema = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));//指定返回类型private DataType dataType = DataTypes.StringType;//指定是否是确定性private boolean deterministic = true;@Overridepublic StructType inputSchema() {return inputScheam;}@Overridepublic StructType bufferSchema() {return bufferSchema;}@Overridepublic DataType dataType() {return dataType;}@Overridepublic boolean deterministic() {return deterministic;}/*** 初始化,可以认为是在内部指定一个初始值* @param buffer*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0,"");}/*** 更新* 可以认为 一个一个的将组内的字段传递进来进行逻辑拼接* @param buffer* @param input*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {// 缓冲中的是已经拼接的城市信息String bufferCityInfo = buffer.getString(0);//传递进来的城市信息String cityInfo = input.getString(0);//实现逻辑拼接//判断:之前没有拼接过的某个城市信息,那么这里才可以把它拼接进去if (!bufferCityInfo.contains(cityInfo)){if ("".equals(bufferCityInfo)){bufferCityInfo += cityInfo;}else {bufferCityInfo += "," + cityInfo;}buffer.update(0,bufferCityInfo);}}/*** 合并* update操作,可能是针对一个分组内的部分数据,在某个节点上发生的* 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {String bufferCityInfo1 = buffer1.getString(0);String bufferCityInfo2 = buffer2.getString(0);for (String cityInfo : bufferCityInfo2.split(",")){if (! bufferCityInfo1.contains(cityInfo)){if ("".equals(bufferCityInfo1)){bufferCityInfo1 += cityInfo;}else {bufferCityInfo1 += ","+ cityInfo;}}}buffer1.update(0,bufferCityInfo1);}@Overridepublic Object evaluate(Row row) {return row.getString(0);}
}

AreaTop3ProductSpark.java

        //3.注册字符串拼接函数sqlContext.udf().register("concat_long_string",new ConcatLongStringUDF(),DataTypes.StringType);sqlContext.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());

 

这篇关于104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/624367

相关文章

Django调用外部Python程序的完整项目实战

《Django调用外部Python程序的完整项目实战》Django是一个强大的PythonWeb框架,它的设计理念简洁优雅,:本文主要介绍Django调用外部Python程序的完整项目实战,文中通... 目录一、为什么 Django 需要调用外部 python 程序二、三种常见的调用方式方式 1:直接 im

SpringBoot全局异常拦截与自定义错误页面实现过程解读

《SpringBoot全局异常拦截与自定义错误页面实现过程解读》本文介绍了SpringBoot中全局异常拦截与自定义错误页面的实现方法,包括异常的分类、SpringBoot默认异常处理机制、全局异常拦... 目录一、引言二、Spring Boot异常处理基础2.1 异常的分类2.2 Spring Boot默

Mybatis对MySQL if 函数的不支持问题解读

《Mybatis对MySQLif函数的不支持问题解读》接手项目后,为了实现多租户功能,引入了Mybatis-plus,发现之前运行正常的SQL语句报错,原因是Mybatis不支持MySQL的if函... 目录MyBATis对mysql if 函数的不支持问题描述经过查询网上搜索资料找到原因解决方案总结Myb

自定义注解SpringBoot防重复提交AOP方法详解

《自定义注解SpringBoot防重复提交AOP方法详解》该文章描述了一个防止重复提交的流程,通过HttpServletRequest对象获取请求信息,生成唯一标识,使用Redis分布式锁判断请求是否... 目录防重复提交流程引入依赖properties配置自定义注解切面Redis工具类controller

VSCode开发中有哪些好用的插件和快捷键

《VSCode开发中有哪些好用的插件和快捷键》作为全球最受欢迎的编程工具,VSCode的快捷键体系是提升开发效率的核心密码,:本文主要介绍VSCode开发中有哪些好用的插件和快捷键的相关资料,文中... 目录前言1、vscode插件1.1 Live-server1.2 Auto Rename Tag1.3

Python容器转换与共有函数举例详解

《Python容器转换与共有函数举例详解》Python容器是Python编程语言中非常基础且重要的概念,它们提供了数据的存储和组织方式,下面:本文主要介绍Python容器转换与共有函数的相关资料,... 目录python容器转换与共有函数详解一、容器类型概览二、容器类型转换1. 基本容器转换2. 高级转换示

在C#中分离饼图的某个区域的操作指南

《在C#中分离饼图的某个区域的操作指南》在处理Excel饼图时,我们可能需要将饼图的各个部分分离出来,以使它们更加醒目,Spire.XLS提供了Series.DataFormat.Percent属性,... 目录引言如何设置饼图各分片之间分离宽度的代码示例:从整个饼图中分离单个分片的代码示例:引言在处理

Agent开发核心技术解析以及现代Agent架构设计

《Agent开发核心技术解析以及现代Agent架构设计》在人工智能领域,Agent并非一个全新的概念,但在大模型时代,它被赋予了全新的生命力,简单来说,Agent是一个能够自主感知环境、理解任务、制定... 目录一、回归本源:到底什么是Agent?二、核心链路拆解:Agent的"大脑"与"四肢"1. 规划模

JAVA项目swing转javafx语法规则以及示例代码

《JAVA项目swing转javafx语法规则以及示例代码》:本文主要介绍JAVA项目swing转javafx语法规则以及示例代码的相关资料,文中详细讲解了主类继承、窗口创建、布局管理、控件替换、... 目录最常用的“一行换一行”速查表(直接全局替换)实际转换示例(JFramejs → JavaFX)迁移建

JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)

《JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)》:本文主要介绍如何在IntelliJIDEA2020.1中创建和部署一个JavaWeb项目,包括创建项目、配置Tomcat服务... 目录简介:一、创建项目二、tomcat部署1、将tomcat解压在一个自己找得到路径2、在idea中添加