本文主要是介绍104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
代码
ConcatLongStringUDF.java
GroupConcatDistinctUDAF.java
AreaTop3ProductSpark.java
本篇文章记录各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()。
代码
spark.product
ConcatLongStringUDF.java
package graduation.java.spark.product;import org.apache.spark.sql.api.java.UDF3;/*** FileName: ConcatLongStringUDF* Author: hadoop* Email: 3165845957@qq.com* Date: 19-4-1 下午8:27* Description:* 将两个字段拼接起来(使用指定的分隔符)*/ public class ConcatLongStringUDF implements UDF3<Long,String,String,String> {private static final long serialVersionUID = 1L;@Overridepublic String call(Long v1, String v2, String split) throws Exception {return String.valueOf(v1) + split+v2;} }
GroupConcatDistinctUDAF.java
package graduation.java.spark.product;import org.apache.spark.sql.Row; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType;import java.util.Arrays;/*** FileName: GroupConcatDistinctUDAF* Author: hadoop* Email: 3165845957@qq.com* Date: 19-4-1 下午8:39* Description:* 内部拼接去重函数(group_concat_distinct)*/ public class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {//指定输入数据字段和类型private StructType inputScheam = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));//指定缓冲数据的字段和类型private StructType bufferSchema = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));//指定返回类型private DataType dataType = DataTypes.StringType;//指定是否是确定性private boolean deterministic = true;@Overridepublic StructType inputSchema() {return inputScheam;}@Overridepublic StructType bufferSchema() {return bufferSchema;}@Overridepublic DataType dataType() {return dataType;}@Overridepublic boolean deterministic() {return deterministic;}/*** 初始化,可以认为是在内部指定一个初始值* @param buffer*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0,"");}/*** 更新* 可以认为 一个一个的将组内的字段传递进来进行逻辑拼接* @param buffer* @param input*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {// 缓冲中的是已经拼接的城市信息String bufferCityInfo = buffer.getString(0);//传递进来的城市信息String cityInfo = input.getString(0);//实现逻辑拼接//判断:之前没有拼接过的某个城市信息,那么这里才可以把它拼接进去if (!bufferCityInfo.contains(cityInfo)){if ("".equals(bufferCityInfo)){bufferCityInfo += cityInfo;}else {bufferCityInfo += "," + cityInfo;}buffer.update(0,bufferCityInfo);}}/*** 合并* update操作,可能是针对一个分组内的部分数据,在某个节点上发生的* 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来*/@Overridepublic void 
merge(MutableAggregationBuffer buffer1, Row buffer2) {String bufferCityInfo1 = buffer1.getString(0);String bufferCityInfo2 = buffer2.getString(0);for (String cityInfo : bufferCityInfo2.split(",")){if (! bufferCityInfo1.contains(cityInfo)){if ("".equals(bufferCityInfo1)){bufferCityInfo1 += cityInfo;}else {bufferCityInfo1 += ","+ cityInfo;}}}buffer1.update(0,bufferCityInfo1);}@Overridepublic Object evaluate(Row row) {return row.getString(0);} }
AreaTop3ProductSpark.java
//3.注册字符串拼接函数sqlContext.udf().register("concat_long_string",new ConcatLongStringUDF(),DataTypes.StringType);sqlContext.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());
这篇关于104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!