bucket 2.0:通过删除Shuffle来提高Spark SQL性能

下载幻灯片

桶式存储通常用于Hive和Spark SQL中,通过消除Join或group-by-aggregate场景中的Shuffle来提高性能。这对于字节跳动的各种写一次和读多次的数据集来说是理想的。

然而,Spark SQL bucket有各种限制:

  1. Spark SQL中的桶机制与Hive中的桶机制不同,因此从Hive迁移到Spark SQL的成本很高;
  2. Spark SQL bucket需要在读取时进行排序,这大大降低了性能;
  3. Spark向bucket table写入数据时,可以生成数千万个HDFS不支持的小文件;
  4. 只有当两个表具有相同数量的桶时才会触发桶连接;
  5. 它要求桶键集与连接键集或分组键集相同。在过去的一年里,我们在Apache Spark中添加了一系列优化,以消除上述限制,这样新的桶机制就可以覆盖更多的场景。新的桶使Hive到Spark SQL的迁移更加顺利。

这些努力的直接结果是,我们看到字节跳动整个数据仓库中利用桶存储的查询增长了90%以上。在本次演讲中,我们将介绍如何设计和实现一种新的桶机制,以解决上述所有限制,并显著提高连接和按聚合分组的性能。

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

-大家好,感谢大家参加这次会议。主题是“通过移除Shuffle改进SPARK SQL性能的下一代bucket”。我叫郭军,我的英文名字叫Jason。我是字节跳动公司数据引擎团队的负责人。让我介绍一下我们是谁以及我们的工作。我们是字节跳动的数据引擎团队。我们为OLAP构建了一bob体育客户端下载站式体验平台,用户可以在平台上构建PB级数据仓库,并通过编写SQL来分析PB级数据,而无需关心底层的执行引擎。我们提供开放的API和自助服务平台。bob体育客户端下载此外,我们还优化了Spark SQL, Presto和Hive数据引擎。

同时我们为字节跳动的许多业务线设计数据架构。我的演讲将由四个部分组成,第一部分是字节跳动的Spark SQL。下一个问题是,桶是什么?然后我将介绍Spark bucket的限制,最后我将说明优化Spark bucket的方法。让我们进入第一部分,Spark SQL如何在字节跳动工作。我们在2015年将Spark引入字节跳动。

字节跳动的Spark SQL

在2016年之前,我们只是运行Spark SQL进行小规模的实验,在2017年底,Spark SQL支持字节跳动的部分ad-hoc查询工作负载。然后在2018年底,Spark SQL支持大多数临时查询和生产中的一些ETL管道。在2019年底,Spark SQL将支持大部分临时查询和大部分生产中的ETL管道。现在Spark SQL是字节跳动公司数据仓库领域的主要引擎。

什么是桶装

让我简单介绍一下什么是桶装。现在,首先我们应该在Spark SQL中以两种方式创建bucket表。在左边,我们可以用堆栈的方式创建一个桶状表。使用parquet创建一个表的顺序,由user_id按user_id排序并分成1024个桶。

在这个例子中,我们可以说我们需要指定一个用户,我们需要用CLUSTERED BY子句指定打包键集,我们还需要指定按键集排序,我们按子句排序。我们需要知道的是按键集排序的键集可能与CLUSTERED by键集不同。

在右边我们可以创建一个。多么兼容的桶形表。创建一个表order CLUSTERED BY user_id,按user_id分成1024个bucket,存储为parquet。

被存储为拼花而不是使用拼花,所以用这种方法我们创建了一个与Hive引擎兼容的表。当我们得到一个桶形表时,我们可以像这样向桶形表中插入一些数据集。INSERT INTO order select order_id, user_id, product, amount FROM order_staging我们可以说,我们只是将一些数据插入到bucket表中。比如如何将数据插入到非bucket表中?换句话说,当用户向bucket中插入某个数据集时,她不需要知道表是否被bucket。当我们得到一个桶形表并填充该表时,我们如何从桶形表中获益?

让我们以ShuffledHashJoin为例。我们知道ShuffledHashJoin是Spark SQL中常用的shuffle机制之一。当选择打乱哈希表时,Spark SQL需要确保两个表是共分区的。也就是说,如果我们想要在user_id上联接订单表和表,Spark SQL需要确保它们在user_id上是共分区的。如果它们不是共分区,Spark SQL将添加一些shuffle。在Spark SQL中,我们使用交换操作符进行shuffle,因此在这个图中,我们可以说交换节点。它们中的大多数都在user_id上,这样交换后,它们中的大多数都在user_id上进行了共同分区,这样就可以使用ShuffledHashJoin了。但如果它们都是桶形表,这意味着我们在填充它们之前将表设置为桶形表。然后,我们不需要在连接之前添加一些交换节点,因为大多数交换节点都是根据user_id进行预洗牌的。这就是为什么我们要使用桶形桌子的原因。 Let’s go to the SortMergeJoin. SortMergeJoin is another commonly used join mechanism.

对于SortMergeJoin, SortMergeJoin要求子节点或(听不清)应该是共分区的,而且它们应该在连接键集上排序。例如,我们是否希望连接订单和user_id?Spark SQL将确保它们中的大多数根据user_id进行共分区,并且每个分区都按照user_id进行排序。否则,Spark SQL将添加交换操作符和排序操作符来确保这一点。但如果它们都是bucket表,并且在填充表之前也根据user_id进行排序。那么我们不需要……

我们不需要再次添加exchange和sort节点,因为它们大部分都是预洗牌和预排序的。这就是为什么我们要将表设置为桶形表。让我来介绍一下当前Spark SQL bucket机制的局限性。最……

火花桶限制

最重要的限制之一是小文件。以下面的SQL为例,我们可以用INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging填充表。当SQL运行时,我们对单个任务文件夹中有多少文件进行快照。我们通过hdfs dfs -ls来计数,然后我们来计数文件。我们发现在一个任务文件夹下有988个文件。每个任务将生成最多1024个小文件。1024是桶号。请记住,我们创建了另一个有1024桶号的表,总共将有1024乘以M个小文件。M是任务编号。当M为1024时,将有多达100万个小文件,而实际上M可能比1024大得多。 For example 10,000 ,then there will be up to 10 million small files.

我们知道对于hadoop echo系统来说,小文件可能会招致一些灾难,因为当小文件太多的时候,Spark SQL需要频繁地与HDFS NameNode进行通信,而HDFS NameNode是单点的,会成为瓶颈。所以我们不会等它这样做,SQL会运行得非常慢。

即使我们完成了数据填充,下游sql也会运行得非常慢,因为它们需要打开太多的小文件,这是非常慢的。所以我认为这应该是最大的限制之一。我们也可以用一些其他的机制来解决这个问题。例如,我们可以添加DISTRIBUTE BY子句,还需要将spark.sql.shuffle.partition配置设置为某个值。当1024是M的倍数时,最多有1024个文件,当M是1024的倍数时,最多有M个文件。

否则,仍然会有高达1024乘以M的小文件。M等于值spark.sql.shuffle.partitions。

是的,我们可以通过这个配置和DISTRIBUTE by子句来减少文件数量,但是任何用户都很难设置这个配置,所以我们需要一种机制来自动减少文件数量。下一个限制是Spark SQL bucket在SQL引擎之间是不兼容的。例如,Spark SQL桶与Hive桶不同,Spark桶与Presto桶也不同。Presto和Hive bucket兼容。

它们不相容有两个原因。将数据写入Hive bucket表中。

一个额外的shuffle将被引入,以确保它应该减少任务,将写入一个桶文件。但是对于Spark SQL来说,不会有额外的shuffle,所以每个任务都会写入多达M个bucket文件。所有的文件,每个都很小。其次,它们使用不同的哈希机制。对于Hive, Hive将使用HiveHash,而对于Spark SQL将使用Murmur3,因此数据分布将非常不同。这就是他们不相容的原因。由于不兼容,所以在Spark SQL中用Hive bucket连接表或在Hive中用Spark bucket连接表时需要exchange和Sort。

2019年,我们将成千上万的Hive SQL迁移到Spark SQL,但其中许多都使用桶形表。

所以当你必须让它们兼容时,我们就可以自动迁移sql。

此外,在Spark SQL中,由于Spark SQL中的bucket将由多个文件组成,因此需要额外的排序,因此Spark必须对单个bucket内的文件进行排序,以确保整个bucket在join上排序,这样才能使用SortMergeJoin。

Spark SQL的另一个限制是,如果用户想要连接两个表,如果他们想要使用Spark和join,他们中的大多数应该聚集在键上,并且桶号应该相同。例如,如果另一个表左边有4096个桶,而用户表右边有1024个桶,那么Spark会将用户表交换为4096个桶,这样它就可以与另一个表合并。所以在这张图中,我们可以说,尽管它们都是桶表,但由于它们的桶号不同,所以引入了交换。另一个限制是,Spark SQL要求大多数表都放在与连接键集相同的键集上。例如,如果两个表都在user_id上,但我们想通过user_id和location_id来连接它们,那么exchange将为它们两个都引入。下一个是当我们使用Union All条款时,将需要交换。例如,也许另一个将来自web和移动,我们想要联合另一个web和另一个移动,然后用user_id加入他们。

在这个例子中,将引入exchange,因为在Union之后,outputPartitioning和outputOrdering将被设置为unknown,而Spark SQL无法知道底层表是bucket表,因此将引入exchange。让我来介绍一下字节跳动如何优化bucket。

字节跳动的桶式优化

首先,我们将Spark bucket与Hive对齐。

之前我介绍过,两者的区别在于文件号不同。对于Hive来说,每个桶只有一个文件,但对于Spark来说,每个桶有多个文件。因此,我们需要确保在Spark中每个bucket都只包含一个文件。还有一件事我们需要知道,Spark使用Murmur3Hash, Hive使用HiveHash,所以我们改变了Spark SQL

使用HiveHash当桶和当我们

读或写回表。

因此,我们需要做以下事情来确保Spark将,

Spark会像Hive一样将数据写入bucket表。首先,我们改变InsertIntoHiveTable计划所需的分布,我们设置,

并且我们在桶形键上用HiveHash将值设置为HashClusteredDistribution,并且我们在桶形键上用升序覆盖所需的排序为SortOrder。这样,我们就能确保,

将引入一个额外的shuffle,以确保每个任务只写入一个bucket表,并且任务号将与bucket号相同。这样就有M个桶文件,每个桶对应一个桶。

那么我们如何以与Hive相同的方式读取这些数据呢?

下一步是我们需要…Spark需要识别Hive桶表,所以我们覆盖HiveTableScanExec的其他预分区到hashparpartitioning with HiveHash在桶键上,然后我们覆盖outputOrdering into SortOrder在桶键上与acending。让我用这张图进一步说明。在左边,我们看到没有我们的改变,HiveTableScan的outputPartitioning是UnknownPartitioning, outputOrdering是Nil。这就是为什么Spark仍然需要为SortMergeJoin添加exchange和SortNode,即使它们有bucket表。因为SortMergeJoin的requireChildDistribution是HashClusteredDistribution,而requireChildOrdering是SortOrder。

HiveTableScan的outputPartitioning是UnknownPartitioning,它不满足SortMergeJoin的requireChildDistribution。好的。更改之后,我们将outputPartitioning设置为右侧的HashPartitioning,这满足要求

SortMergeJoin的requireChildDistribution,它是HashClusteredDistribution。并且HiveTableScan的outputPartitioning被更改为SortOrder,这可以满足SortMergeJoin的requireChildOrdering,也就是SortOrder。所以交换和排序不再需要了。

这样Spark SQL就可以从Hive bucket table中读取数据,并且Spark SQL可以在没有shuffle和sort的情况下连接两个bucket table。

现在Spark SQL和Hive bucket表是兼容的。

我们要做的下一件事是支持一对多的桶连接。让我们以这个为例。对于表A,我们看到有三个桶,对于表B,有六个桶。对于Spark,如果我们在Spark中加入它们,就需要额外的交换,因为它们有不同的存储桶。但是在我们公司我们需要支持这个案子,不需要交换。第一种方法是将表B中的0号桶和3号桶合并,将表B中的1号桶和4号桶合并,将表B中的2号桶和5号桶合并,这样合并后表B就有3个桶,可以与表A合并而不shuffle。在右边我们可以看到,我们可以把它们和排序结合起来。所以物理的,就像它说的,我们扫描了表B并对表B排序,然后,你知道,它可以与表A连接而不需要洗牌。此外,我们还提供了另一种机制来支持一对多的桶连接,因为前面的方法有一些局限性。并行度将是3,这可能太小了,我们提供了另一种机制,以便我们可以使用6作为并行度,将有6个任务使用桶连接。 But how can we do that? We can clone the Table A, so that Table A will have six buckets and mostly we clone them by a new mechanism named

桶联盟。我们知道Spark支持一个名为Union的机制,但如果我们简单地使用Union, Union操作符,outputPartitioning将是未知的,所以我们创建了另一个,我们提供了另一个新的(听不清楚)Bucket Union,这样outputPartitioning和outputOrdering就被保留了。因此在Bucket Union之后,它可以与表B连接,而不需要洗牌和排序。

是的,即使我们移除了洗牌和排序,还有一些其他问题。例如,如果B左连接A是B左半连接A或B反连接A,或B内连接A,这种机制工作得很好,但如果我们用B右连接A连接B和A,或B完全外部连接A,或B交叉连接A,将会有一些重复的记录,因为我们只是扫描了表A两次,对吗?

我们动态地添加一个过滤器来解决这个问题。例如,过滤器是这样的,哈希,连接ID,然后是桶的数量

我们需要确保结果与桶id相同。让我们以桶0为例。0号桶有6条记录0,3,6,9,12和15,

我们需要的是0 6和12。所以我们对ID进行哈希然后用6和4 3 9和

15,这不是目标数字。当我们(喃喃)输入6时,结果将是3,这与桶id 0不同。然后6 3 9 15会被取出来。有了这种机制,就不再有重复的记录了。我们支持的下一件事是,我们不仅支持bucket key,还支持join。在左手边,我们可以看到表X被桶装在A上,A是列名,在右手边,表Y被桶装在A上,A也是桶列,列名。但是SQL查询要求我们同时连接A和B上的表X和表Y,这样交换和排序就减少了。

在A和B上交换,在A和B上排序。

这种限制要求用户自行设计表格

桶式存储非常小心,有时用户很难找到最好的桶式存储机制。例如,因为我们经常查询如下所示,从表1组中按A、B、C、D进行选择,然后从表2组中按B、C、D进行选择,从表3组中按B、D、e进行选择,有时用户可能想要连接表1和表2

A, B或,在B, C上连接2和表3,在B, d上连接表1和表3,如何设置bucket key ?如果我们用A, B, C和D来代替表1,

所以当我们连接表1和表2时,我们可以使用桶连接。为了解决这些问题,我们支持一种机制,它支持多个键的连接。让我们回到这个例子,表X被桶装在A上,表Y被桶装在B上,这样我们就不需要引入交换,我们需要做的是我们可以在A和B上对表X和表Y进行排序。然后我们可以用SortMergeJoin连接表X和Y,而不交换节点。通过这种方法,我们可以很容易地设计桶键集。让我们回到前一页。在这个例子中,我们可以说,我们可以将列B设置为存储机制。因为B是bucket键集,

当我们用新的桶连接表1和表2时,其中一个查询可以从桶机制中受益,因为B是桶键集,而连接键集是B的超集。现在我们也可以用桶连接表2和表3,因为连接键集是B和C,它是桶键集的超集。最后一点是我们支持桶式进化。让我们以这两种情况为例。案例1,一个非桶状表是按日期划分的,用户希望在没有开销的情况下将其转换为桶状表。

默认情况下,如果要更改桶表,则将非桶表作为桶表。

我们要么将现有的详细信息转换为bucket分布,要么在查询现有数据时,查询将失败,因为现有数据还没有被bucket覆盖。案例2,桶号可能是X,由于数据量增加,用户需要将桶号放大到2X。为了解决这些问题,我们提供了一种叫做桶式进化的机制。

为了实现这一点,我们将桶信息放入分区参数中。我们知道在Spark SQL中,桶信息在表属性中,而不是在分区参数中。通过将桶信息放入分区参数中,我们可以知道每个分区的桶信息,并且只有当所有目标分区都具有相同的桶信息时,该表才会被读为桶表?否则,它将被读取为非bucket表。例如,表是无桶的,今天我们将其转换为桶表,今天的数据分区将按桶键集分布和排序。明天当我们查询这个表时,如果唯一的目标分区是今天,那么Spark将知道它是一个桶表,可以使用桶连接。但是如果查询需要同时读取今天和昨天的数据,Spark会发现目标分区具有不同的bucket信息。Spark会把这个表读成一个非bucket表

将斗型表读取为非斗型表只会影响性能而不会影响正确性,因此在实践中效果良好。

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
关于郭军

Bytedance

郭军,字节跳动数据引擎团队负责人。他的团队专注于EB级数据平台的数据仓库架构开发和优化。bob体育客户端下载Spark SQL是这个团队中最重要的引擎之一,Spark SQL每天处理数百PB的数据。在加入字节跳动之前,他曾在思科和eBay工作,在那里他专注于数据平台和数据仓库基础设施优化。bob体育客户端下载