Spark SQL自定义collect_list分组排序

2023-11-08 21:15

本文主要是介绍Spark SQL自定义collect_list分组排序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

想要在spark sql中对group by + concat_ws()的字段进行排序,可以参考如下方法。
原始数据如下:

+---+-----+----+
|id |name |type|
+---+-----+----+
|1  |name1|p   |
|2  |name2|p   |
|3  |name3|p   |
|1  |x1   |q   |
|2  |x2   |q   |
|3  |x3   |q   |
+---+-----+----+

目标数据如下:

+----+---------------------+
|type|value_list           |
+----+---------------------+
|p   |[name3, name2, name1]|
|q   |[x3, x2, x1]         |
+----+---------------------+

spark-shell:

val df=Seq((1,"name1","p"),(2,"name2","p"),(3,"name3","p"),(1,"x1","q"),(2,"x2","q"),(3,"x3","q")).toDF("id","name","type")
df.show(false)

1.使用开窗函数

df.createOrReplaceTempView("test")
spark.sql("select type,max(c) as c1 from (select type,concat_ws('&',collect_list(trim(name)) over(partition by type order by id desc)) as c  from test) as x group by type ")

因为使用开窗函数本身会使用比较多的资源,
这种方式在大数据量下性能会比较慢,所以尝试下面的操作。

2.使用struct和sort_array(array,asc?true,flase)的方式来进行,效率高些:

val df3=spark.sql("select type, concat_ws('&',sort_array(collect_list(struct(id,name)),false).name) as c from test group by type ")
df3.show(false)

例如:计算一个结果形如:

user_id    stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time 

需要按照time 升序排,则:

Dataset<Row> splitStkView = session.sql("select client_id, innercode, entrust_bs, business_amount, business_price, trade_date from\n" +"(select client_id,\n" +"       split(action,':')[0] as innercode,\n" +"       split(action,':')[1] as entrust_bs,\n" +"       split(action,':')[2] as business_amount,\n" +"       split(action,':')[3] as business_price,\n" +"       split(action,':')[4] as trade_date,\n" +"       ROW_NUMBER() OVER(PARTITION BY split(action,':')[0] ORDER BY split(action,':')[4] DESC) AS rn\n" +"from stk_temp)\n" +"where rn <= 5000");splitStkView.createOrReplaceTempView("splitStkView");Dataset<Row> groupStkView = session.sql("select client_id, CONCAT(innercode, ':', entrust_bs, ':', business_amount, ':', business_price, ':', trade_date) as behive, trade_date from splitStkView");groupStkView.createOrReplaceTempView("groupStkView");Dataset<Row> resultData = session.sql("SELECT client_id, concat_ws('\t',sort_array(collect_list(struct(trade_date, behive)),true).behive) as behives FROM groupStkView GROUP BY client_id");

3.udf的方式

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
val sortUdf = udf((rows: Seq[Row]) => {rows.map { case Row(id:Int, value:String) => (id, value) }.sortBy { case (id, value) => -id } //id if asc.map { case (id, value) => value }
})val grouped = df.groupBy(col("type")).agg(collect_list(struct("id", "name")) as "id_name")
val r1 = grouped.select(col("type"), sortUdf(col("id_name")).alias("value_list"))
r1.show(false)

这篇关于Spark SQL自定义collect_list分组排序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/372561

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

usaco 1.3 Mixing Milk (结构体排序 qsort) and hdu 2020(sort)

到了这题学会了结构体排序 于是回去修改了 1.2 milking cows 的算法~ 结构体排序核心: 1.结构体定义 struct Milk{int price;int milks;}milk[5000]; 2.自定义的比较函数,若返回值为正,qsort 函数判定a>b ;为负,a<b;为0,a==b; int milkcmp(const void *va,c

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份