This article looks at why a partition filter you specify in a Spark SQL query against a Paimon table may not be pushed down.
Background
While using Paimon recently I ran into something puzzling: a query against a partitioned table did not push its partition filter down to the source and ended up scanning the whole table.
The versions in use are as follows:
Spark 3.5.0
Paimon 0.6.0
The Paimon table's DDL is as follows:
CREATE TABLE `table_demo` (
  `user_id` string COMMENT 'from deserializer'
)
PARTITIONED BY (
  `dt` string COMMENT 'date, yyyyMMdd',
  `hour` string COMMENT 'hour, HH'
)
ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH SERDEPROPERTIES ('serialization.format'='1')
LOCATION 'xxxx'
TBLPROPERTIES (
  'bucket'='50',
  'bucketing_version'='2',
  'bukect-key'='user_id',
  'file.format'='parquet',
  'merge-engine'='partial-update',
  'partial-update.ignore-delete'='true',
  'primary-key'='user_id',
  'transient_lastDdlTime'='1701679855',
  'write-only'='false')
The query is:
select * from table_demo
where dt = 20231212
  and hour = 10
limit 100;
Note that the dt value is written here as an integer, while the column is defined in the table as a string.
Conclusion and solution
Conclusion
The root cause is that the Spark DSv2 rule V2ScanRelationPushDown.pushDownFilters does not translate predicates that contain a Cast expression, so they are never handed to the DataSource; they are only evaluated as a filter after the source has been read. In that case the file-read I/O grows, because every partition is scanned, but there is no performance impact on shuffles and later operators, since the filter is still applied before them.
Solution
When filtering on a partition column in SQL, keep the literal's type consistent with the partition column's declared type; here, that means quoting the dt and hour values as strings. A quick way to verify the effect on the plan is sketched below.
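The two variants can be compared directly in spark-shell. This is a minimal sketch, assuming table_demo is registered in the current catalog; the plan of the first query keeps a Filter with cast(dt as int) above the scan, while in the second no such post-scan filter remains and the equality predicates are handed to the source.

// Minimal sketch, assuming a Spark session with table_demo registered in the catalog.
// Compare the plans of the mistyped and the correctly typed partition filter.
val badPlan = spark.sql(
  "SELECT * FROM table_demo WHERE dt = 20231212 AND hour = 10 LIMIT 100")
val goodPlan = spark.sql(
  "SELECT * FROM table_demo WHERE dt = '20231212' AND hour = '10' LIMIT 100")

// First query: a Filter with cast(dt as int) = 20231212 stays above the scan.
// Second query: no cast, the equality filters are offered to the DataSource.
badPlan.explain(true)
goodPlan.explain(true)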
Analysis
Analysis of the incorrect query
Let's analyse the incorrect query, i.e. the one that ends up reading the full table. The first stage is type coercion: when the two sides of a comparison have mismatched types, Spark reconciles them with analyzer rules, in this case the CombinedTypeCoercionRule. The plan-change log shows:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule ===

Before:
'GlobalLimit 100
+- 'LocalLimit 100
   +- 'Project [*]
      +- 'Filter ((dt#520 = 20231212) AND (hour#521 = 10))
         +- SubqueryAlias spark_catalog.default.table_demo
            +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo

After:
'GlobalLimit 100
+- 'LocalLimit 100
   +- 'Project [*]
      +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
         +- SubqueryAlias spark_catalog.default.table_demo
            +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo
From this rule application we can see that the filter (dt#520 = 20231212) AND (hour#521 = 10) was rewritten to (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10): since the literals are integers, the string columns get wrapped in a cast to int. This is easy to reproduce in isolation, as in the sketch below.
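The following spark-shell sketch (names are illustrative) builds a one-row DataFrame with a string column and filters it with an integer literal; the analyzed plan printed by explain(true) contains the same cast-wrapped predicate as in the log above.

// Minimal reproduction of the coercion, independent of Paimon.
// A string column compared with an int literal is wrapped in a cast to int.
val df = spark.range(1).selectExpr("cast(id as string) as dt")
df.filter("dt = 20231212").explain(true)
// Analyzed Logical Plan (abridged):
//   Filter (cast(dt#... as int) = 20231212)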
The plan then goes through the V2ScanRelationPushDown rule, which produces the following log:
12-13 13:52:58 763 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour)
Post-Scan Filters: (cast(dt#520 as int) = 20231212),(cast(hour#521 as int) = 10)
12-13 13:52:58 723 INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour)
12-13 13:52:58 823 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Output: user_id#497, dt#520, hour#521
12-13 13:52:58 837 INFO (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:60) -
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===

Before:
InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles
   +- Repartition 1, true
      +- GlobalLimit 100
         +- LocalLimit 100
            +- Filter ((isnotnull(dt#520) AND isnotnull(hour#521)) AND ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)))
               +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo

After:
InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles
   +- Repartition 1, true
      +- GlobalLimit 100
         +- LocalLimit 100
            +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
               +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo
Here only isnotnull(dt#520) AND isnotnull(hour#521) was pushed down to the DataSource; the partition equality predicates stayed behind as Post-Scan Filters.
So the partition filters indeed never reach the DataSource. Let's trace the data flow of this rule:
V2ScanRelationPushDown.pushDownFilters
  -> PushDownUtils.pushFilters
    -> DataSourceStrategy.translateFilterWithMapping
      -> translateLeafNodeFilter
Looking at the translateLeafNodeFilter method:
private def translateLeafNodeFilter(
    predicate: Expression,
    pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
  case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
    Some(sources.EqualTo(name, convertToScala(v, t)))
  case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
    Some(sources.EqualTo(name, convertToScala(v, t)))
  ...
  case _ => None
}
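To make the failure mode concrete, here is a heavily simplified, self-contained sketch of the same pattern-matching idea; the types below are illustrative stand-ins, not Spark's real classes. An extractor that only recognises bare column references cannot match a column wrapped in a Cast, so such a predicate falls through to the catch-all None and never becomes a pushable filter.

// Simplified stand-ins for Catalyst expressions (illustrative only, not Spark classes).
sealed trait Expr
case class ColumnRef(name: String)          extends Expr
case class Cast(child: Expr, to: String)    extends Expr
case class Lit(value: Any)                  extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr

// Mimics PushableColumn: it only extracts a name from a bare column reference.
object PushableColumn {
  def unapply(e: Expr): Option[String] = e match {
    case ColumnRef(name) => Some(name)
    case _               => None // a Cast(ColumnRef(...), ...) does not match
  }
}

// Mimics translateLeafNodeFilter: only "column = literal" shapes become source filters.
def translate(p: Expr): Option[String] = p match {
  case EqualTo(PushableColumn(name), Lit(v)) => Some(s"EqualTo($name, $v)")
  case EqualTo(Lit(v), PushableColumn(name)) => Some(s"EqualTo($name, $v)")
  case _                                     => None
}

// dt = '20231212'            -> translated, can be pushed down
println(translate(EqualTo(ColumnRef("dt"), Lit("20231212"))))
// cast(dt as int) = 20231212 -> None, stays as a post-scan filter
println(translate(EqualTo(Cast(ColumnRef("dt"), "int"), Lit(20231212))))

Running it prints Some(EqualTo(dt, 20231212)) for the first call and None for the second.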
As in the sketch, none of the real cases handles a Cast expression, so the cast-wrapped predicates end up in the catch-all branch and are treated as not pushable. On the Paimon datasource side, the class that receives the translated filters is PaimonBaseScanBuilder:
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
The filters argument passed in here simply does not contain (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10), so those predicates are never pushed down into Paimon.
This is not specific to the Paimon source; any other DSv2 source is affected in the same way, because of the contract sketched below.
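The reason it is generic lies in the DSv2 contract: a source's ScanBuilder only ever sees predicates that were already translated into org.apache.spark.sql.sources.Filter objects, and whatever pushFilters returns is re-evaluated by Spark after the scan. Below is a schematic sketch of that contract; the class name and the accepted-filter logic are illustrative, not Paimon's actual implementation.

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// Schematic ScanBuilder: Spark hands it only the translated Filters,
// so the cast-wrapped predicates never appear here.
class DemoScanBuilder(partitionColumns: Set[String]) extends ScanBuilder with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Accept simple predicates on partition columns, give everything else back to Spark.
    val (accepted, rejected) = filters.partition {
      case EqualTo(attr, _) => partitionColumns.contains(attr)
      case IsNotNull(attr)  => partitionColumns.contains(attr)
      case _                => false
    }
    pushed = accepted
    rejected // Spark applies these as post-scan filters
  }

  // What Spark reports as "Pushed Filters" in the logs and plan.
  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ??? // scan construction is out of scope for this sketch
}

Since the cast-wrapped partition predicates were never turned into Filter objects in the first place, no implementation of pushFilters, Paimon's included, ever gets the chance to accept them.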
Analysis of the correct query
The correct SQL is as follows:
select * from table_demo
where dt = '20231212'
  and hour = '10'
limit 100;
Running this SQL, we see the following log:
12-14 14:22:42 328 INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour),EqualTo(dt,20231212),EqualTo(hour,10)
12-14 14:22:42 405 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour), EqualTo(dt,20231212), EqualTo(hour,10)
Post-Scan Filters:

=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===

Before:
InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles
   +- Repartition 1, true
      +- GlobalLimit 100
         +- LocalLimit 100
            +- Filter ((isnotnull(dt#1330) AND isnotnull(hour#1331)) AND ((dt#1330 = 20231212) AND (hour#1331 = 10)))
               +- RelationV2[user_id#1307, dt#1330, hour#1331] spark_catalog.ad_dwd.table_demo table_demo

After:
InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles
   +- Repartition 1, true
      +- GlobalLimit 100
         +- LocalLimit 100
            +- RelationV2[user_id#1307, dt#1330, hour#1331] table_demo
After the rule runs, all of the filter conditions have been pushed to the DataSource. Note, though, that pushing is only an offer: whether the predicates actually prune data still depends on how the DataSource handles them on its side. The plan-change logs used throughout this analysis can be reproduced with the settings sketched below.
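To get the same plan-change output from your own session, Spark's PlanChangeLogger can be raised to INFO and restricted to the rules of interest. This is a minimal sketch; the config keys below are the Spark 3.x SQLConf names, so adjust them if your version differs.

// Log plan changes for the two rules discussed above at INFO level.
spark.conf.set("spark.sql.planChangeLog.level", "info")
spark.conf.set(
  "spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule," +
    "org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown")

// Re-run the query and watch the driver log for the before/after plans.
spark.sql("SELECT * FROM table_demo WHERE dt = '20231212' AND hour = '10' LIMIT 100").collect()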