Spark Paimon 中为什么我指定的分区没有下推

2023-12-15 04:52

本文主要是介绍Spark Paimon 中为什么我指定的分区没有下推,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

最近在使用 Paimon 的时候遇到了一件很有意思的事情,写的 SQL 居然读取的数据不下推,明明是分区表,但是却全量扫描了。
目前使用的版本信息如下:
Spark 3.5.0
Paimon 0.6.0
paimon的建表语句如下:

CREATE TABLE `table_demo`(`user_id` string COMMENT 'from deserializer' )
PARTITIONED BY ( `dt` string COMMENT '日期, yyyyMMdd', `hour` string COMMENT '小时, HH')
ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe' 
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler' 
WITH SERDEPROPERTIES ( 'serialization.format'='1')
LOCATION'xxxx'
TBLPROPERTIES ('bucket'='50', 'bucketing_version'='2', 'bukect-key'='user_id', 'file.format'='parquet', 'merge-engine'='partial-update', 'partial-update.ignore-delete'='true', 'primary-key'='user_id', 'transient_lastDdlTime'='1701679855', 'write-only'='false')

查询的SQL如下:

select * from 
table_demo
where dt =20231212
and hour =10
limit 100;

注意我们这里写的dt是整数类型,而表中定义的是字符串类型

结论及解决方法

结论

具体的原因是Spark DSv2中的规则 V2ScanRelationPushDown.pushDownFilters 对于 Cast类型转换表达式不会传递到DataSource端,所以只会在读取完Source转换进行过滤,
这种情况下,对于文件的读取IO会增大,但是对于shuffle等操作是不会有性能的影响的。

解决方法

对于分区字段来说,我们在写SQL对分区字段进行过滤的时候,保持和分区字段类型一致

分析

错误写法分析

针对于错误的写法,也就是导致读取全量数据的写法,我们分析一下,首先是类型转换阶段,在Spark中,对于类型不匹配的问题,spark会用规则进行转换,具体的规则是
CombinedTypeCoercionRule,
在日志中可以看到:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule ==='GlobalLimit 100                                                                                   'GlobalLimit 100+- 'LocalLimit 100                                                                                     +- 'LocalLimit 100+- 'Project [*]                                                                                        +- 'Project [*]
!      +- 'Filter ((dt#520 = 20231212) AND (hour#521 = 10))                                                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))+- SubqueryAlias spark_catalog.default.table_demo                                                      +- SubqueryAlias spark_catalog.default.table_demo+- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo                          +- RelationV2[user_id#497,dt#520, hour#521] spark_catalog.default.table_demo

通过以上规则我们可以看到 过滤条件(dt#520 = 20231212) AND (hour#521 = 10) 转换为了 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)

接着再经过以下规则:V2ScanRelationPushDown的洗礼,我们可以看到如下日志:

12-13 13:52:58 763  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour)
Post-Scan Filters: (cast(dt#520 as int) = 20231212),(cast(hour#521 as int) = 10)
12-13 13:52:58 723  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour)
12-13 13:52:58 823  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Output: user_id#497, dt#520, hour#52112-13 13:52:58 837  INFO (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:60) - 
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand ], Overwrite, [user_id,  dt, hour]                                                                        InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour] +- WriteFiles                                                                                                                                    +- WriteFiles+- Repartition 1, true                                                                                                                           +- Repartition 1, true+- GlobalLimit 100                                                                                                                               +- GlobalLimit 100+- LocalLimit 100                                                                                                                                +- LocalLimit 100
!            +- Filter ((isnotnull(dt#520) AND isnotnull(hour#521)) AND ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)))                   +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
!               +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo                                                                     +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo   table_demo

这里只有过滤条件 isnotnull(dt#520) AND isnotnull(hour#521) 被下推到了 DataSource。
从现象来看,确实分区的过滤条件没有推到DataSource端, 我们来分析一下该规则的数据流:

V2ScanRelationPushDown.pushDownFilters||\/
PushDownUtils.pushFilters||\/
DataSourceStrategy.translateFilterWithMappin||\/
translateLeafNodeFilter

具体到translateLeafNodeFilter 方法:

  private def translateLeafNodeFilter(predicate: Expression,pushableColumn: PushableColumnBase): Option[Filter] = predicate match {case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>Some(sources.EqualTo(name, convertToScala(v, t)))case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>Some(sources.EqualTo(name, convertToScala(v, t)))...case _ => None

这里没有对Cast表达式进行处理,所以说最后返回的就是不能下推的处理,而 Paimon datasouce那边,具体的类为PaimonBaseScanBuilder

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {

这里传进来的filters实参 就不存在 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10) 这个过滤条件,所以就不会下推到Paimon中去

其实不仅仅是对于Paimon Source, 其他的source也会有这个问题。

正确学法分析

正确的SQL如下:

select * from 
table_demo
where dt ='20231212'
and hour ='10'
limit 100;

运行如上SQL,我们可以看到如下日志:

12-14 14:22:42 328  INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour),EqualTo(dt,20231212),EqualTo(hour,10)
12-14 14:22:42 405  INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) - 
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour), EqualTo(dt,20231212), EqualTo(hour,10)
Post-Scan Filters: === Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]                       InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]+- WriteFiles                                                                                                            +- WriteFiles+- Repartition 1, true                                                                                                   +- Repartition 1, true+- GlobalLimit 100                                                                                                       +- GlobalLimit 100+- LocalLimit 100                                                                                                        +- LocalLimit 100
!            +- Filter ((isnotnull(dt#1330) AND isnotnull(hour#1331)) AND ((dt#1330 = 20231212) AND (hour#1331 = 10)))               +- RelationV2[user_id#1307,  dt#1330, hour#1331] table_demo
!               +- RelationV2[user_id#1307,  dt#1330, hour#1331] spark_catalog.ad_dwd.table_demo table_demo                           

可以看到经过了规则转换 所有的过滤条件都下推到了DataSource了,但是具体的下推还得在DataSource进一步处理才能保证真正的下推

这篇关于Spark Paimon 中为什么我指定的分区没有下推的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/495186

相关文章

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

Mysql表如何按照日期字段的年月分区

《Mysql表如何按照日期字段的年月分区》:本文主要介绍Mysql表如何按照日期字段的年月分区的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、创键表时直接设置分区二、已有表分区1、分区的前置条件2、分区操作三、验证四、注意总结一、创键表时直接设置分区

jupyter代码块没有运行图标的解决方案

《jupyter代码块没有运行图标的解决方案》:本文主要介绍jupyter代码块没有运行图标的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录jupyter代码块没有运行图标的解决1.找到Jupyter notebook的系统配置文件2.这时候一般会搜索到

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

使用Python实现获取网页指定内容

《使用Python实现获取网页指定内容》在当今互联网时代,网页数据抓取是一项非常重要的技能,本文将带你从零开始学习如何使用Python获取网页中的指定内容,希望对大家有所帮助... 目录引言1. 网页抓取的基本概念2. python中的网页抓取库3. 安装必要的库4. 发送HTTP请求并获取网页内容5. 解

Python实现合并与拆分多个PDF文档中的指定页

《Python实现合并与拆分多个PDF文档中的指定页》这篇文章主要为大家详细介绍了如何使用Python实现将多个PDF文档中的指定页合并生成新的PDF以及拆分PDF,感兴趣的小伙伴可以参考一下... 安装所需要的库pip install PyPDF2 -i https://pypi.tuna.tsingh

Flask解决指定端口无法生效问题

《Flask解决指定端口无法生效问题》文章讲述了在使用PyCharm开发Flask应用时,启动地址与手动指定的IP端口不一致的问题,通过修改PyCharm的运行配置,将Flask项目的运行模式从Fla... 目录android问题重现解决方案问题重现手动指定的IP端口是app.run(host='0.0.

电脑没有仿宋GB2312字体怎么办? 仿宋GB2312字体下载安装及调出来的教程

《电脑没有仿宋GB2312字体怎么办?仿宋GB2312字体下载安装及调出来的教程》仿宋字体gb2312作为一种经典且常用的字体,广泛应用于各种场合,如何在计算机中调出仿宋字体gb2312?本文将为您... 仿宋_GB2312是公文标准字体之一,仿China编程宋是字体名称,GB2312是字php符编码标准名称(简

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处