Spark Paimon 中为什么我指定的分区没有下推

2023-12-19 14:45

本文主要是介绍Spark Paimon 中为什么我指定的分区没有下推,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

最近在使用 Paimon 的时候遇到了一件很有意思的事情,写的 SQL 居然读取的数据不下推,明明是分区表,但是却全量扫描了。
目前使用的版本信息如下:
Spark 3.5.0
Paimon 0.6.0
paimon的建表语句如下:

CREATE TABLE `table_demo`(`user_id` string COMMENT 'from deserializer' )
PARTITIONED BY ( `dt` string COMMENT '日期, yyyyMMdd', `hour` string COMMENT '小时, HH')
ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe' 
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler' 
WITH SERDEPROPERTIES ( 'serialization.format'='1')
LOCATION'xxxx'
TBLPROPERTIES ('bucket'='50', 'bucketing_version'='2', 'bukect-key'='user_id', 'file.format'='parquet', 'merge-engine'='partial-update', 'partial-update.ignore-delete'='true', 'primary-key'='user_id', 'transient_lastDdlTime'='1701679855', 'write-only'='false')

查询的SQL如下:

select * from 
table_demo
where dt =20231212
and hour =10
limit 100;

注意我们这里写的dt是整数类型,而表中定义的是字符串类型

结论及解决方法

结论

具体的原因是Spark DSv2中的规则 V2ScanRelationPushDown.pushDownFilters 对于 Cast类型转换表达式不会传递到DataSource端,所以只会在读取完Source转换进行过滤,
这种情况下,对于文件的读取IO会增大,但是对于shuffle等操作是不会有性能的影响的。

解决方法

对于分区字段来说,我们在写SQL对分区字段进行过滤的时候,保持和分区字段类型一致

分析

错误写法分析

针对于错误的写法,也就是导致读取全量数据的写法,我们分析一下,首先是类型转换阶段,在Spark中,对于类型不匹配的问题,spark会用规则进行转换,具体的规则是
CombinedTypeCoercionRule,
在日志中可以看到:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule ==='GlobalLimit 100                                                                                   'GlobalLimit 100+- 'LocalLimit 100                                                                                     +- 'LocalLimit 100+- 'Project [*]                                                                                        +- 'Project [*]
!      +- 'Filter ((dt#520 = 20231212) AND (hour#521 = 10))                                                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))+- SubqueryAlias spark_catalog.default.table_demo                                                      +- SubqueryAlias spark_catalog.default.table_demo+- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo                          +- RelationV2[user_id#497,dt#520, hour#521] spark_catalog.default.table_demo

通过以上规则我们可以看到 过滤条件(dt#520 = 20231212) AND (hour#521 = 10) 转换为了 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)

接着再经过以下规则:V2ScanRelationPushDown的洗礼,我们可以看到如下日志:

12-13 13:52:58 763  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour)
Post-Scan Filters: (cast(dt#520 as int) = 20231212),(cast(hour#521 as int) = 10)
12-13 13:52:58 723  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour)
12-13 13:52:58 823  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Output: user_id#497, dt#520, hour#52112-13 13:52:58 837  INFO (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:60) - 
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand ], Overwrite, [user_id,  dt, hour]                                                                        InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour] +- WriteFiles                                                                                                                                    +- WriteFiles+- Repartition 1, true                                                                                                                           +- Repartition 1, true+- GlobalLimit 100                                                                                                                               +- GlobalLimit 100+- LocalLimit 100                                                                                                                                +- LocalLimit 100
!            +- Filter ((isnotnull(dt#520) AND isnotnull(hour#521)) AND ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)))                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
!               +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo                                                                     +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo   table_demo

这里只有过滤条件 isnotnull(dt#520) AND isnotnull(hour#521) 被下推到了 DataSource。
从现象来看,确实分区的过滤条件没有推到DataSource端, 我们来分析一下该规则的数据流:

V2ScanRelationPushDown.pushDownFilters||\/
PushDownUtils.pushFilters||\/
DataSourceStrategy.translateFilterWithMappin||\/
translateLeafNodeFilter

具体到translateLeafNodeFilter 方法:

  private def translateLeafNodeFilter(predicate: Expression,pushableColumn: PushableColumnBase): Option[Filter] = predicate match {case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>Some(sources.EqualTo(name, convertToScala(v, t)))case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>Some(sources.EqualTo(name, convertToScala(v, t)))...case _ => None

这里没有对Cast表达式进行处理,所以说最后返回的就是不能下推的处理,而 Paimon datasouce那边,具体的类为PaimonBaseScanBuilder

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {

这里传进来的filters实参 就不存在 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10) 这个过滤条件,所以就不会下推到Paimon中去

其实不仅仅是对于Paimon Source, 其他的source也会有这个问题。

正确学法分析

正确的SQL如下:

select * from 
table_demo
where dt ='20231212'
and hour ='10'
limit 100;

运行如上SQL,我们可以看到如下日志:

12-14 14:22:42 328  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour),EqualTo(dt,20231212),EqualTo(hour,10)
12-14 14:22:42 405  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour), EqualTo(dt,20231212), EqualTo(hour,10)
Post-Scan Filters: === Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]                       InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]+- WriteFiles                                                                                                            +- WriteFiles+- Repartition 1, true                                                                                                   +- Repartition 1, true+- GlobalLimit 100                                                                                                       +- GlobalLimit 100+- LocalLimit 100                                                                                                        +- LocalLimit 100
!            +- Filter ((isnotnull(dt#1330) AND isnotnull(hour#1331)) AND ((dt#1330 = 20231212) AND (hour#1331 = 10)))               +- RelationV2[user_id#1307,  dt#1330, hour#1331] table_demo
!               +- RelationV2[user_id#1307,  dt#1330, hour#1331] spark_catalog.ad_dwd.table_demo table_demo                           

可以看到经过了规则转换 所有的过滤条件都下推到了DataSource了,但是具体的下推还得在DataSource进一步处理才能保证真正的下推

这篇关于Spark Paimon 中为什么我指定的分区没有下推的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/512601

相关文章

通过C#获取PDF中指定文本或所有文本的字体信息

《通过C#获取PDF中指定文本或所有文本的字体信息》在设计和出版行业中,字体的选择和使用对最终作品的质量有着重要影响,然而,有时我们可能会遇到包含未知字体的PDF文件,这使得我们无法准确地复制或修改文... 目录引言C# 获取PDF中指定文本的字体信息C# 获取PDF文档中用到的所有字体信息引言在设计和出

多模块的springboot项目发布指定模块的脚本方式

《多模块的springboot项目发布指定模块的脚本方式》该文章主要介绍了如何在多模块的SpringBoot项目中发布指定模块的脚本,作者原先的脚本会清理并编译所有模块,导致发布时间过长,通过简化脚本... 目录多模块的springboot项目发布指定模块的脚本1、不计成本地全部发布2、指定模块发布总结多模

VMWare报错“指定的文件不是虚拟磁盘“或“The file specified is not a virtual disk”问题

《VMWare报错“指定的文件不是虚拟磁盘“或“Thefilespecifiedisnotavirtualdisk”问题》文章描述了如何修复VMware虚拟机中出现的“指定的文件不是虚拟... 目录VMWare报错“指定的文件不是虚拟磁盘“或“The file specified is not a virt

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

豆包 MarsCode 不允许你还没有女朋友

在这个喧嚣的世界里,爱意需要被温柔地唤醒。为心爱的她制作每日一句小工具,就像是一场永不落幕的浪漫仪式,每天都在她的心田播撒爱的种子,让她的每一天都充满甜蜜与期待。 背景 在这个瞬息万变的时代,我们都在寻找那些能让我们慢下来,感受生活美好的瞬间。为了让这份浪漫持久而深刻,我们决定为女朋友定制一个每日一句小工具。这个工具会在她意想不到的时刻,为她呈现一句充满爱意的话语,让她的每一天都充满惊喜和感动

遮罩,在指定元素上进行遮罩

废话不多说,直接上代码: ps:依赖 jquer.js 1.首先,定义一个 Overlay.js  代码如下: /*遮罩 Overlay js 对象*/function Overlay(options){//{targetId:'',viewHtml:'',viewWidth:'',viewHeight:''}try{this.state=false;//遮罩状态 true 激活,f

Jenkins构建Maven聚合工程,指定构建子模块

一、设置单独编译构建子模块 配置: 1、Root POM指向父pom.xml 2、Goals and options指定构建模块的参数: mvn -pl project1/project1-son -am clean package 单独构建project1-son项目以及它所依赖的其它项目。 说明: mvn clean package -pl 父级模块名/子模块名 -am参数

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

C#关闭指定时间段的Excel进程的方法

private DateTime beforeTime;            //Excel启动之前时间          private DateTime afterTime;               //Excel启动之后时间          //举例          beforeTime = DateTime.Now;          Excel.Applicat