104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()

本文主要是介绍104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

代码

ConcatLongStringUDF.java

GroupConcatDistinctUDAF.java

AreaTop3ProductSpark.java


本篇文章记录各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()。

代码

spark.product

ConcatLongStringUDF.java

package graduation.java.spark.product;import org.apache.spark.sql.api.java.UDF3;/*** FileName: ConcatLongStringUDF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:27* Description:* 将两个字段拼接起来(使用指定的分隔符)*/
public class ConcatLongStringUDF implements UDF3<Long,String,String,String> {private static final  long serialVersionUID = 1L;@Overridepublic String call(Long v1, String v2, String split) throws Exception {return String.valueOf(v1) + split+v2;}
}

GroupConcatDistinctUDAF.java

package graduation.java.spark.product;import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.util.Arrays;/*** FileName: GroupConcatDistinctUDAF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:39* Description:* 内部拼接去重函数(group_concat_distinct)*/
public class GroupConcatDistinctUDAF  extends UserDefinedAggregateFunction {//指定输入数据字段和类型private StructType inputScheam = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));//指定缓冲数据的字段和类型private StructType bufferSchema = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));//指定返回类型private DataType dataType = DataTypes.StringType;//指定是否是确定性private boolean deterministic = true;@Overridepublic StructType inputSchema() {return inputScheam;}@Overridepublic StructType bufferSchema() {return bufferSchema;}@Overridepublic DataType dataType() {return dataType;}@Overridepublic boolean deterministic() {return deterministic;}/*** 初始化,可以认为是在内部指定一个初始值* @param buffer*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0,"");}/*** 更新* 可以认为 一个一个的将组内的字段传递进来进行逻辑拼接* @param buffer* @param input*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {// 缓冲中的是已经拼接的城市信息String bufferCityInfo = buffer.getString(0);//传递进来的城市信息String cityInfo = input.getString(0);//实现逻辑拼接//判断:之前没有拼接过的某个城市信息,那么这里才可以把它拼接进去if (!bufferCityInfo.contains(cityInfo)){if ("".equals(bufferCityInfo)){bufferCityInfo += cityInfo;}else {bufferCityInfo += "," + cityInfo;}buffer.update(0,bufferCityInfo);}}/*** 合并* update操作,可能是针对一个分组内的部分数据,在某个节点上发生的* 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {String bufferCityInfo1 = buffer1.getString(0);String bufferCityInfo2 = buffer2.getString(0);for (String cityInfo : bufferCityInfo2.split(",")){if (! bufferCityInfo1.contains(cityInfo)){if ("".equals(bufferCityInfo1)){bufferCityInfo1 += cityInfo;}else {bufferCityInfo1 += ","+ cityInfo;}}}buffer1.update(0,bufferCityInfo1);}@Overridepublic Object evaluate(Row row) {return row.getString(0);}
}

AreaTop3ProductSpark.java

        //3.注册字符串拼接函数sqlContext.udf().register("concat_long_string",new ConcatLongStringUDF(),DataTypes.StringType);sqlContext.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());

 

这篇关于104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/624367

相关文章

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

hdu1496(用hash思想统计数目)

作为一个刚学hash的孩子,感觉这道题目很不错,灵活的运用的数组的下标。 解题步骤:如果用常规方法解,那么时间复杂度为O(n^4),肯定会超时,然后参考了网上的解题方法,将等式分成两个部分,a*x1^2+b*x2^2和c*x3^2+d*x4^2, 各自作为数组的下标,如果两部分相加为0,则满足等式; 代码如下: #include<iostream>#include<algorithm

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定