如何提高性能的三角洲湖并入查询使用分区修剪

学习如何使用分区修剪提高三角洲湖合并成查询的性能。

写的亚当Pavlacka

去年发表在:2023年6月1日

本文阐述了如何引发三角洲湖分区修剪合并成(AWS|Azure|GCP从砖)查询。

分区修剪是一种优化技术来限制分区的数量所检查的一个查询。

讨论

合并成可以计算昂贵的如果处理效率低下。你应该分区底层数据之前使用合并成。如果你不这样做,可以影响查询性能。

主要的教训是:如果你知道哪些分区合并成查询需要检查,你应该查询中指定它们,以便执行分区修剪。

示范:没有分区修剪

这是业绩不佳的一个例子合并成查询没有分区修剪。

首先创建以下δ表,调用delta_merge_into:

% scala

val df = spark.range (30000000)
.withColumn (“par”(“id”% 1000美元).cast (IntegerType))
.withColumn (“t”, current_timestamp ())
.write
.format(“δ”)
.mode(“覆盖”)
.partitionBy(“标准”)
.saveAsTable (“delta_merge_into”)

然后合并DataFrame到三角洲表中创建一个表更新:

% scala

val updatesTableName = "更新"
val targetTableName = " delta_merge_into "
val更新= spark.range (100)。withColumn (“id”, (rand () * 30000000 * 2) .cast (IntegerType))
.withColumn (“par”(“id”% 2美元).cast (IntegerType))
.withColumn (“t”, current_timestamp ())
.dropDuplicates (" id ")
updates.createOrReplaceTempView (updatesTableName)

更新表有100行三列,id,票面价值,ts。的价值票面价值总是1或0。

假设你运行以下简单合并成查询:

% scala

spark.sql (s”“”
|并入targetTableName美元
|使用$ updatesTableName
| targetTableName美元。id= $updatesTableName.id
|当匹配
|更新设置targetTableName美元。ts = $ updatesTableName.ts
|不匹配
|插入(id、par ts)值(updatesTableName美元。id, updatesTableName美元。票面价值,$updatesTableName.ts)
”““.stripMargin)

查询需要13.16分钟来完成:

没有分区合并成过滤器运行。

该查询包含的物理方案PartitionCount: 1000,如下所示。这意味着Apache火花是扫描所有1000个分区来执行查询。这不是一个有效的查询,因为更新数据只有分区的值10:

= = = =物理计划
* (5)HashAggregate(键=[],函数= [finalmerge_count(合并计算# 8452 l)计数(1)# 8448 l),输出= [count # 8449 l])
+ -交换SinglePartition
+ - * (4)HashAggregate(键=[],函数= [partial_count(1)计数# 8452 l),输出= [count # 8452 l])
+ - *(4)项目
+ - *(4)滤波器(isnotnull (count # 8440 l) & & (count # 8440 l > 1))
+ - * (4)HashAggregate(键= (_row_id_ # 8399 l),函数= [finalmerge_sum(合并和# 8454 l)和(cast(1 # 8434为bigint)) # 8439 l),输出= [count # 8440 l])
+ -交换hashpartitioning (_row_id_ # 8399 l, 200)
+ - * (3)HashAggregate(键= (_row_id_ # 8399 l),函数= [partial_sum (cast(1 # 8434为bigint))和# 8454 l),输出= [_row_id_ # 8399 l,和# 8454 l))
+ - *(3)项目(_row_id_ # 8399 l, UDF (_file_name_ # 8404)作为一个# 8434)
+ - * (3)BroadcastHashJoin[铸(id # 7514为bigint)], [id # 8390 l],内心,BuildLeft,假的
:- BroadcastExchange HashedRelationBroadcastMode(列表(cast(输入[0,int,真]为bigint)))
:+ - * (2)HashAggregate(键= [id # 7514] =[],功能输出= (# 7514)
:+ -交换hashpartitioning (id # 7514、200)
:+ - * (1)HashAggregate(键= [id # 7514] =[],功能输出= (# 7514)
:+ - *(1)过滤isnotnull (id # 7514)
:+ - *(1)项目(铸造(((rand (8188829649009385616) * 3.0 e7) * 2.0) int) id # 7514)
:+ - *(1)范围(0 100 = 1步,分裂= 36)
+ - *(3)过滤器isnotnull (id # 8390 l)
+ - *(3)项目[id # 8390 l, _row_id_ # 8399 l, input_file_name () _file_name_ # 8404)
+ - *(3)项目[id # 8390 l, monotonically_increasing_id () _row_id_ # 8399 l)
+ - *(3)项目[id # 8390, # 8391, ts # 8392)
+ - * (3)FileScan拼花[id # 8390 l ts # 8392, par # 8391)分批处理:真的,DataFilters:[],格式:拼花,地点:TahoeBatchFileIndex [dbfs: / user /蜂巢/仓库/ delta_merge_into], PartitionCount: 1000年,PartitionFilters: [], PushedFilters: [], ReadSchema: struct < id: bigint ts:时间戳>

解决方案

重写查询指定分区。

合并成直接查询指定分区:

% scala

spark.sql (s”“”
|并入targetTableName美元
|使用$ updatesTableName
| targetTableName美元。票面价值IN (1,0) AND $targetTableName.id = $updatesTableName.id
|当匹配
|更新设置targetTableName美元。ts = $ updatesTableName.ts
|不匹配
|插入(id、par ts)值(updatesTableName美元。id, updatesTableName美元。票面价值,$updatesTableName.ts)
”““.stripMargin)

现在查询只需要20.54秒完成在同一集群:

运行与分区合并成过滤器。

该查询包含的物理方案PartitionCount: 2,如下所示。只有一些小的变化,查询现在快40多倍:

= = = =物理计划

* (5)HashAggregate(键=[],函数= [finalmerge_count(合并计算# 7892 l)计数(1)# 7888 l),输出= [count # 7889 l])
+ -交换SinglePartition
+ - * (4)HashAggregate(键=[],函数= [partial_count(1)计数# 7892 l),输出= [count # 7892 l])
+ - *(4)项目
+ - *(4)滤波器(isnotnull (count # 7880 l) & & (count # 7880 l > 1))
+ - * (4)HashAggregate(键= (_row_id_ # 7839 l),函数= [finalmerge_sum(合并和# 7894 l)和(cast(1 # 7874为bigint)) # 7879 l),输出= [count # 7880 l])
+ -交换hashpartitioning (_row_id_ # 7839 l, 200)
+ - * (3)HashAggregate(键= (_row_id_ # 7839 l),函数= [partial_sum (cast(1 # 7874为bigint))和# 7894 l),输出= [_row_id_ # 7839 l,和# 7894 l))
+ - *(3)项目(_row_id_ # 7839 l, UDF (_file_name_ # 7844)作为一个# 7874)
+ - * (3)BroadcastHashJoin[铸(id # 7514为bigint)], [id # 7830 l],内心,BuildLeft,假的
:- BroadcastExchange HashedRelationBroadcastMode(列表(cast(输入[0,int,真]为bigint)))
:+ - * (2)HashAggregate(键= [id # 7514] =[],功能输出= (# 7514)
:+ -交换hashpartitioning (id # 7514、200)
:+ - * (1)HashAggregate(键= [id # 7514] =[],功能输出= (# 7514)
:+ - *(1)过滤isnotnull (id # 7514)
:+ - *(1)项目(铸造(((rand (8188829649009385616) * 3.0 e7) * 2.0) int) id # 7514)
:+ - *(1)范围(0 100 = 1步,分裂= 36)
+ - *(3)项目[id # 7830 l, _row_id_ # 7839 l, _file_name_ # 7844)
+ - *(3)过滤器(par # 7831 (1,0) & & isnotnull (id # 7830 l))
+ - *(3)项目[id # 7830, # 7831, _row_id_ # 7839 l, input_file_name () _file_name_ # 7844)
+ - *(3)项目[id # 7830, # 7831, monotonically_increasing_id () _row_id_ # 7839 l)
+ - *(3)项目[id # 7830, # 7831, ts # 7832)
+ - * (3)FileScan拼花[id # 7830 l ts # 7832, par # 7831)分批处理:真的,DataFilters:[],格式:拼花,地点:TahoeBatchFileIndex [dbfs: / user /蜂巢/仓库/ delta_merge_into], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct < id: bigint ts:时间戳>

这篇文章有用吗?