自适应查询执行

自适应查询执行(AQE)是在查询执行期间发生的查询重新优化。

运行时重新优化的动机是Databricks在shuffle和广播交换(在AQE中称为查询阶段)结束时具有最新的准确统计信息。因此,Databricks可以选择更好的物理策略,选择最优的shuffle后分区大小和数量,或者进行以前需要提示的优化,例如倾斜连接处理。

这在统计信息收集未打开或统计信息过时时非常有用。它在静态派生统计信息不准确的地方也很有用,比如在复杂查询的中间,或者在发生数据倾斜之后。

功能

在Databricks Runtime 7.3 LTS及以上版本中,AQE默认开启。它有4个主要特点:

  • 动态地将排序合并连接更改为广播散列连接。

  • 在shuffle交换后动态合并分区(将小分区合并为合理大小的分区)。非常小的任务具有较差的I/O吞吐量,并且往往会受到调度开销和任务设置开销的影响。组合小任务可以节省资源并提高集群吞吐量。

  • 动态处理排序合并连接和洗牌散列连接中的倾斜,通过将倾斜的任务拆分(并在需要时复制)为大致均匀大小的任务。

  • 动态检测并传播空关系。

应用程序

AQE适用于以下所有查询:

  • 的非

  • 至少包含一个交换(通常在有连接、聚合或窗口时)、一个子查询或两者都包含。

并非所有应用aqi的查询都必须重新优化。重新优化可能会产生与静态编译的查询计划不同的查询计划,也可能不会。要确定查询的计划是否已被AQE更改,请参阅以下部分,查询计划

查询计划

本节讨论如何以不同的方式检查查询计划。

火花UI

AdaptiveSparkPlan节点

aqi应用的查询包含一个或多个AdaptiveSparkPlan节点,通常作为每个主查询或子查询的根节点。在查询运行之前或运行时,isFinalPlan对应的标志。AdaptiveSparkPlan节点显示为;查询执行完成后,将isFinalPlan标志更改为真实的。

发展计划

查询计划图随着执行的进展而发展,并反映正在执行的最新计划。已经执行的节点(其中有可用的指标)不会改变,但是那些没有执行的节点可以随着时间的推移而改变,这是重新优化的结果。

查询规划图示例如下:

查询计划图

DataFrame.explain ()

AdaptiveSparkPlan节点

aqi应用的查询包含一个或多个AdaptiveSparkPlan节点,通常作为每个主查询或子查询的根节点。在查询运行之前或运行时,isFinalPlan对应的标志。AdaptiveSparkPlan节点显示为;查询执行完成后,将isFinalPlan标志更改为真正的

目前及初步计划

在每个AdaptiveSparkPlan节点将同时包含初始计划(应用任何AQE优化之前的计划)和当前或最终计划,这取决于执行是否已经完成。当前的计划将随着执行的进展而发展。

运行时统计数据

每个shuffle和broadcast阶段都包含数据统计。

在阶段运行之前或在阶段运行时,统计信息是编译时估计和标志isRuntime,例如:统计(sizeInBytes = 1024.0简约,rowCount = 4,isRuntime = false);

阶段执行完成后,统计数据是在运行时收集的统计数据,以及标志isRuntime将成为真正的,例如:统计(sizeInBytes = 658.1简约,rowCount = 2.81 e + 4,isRuntime = true)

以下是一个DataFrame.explain例子:

  • 执行前

    在执行之前
  • 在执行过程中

    在执行期间
  • 执行后

    后执行

SQL解释

AdaptiveSparkPlan节点

aqi应用的查询包含一个或多个AdaptiveSparkPlan节点,通常作为每个主查询或子查询的根节点。

目前没有计划

作为SQL解释如果不执行查询,则当前计划始终与初始计划相同,并且不反映AQE最终将执行什么。

下面是一个SQL解释的例子:

SQL解释

有效性

如果一个或多个AQE优化生效,查询计划将发生变化。这些AQE优化的效果可以通过当前和最终计划与当前和最终计划中的初始计划和特定计划节点之间的差异来证明。

  • 动态更改排序合并连接为广播散列连接:当前/最终计划与初始计划之间的不同物理连接节点

    加入策略字符串
  • 动态合并分区:节点CustomShuffleReader有财产合并

    自定义shuffle阅读器
    自定义shuffle读取器字符串
  • 动态处理倾斜连接:节点SortMergeJoin与字段isSkew是真实的。

    倾斜连接计划
    倾斜连接字符串
  • 动态检测和传播空关系:将部分(或整个)计划替换为节点LocalTableScan,其中关系字段为空。

    扫描本表
    本地表扫描字符串

配置

启用和禁用自适应查询执行

财产

spark.databricks.optimizer.adaptive.enabled

类型:布尔

是否启用或禁用自适应查询执行。

默认值:真正的

动态更改排序合并连接为广播散列连接

财产

spark.databricks.adaptive.autoBroadcastJoinThreshold

类型:字节字符串

在运行时触发切换到广播连接的阈值。

默认值:30 mb

动态合并分区

财产

spark.sql.adaptive.coalescePartitions.enabled

类型:布尔

是否启用或禁用分区合并。

默认值:真正的

spark.sql.adaptive.advisoryPartitionSizeInBytes

类型:字节字符串

合并后的目标尺寸。合并后的分区大小将接近但不大于此目标大小。

默认值:64 mb

spark.sql.adaptive.coalescePartitions.minPartitionSize

类型:字节字符串

合并后分区的最小大小。合并的分区大小将不小于此大小。

默认值:1 mb

spark.sql.adaptive.coalescePartitions.minPartitionNum

类型:整数

合并后的最小分区数。不推荐,因为设置显式地覆盖spark.sql.adaptive.coalescePartitions.minPartitionSize

默认值:2x否。簇核

动态处理倾斜连接

财产

spark.sql.adaptive.skewJoin.enabled

类型:布尔

是否启用或禁用倾斜连接处理。

默认值:真正的

spark.sql.adaptive.skewJoin.skewedPartitionFactor

类型:整数

当与分区大小的中位数相乘时,该因子有助于确定分区是否倾斜。

默认值:5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

类型:字节字符串

用于确定分区是否倾斜的阈值。

默认值:256 mb

当两者都有时,分区被认为是倾斜的(分区大小>skewedPartitionFactor中位数分区大小)而且(分区大小>skewedPartitionThresholdInBytes)真正的

动态检测和传播空关系

财产

spark.databricks.adaptive.emptyRelationPropagation.enabled

类型:布尔

是否启用或禁用动态空关系传播。

默认值:真正的

常见问题(FAQ)

为什么在已经启用分区合并的情况下,AQE没有更改shuffle分区号?

AQE不会改变初始分区号。建议您为shuffle分区号设置一个合理的高值,并让AQE在查询的每个阶段根据输出数据大小合并小分区。

如果你在工作中看到溢出,你可以试试:

  • 增加shuffle分区编号配置:spark.sql.shuffle.partitions

  • 通过设置使自动优化shufflespark.databricks.adaptive.autoOptimizeShuffle.enabled真正的

为什么AQE没有广播一个小的连接表?

如果期望被广播的关系的大小确实低于这个阈值,但仍然没有被广播:

  • 检查连接类型。某些连接类型不支持广播,例如,a的左关系加入不能广播。

  • 也可能是该关系包含大量空分区,在这种情况下,大多数任务可以通过排序合并连接快速完成,或者可以通过倾斜连接处理进行优化。如果非空分区的百分比低于spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

我还应该在启用AQE时使用广播连接策略提示吗?

是的。静态计划的广播连接通常比由AQE动态计划的广播连接性能更好,因为AQE可能直到对连接的两端执行shuffle之后才切换到广播连接(此时已获得实际关系大小)。因此,如果您非常了解您的查询,使用广播提示仍然是一个不错的选择。AQE将以与静态优化相同的方式尊重查询提示,但仍然可以应用不受提示影响的动态优化。

倾斜连接提示和AQE倾斜连接优化之间的区别是什么?我应该用哪一个?

建议依赖于AQE倾斜连接处理,而不是使用倾斜连接提示,因为AQE倾斜连接是完全自动的,通常比提示对应的性能更好。

为什么AQE没有自动调整我的连接顺序?

在Databricks Runtime 7.3 LTS中,动态连接重新排序不是aqe3的一部分。

为什么AQE没有检测到我的数据倾斜?

为了将分区检测为倾斜分区,必须满足两个大小条件:

  • 分区大小大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes(默认256 mb)

  • 分区大小大于所有分区的中位数大小乘以倾斜分区因子spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认5)

此外,对某些连接类型的倾斜处理支持是有限的,例如加入,只有左侧偏置才能优化。

遗产

自适应执行(Adaptive Execution)这个术语在Spark 1.6就已经存在了,但是Spark 3.0中的新AQE是完全不同的。就功能而言,Spark 1.6只做了“动态合并分区”部分。就技术体系结构而言,新的AQE是一个基于运行时统计的动态规划和重新规划查询的框架,它支持各种优化,例如我们在本文中描述的优化,并且可以进行扩展以支持更多潜在的优化。