104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()

本文主要是介绍104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

代码

ConcatLongStringUDF.java

GroupConcatDistinctUDAF.java

AreaTop3ProductSpark.java


本篇文章记录各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()。

代码

spark.product

ConcatLongStringUDF.java

package graduation.java.spark.product;import org.apache.spark.sql.api.java.UDF3;/*** FileName: ConcatLongStringUDF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:27* Description:* 将两个字段拼接起来(使用指定的分隔符)*/
public class ConcatLongStringUDF implements UDF3<Long,String,String,String> {private static final  long serialVersionUID = 1L;@Overridepublic String call(Long v1, String v2, String split) throws Exception {return String.valueOf(v1) + split+v2;}
}

GroupConcatDistinctUDAF.java

package graduation.java.spark.product;import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.util.Arrays;/*** FileName: GroupConcatDistinctUDAF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:39* Description:* 内部拼接去重函数(group_concat_distinct)*/
public class GroupConcatDistinctUDAF  extends UserDefinedAggregateFunction {//指定输入数据字段和类型private StructType inputScheam = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));//指定缓冲数据的字段和类型private StructType bufferSchema = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));//指定返回类型private DataType dataType = DataTypes.StringType;//指定是否是确定性private boolean deterministic = true;@Overridepublic StructType inputSchema() {return inputScheam;}@Overridepublic StructType bufferSchema() {return bufferSchema;}@Overridepublic DataType dataType() {return dataType;}@Overridepublic boolean deterministic() {return deterministic;}/*** 初始化,可以认为是在内部指定一个初始值* @param buffer*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0,"");}/*** 更新* 可以认为 一个一个的将组内的字段传递进来进行逻辑拼接* @param buffer* @param input*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {// 缓冲中的是已经拼接的城市信息String bufferCityInfo = buffer.getString(0);//传递进来的城市信息String cityInfo = input.getString(0);//实现逻辑拼接//判断:之前没有拼接过的某个城市信息,那么这里才可以把它拼接进去if (!bufferCityInfo.contains(cityInfo)){if ("".equals(bufferCityInfo)){bufferCityInfo += cityInfo;}else {bufferCityInfo += "," + cityInfo;}buffer.update(0,bufferCityInfo);}}/*** 合并* update操作,可能是针对一个分组内的部分数据,在某个节点上发生的* 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {String bufferCityInfo1 = buffer1.getString(0);String bufferCityInfo2 = buffer2.getString(0);for (String cityInfo : bufferCityInfo2.split(",")){if (! bufferCityInfo1.contains(cityInfo)){if ("".equals(bufferCityInfo1)){bufferCityInfo1 += cityInfo;}else {bufferCityInfo1 += ","+ cityInfo;}}}buffer1.update(0,bufferCityInfo1);}@Overridepublic Object evaluate(Row row) {return row.getString(0);}
}

AreaTop3ProductSpark.java

        //3.注册字符串拼接函数sqlContext.udf().register("concat_long_string",new ConcatLongStringUDF(),DataTypes.StringType);sqlContext.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());

 

这篇关于104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/624367

相关文章

Kotlin 作用域函数apply、let、run、with、also使用指南

《Kotlin作用域函数apply、let、run、with、also使用指南》在Kotlin开发中,作用域函数(ScopeFunctions)是一组能让代码更简洁、更函数式的高阶函数,本文将... 目录一、引言:为什么需要作用域函数?二、作用域函China编程数详解1. apply:对象配置的 “流式构建器”最

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

Python基于wxPython和FFmpeg开发一个视频标签工具

《Python基于wxPython和FFmpeg开发一个视频标签工具》在当今数字媒体时代,视频内容的管理和标记变得越来越重要,无论是研究人员需要对实验视频进行时间点标记,还是个人用户希望对家庭视频进行... 目录引言1. 应用概述2. 技术栈分析2.1 核心库和模块2.2 wxpython作为GUI选择的优

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

一文教你如何将maven项目转成web项目

《一文教你如何将maven项目转成web项目》在软件开发过程中,有时我们需要将一个普通的Maven项目转换为Web项目,以便能够部署到Web容器中运行,本文将详细介绍如何通过简单的步骤完成这一转换过程... 目录准备工作步骤一:修改​​pom.XML​​1.1 添加​​packaging​​标签1.2 添加

tomcat多实例部署的项目实践

《tomcat多实例部署的项目实践》Tomcat多实例是指在一台设备上运行多个Tomcat服务,这些Tomcat相互独立,本文主要介绍了tomcat多实例部署的项目实践,具有一定的参考价值,感兴趣的可... 目录1.创建项目目录,测试文China编程件2js.创建实例的安装目录3.准备实例的配置文件4.编辑实例的

利用Python开发Markdown表格结构转换为Excel工具

《利用Python开发Markdown表格结构转换为Excel工具》在数据管理和文档编写过程中,我们经常使用Markdown来记录表格数据,但它没有Excel使用方便,所以本文将使用Python编写一... 目录1.完整代码2. 项目概述3. 代码解析3.1 依赖库3.2 GUI 设计3.3 解析 Mark

Android Kotlin 高阶函数详解及其在协程中的应用小结

《AndroidKotlin高阶函数详解及其在协程中的应用小结》高阶函数是Kotlin中的一个重要特性,它能够将函数作为一等公民(First-ClassCitizen),使得代码更加简洁、灵活和可... 目录1. 引言2. 什么是高阶函数?3. 高阶函数的基础用法3.1 传递函数作为参数3.2 Lambda

如何自定义Nginx JSON日志格式配置

《如何自定义NginxJSON日志格式配置》Nginx作为最流行的Web服务器之一,其灵活的日志配置能力允许我们根据需求定制日志格式,本文将详细介绍如何配置Nginx以JSON格式记录访问日志,这种... 目录前言为什么选择jsON格式日志?配置步骤详解1. 安装Nginx服务2. 自定义JSON日志格式各

springboot集成Deepseek4j的项目实践

《springboot集成Deepseek4j的项目实践》本文主要介绍了springboot集成Deepseek4j的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录Deepseek4j快速开始Maven 依js赖基础配置基础使用示例1. 流式返回示例2. 进阶