Spark SQL用UDF实现按列特征重分区 repatition

2024-04-06 18:58

本文主要是介绍Spark SQL用UDF实现按列特征重分区 repatition,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

转:https://cloud.tencent.com/developer/article/1371921

解决问题之前,要先了解一下Spark 原理,要想进行相同数据归类到相同分区,肯定要有产生shuffle步骤。

比如,F到G这个shuffle过程,那么如何决定数据到哪个分区去的呢?这就有一个分区器的概念,默认是hash分区器。

假如,我们能在分区这个地方着手的话肯定能实现我们的目标。

那么,在没有看Spark Dataset的接口之前,浪尖也不知道Spark Dataset有没有给我门提供这种类型的API,抱着试一试的心态,可以去Dataset类看一下,这个时候会发现有一个函数叫做repartition。

/*** Returns a new Dataset partitioned by the given partitioning expressions, using* `spark.sql.shuffle.partitions` as number of partitions.* The resulting Dataset is hash partitioned.** This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).** @group typedrel* @since 2.0.0*/@scala.annotation.varargsdef repartition(partitionExprs: Column*): Dataset[T] = {repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)}

可以传入列表达式来进行重新分区,产生的新的Dataset的分区数是由参数spark.sql.shuffle.partitions决定,那么是不是可以满足我们的需求呢?

明显,直接用是不行的,可以间接使用UDF来实现该功能。

方式一-简单重分区

首先,实现一个UDF截取列值共同前缀,当然根据业务需求来写该udf

val substring = udf{(str: String) => {str.substring(0,str.length-1)}}

注册UDF

spark.udf.register("substring",substring)

创建Dataset

val sales = spark.createDataFrame(Seq(("Warsaw1", 2016, 100),("Warsaw2", 2017, 200),("Warsaw3", 2016, 100),("Warsaw4", 2017, 200),("Beijing1", 2017, 200),("Beijing2", 2017, 200),("Warsaw4", 2017, 200),("Boston1", 2015, 50),("Boston2", 2016, 150))).toDF("city", "year", "amount")

执行充分去操作

val res = sales.repartition(substring(col("city")))

打印分区ID及对应的输出结果

res.foreachPartition(partition=>{println("---------------------> Partition start ")println("partitionID is "+TaskContext.getPartitionId())partition.foreach(println)println("=====================> Partition stop ")})

浪尖这里spark.sql.shuffle.partitions设置的数值为10.

输出结果截图如下:

方式二-SQL实现

对于Dataset的repartition产生的shuffle是不需要进行聚合就可以产生shuffle使得按照字段值进行归类到某些分区。

SQL的实现要实现重分区要使用group by,然后udf跟上面一样,需要进行聚合操作。

完整代码如下:

val sales = spark.createDataFrame(Seq(("Warsaw1", 2016, 100),("Warsaw2", 2017, 200),("Warsaw3", 2016, 100),("Warsaw4", 2017, 200),("Beijing1", 2017, 200),("Beijing2", 2017, 200),("Warsaw4", 2017, 200),("Boston1", 2015, 50),("Boston2", 2016, 150))).toDF("city", "year", "amount")sales.registerTempTable("temp");val substring = udf{(str: String) => {str.substring(0,str.length-1)}}spark.udf.register("substring",substring)val res = spark.sql("select sum(amount) from temp group by substring(city)")
//res.foreachPartition(partition=>{println("---------------------> Partition start ")println("partitionID is "+TaskContext.getPartitionId())partition.foreach(println)println("=====================> Partition stop ")})

输出结果如下:

由上面的结果也可以看到task执行结束时间是无序的。

浪尖在这里主要是讲了Spark SQL 如何实现按照自己的需求对某列重分区。

那么,浪尖在这里就顺带问一下,如何用Spark Core实现该功能呢?

这篇关于Spark SQL用UDF实现按列特征重分区 repatition的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880586

相关文章

C++对象布局及多态实现探索之内存布局(整理的很多链接)

本文通过观察对象的内存布局,跟踪函数调用的汇编代码。分析了C++对象内存的布局情况,虚函数的执行方式,以及虚继承,等等 文章链接:http://dev.yesky.com/254/2191254.shtml      论C/C++函数间动态内存的传递 (2005-07-30)   当你涉及到C/C++的核心编程的时候,你会无止境地与内存管理打交道。 文章链接:http://dev.yesky

mysql索引四(组合索引)

单列索引,即一个索引只包含单个列,一个表可以有多个单列索引,但这不是组合索引;组合索引,即一个索引包含多个列。 因为有事,下面内容全部转自:https://www.cnblogs.com/farmer-cabbage/p/5793589.html 为了形象地对比单列索引和组合索引,为表添加多个字段:    CREATE TABLE mytable( ID INT NOT NULL, use

mysql索引三(全文索引)

前面分别介绍了mysql索引一(普通索引)、mysql索引二(唯一索引)。 本文学习mysql全文索引。 全文索引(也称全文检索)是目前搜索引擎使用的一种关键技术。它能够利用【分词技术】等多种算法智能分析出文本文字中关键词的频率和重要性,然后按照一定的算法规则智能地筛选出我们想要的搜索结果。 在MySql中,创建全文索引相对比较简单。例如:我们有一个文章表(article),其中有主键ID(

mysql索引二(唯一索引)

前文中介绍了MySQL中普通索引用法,和没有索引的区别。mysql索引一(普通索引) 下面学习一下唯一索引。 创建唯一索引的目的不是为了提高访问速度,而只是为了避免数据出现重复。唯一索引可以有多个但索引列的值必须唯一,索引列的值允许有空值。如果能确定某个数据列将只包含彼此各不相同的值,在为这个数据列创建索引的时候就应该使用关键字UNIQUE,把它定义为一个唯一索引。 添加数据库唯一索引的几种

mysql索引一(普通索引)

mysql的索引分为两大类,聚簇索引、非聚簇索引。聚簇索引是按照数据存放的物理位置为顺序的,而非聚簇索引则不同。聚簇索引能够提高多行检索的速度、非聚簇索引则对单行检索的速度很快。         在这两大类的索引类型下,还可以降索引分为4个小类型:         1,普通索引:最基本的索引,没有任何限制,是我们经常使用到的索引。         2,唯一索引:与普通索引

通过SSH隧道实现通过远程服务器上外网

搭建隧道 autossh -M 0 -f -D 1080 -C -N user1@remotehost##验证隧道是否生效,查看1080端口是否启动netstat -tuln | grep 1080## 测试ssh 隧道是否生效curl -x socks5h://127.0.0.1:1080 -I http://www.github.com 将autossh 设置为服务,隧道开机启动

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测 目录 时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测基本介绍程序设计参考资料 基本介绍 MATLAB实现LSTM时间序列未来多步预测-递归预测。LSTM是一种含有LSTM区块(blocks)或其他的一种类神经网络,文献或其他资料中LSTM区块可能被描述成智能网络单元,因为

vue项目集成CanvasEditor实现Word在线编辑器

CanvasEditor实现Word在线编辑器 官网文档:https://hufe.club/canvas-editor-docs/guide/schema.html 源码地址:https://github.com/Hufe921/canvas-editor 前提声明: 由于CanvasEditor目前不支持vue、react 等框架开箱即用版,所以需要我们去Git下载源码,拿到其中两个主

android一键分享功能部分实现

为什么叫做部分实现呢,其实是我只实现一部分的分享。如新浪微博,那还有没去实现的是微信分享。还有一部分奇怪的问题:我QQ分享跟QQ空间的分享功能,我都没配置key那些都是原本集成就有的key也可以实现分享,谁清楚的麻烦详解下。 实现分享功能我们可以去www.mob.com这个网站集成。免费的,而且还有短信验证功能。等这分享研究完后就研究下短信验证功能。 开始实现步骤(新浪分享,以下是本人自己实现