三角洲湖的第一批用户的模式和操作见解

下载幻灯片

网络威胁检测和响应需要对大量日志和遥测数据进行繁重的工作负载。几年前,我在另一家FAANG公司开发了这样一个系统后来到苹果,我的老板让我再做一次。从我之前大规模使用Apache Spark和AWS S3的经验中,我学到了很多东西——一些好的模式,但也有一些我想避免的坏模式和技术。那一年,我在Spark+AI峰会上遇到了Michael Armbrust,并描述了我想做的事情以及测试Databricks作为新系统基础的计划。几个月后,当我们在Databricks上进行概念验证时,Michael给了我一些他们称之为Tahoe的代码。这是后来的三角洲湖的早期alpha,这正是我们想要的。从一开始,我们的整个系统每天都在Delta Lake上写入数百TB的数据。

本演讲将介绍我们在Databricks和Delta Lake上运行大工作负载时遇到的一些问题和学到的东西。

  • 有效的Delta Lake模式用于流ETL、数据丰富、分析工作负载、大型数据集查询和用于快速回答的大型物化聚合
  • z顺序和32列的默认限制。哦。优化模式以确保z顺序是有效的
  • 长尾分布或非同步时钟的日期划分和事件时间的含义
  • 优化,优化,优化,当自动优化是你唯一的选择
  • Upsert模式简化了重要的工作
  • 为非常大的表和低延迟访问调优Delta Lake

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

-你好,我是多米尼克·布雷曾斯基。我是苹果信息安全的杰出工程师,我要讲Delta Lake模式和见解。需求是所有发明之母,而建立大数据系统,有很多挑战。

有时最伟大的创新来自解决问题

大约三年前,我在Spark峰会上遇到了Michael Armbrust,并谈到了我在以前的雇主那里建立的一个相当大规模的系统,每天处理数百tb的数据。我不得不在苹果公司建立一个类似的系统,我描述了我们经历过的一些事情,我们做过的一些事情,以及我们在某些组件上遇到的挑战,我看到了一闪而过,你知道,灵感,恐惧,辉煌,在迈克尔的眼中,他说,“是的,我不认为我们的架构现在能满足那些要求,但我有这个东西在我的口袋里。当我们进一步讨论这个问题时,它真的与我想要的位置以及我想为这个项目做什么产生了共鸣。所以,在POC进行了几个月后,Michael找到我说:“嘿,我有一些代码让你试试。“这基本上就是三角洲湖的阿尔法。当时它的代号是Tahoe,在进行了一些测试和验证之后,我们就全力以赴了。在很短的一段时间内,我们每天处理几十tb的数据到Delta Lakes,现在我们的规模更大了,有几百tb,每天接近1pb的数据我们通过ETL管道摄取,然后我们从内部的日常工作中处理更大容量的数据。

因此,我们在这个过程中学到了一些东西,并有了一些见解。我不会讲太多关于表格格式数据的标准内容,但我会讲一些Delta Lake允许我们做的更新奇或有趣的事情,然后是一些我们经历过的操作上的洞见和陷阱,希望有一些技巧和技巧混合在一起。

所以,首先我要谈谈模式,首先,因为我说过我们运行一个大的ETL管道,是反转ETL提取转换负载,但在大数据生态系统中,我们通常做一些更像提取,加载,然后转换的事情。这就是我们的系统的样子。我们实际上有一个提取系统,它将一堆数据转储到S3中,然后我们获取这些数据,将其加载到一个staging表中,然后我们从staging表中获得一堆流,这些流正在进行转换,从而产生具有定义良好的模式的唯一数据表,并为该数据的下游消费者提供真正稳定的接口。

解析

但在我们深入研究这些模式之前,我想先停下来谈谈我们使用的解析框架,因为我将展示一些示例数据,或者一些示例代码,它将使用这个,所以对你来说,暂停一秒钟,看看它会更容易。我们有一个抽象类叫Parser,它的一些独特之处在于我们实际上把它分解成一些基本步骤这给了我们很大的灵活性。我们有一个准备步骤,我们的解析器基本上是数据帧转换?输入一个数据帧,再输出一个数据帧。所以准备,给定源数据帧,它意味着做一些简单的事情,比如过滤或简单的提取,这将为我们提供额外的信息,我们可能需要在解析中使用。从根本上来说,准备一些表格,不好意思,是一些列,如果需要的话,进来。但除此之外,没有太多的处理。Parse是我们对数据进行大部分提取和转换的地方,然后complete是一个机会,一旦解析完成,一旦验证发生,我们就可以在数据被提交到表或从流输出之前对数据进行任何修正。有趣的是,基本上apply function在上面,这都是Scala,如果你没有注意到,它以固定的顺序运行这些步骤,在一些步骤之间运行。例如,我们运行prepare,但它实际做的是创建一个结构体包含传入数据帧中的所有列,然后进入一个原点列。 And so we basically just transposed all the columns into a struct, put them in one column that we start with, and then we run the parse method, and then we see this set parse details is automatically run, and the interesting thing that it does is it actually creates a column with a struct in it and it uses this validConditionsExpression, as well as the additionalDetailsExpression, in order to validate the row and then possibly capture additional detail about it that could be useful downstream in either dealing with a parsing error or tracking lineage of the data. And also puts a timestamp on it so we have a marker from when the data was actually parsed. And once the parsed details has been set, finally the complete method is called. So, this is sort of a layout for what we do through our ETL pipeline. When we get into the extract part, we actually have a fairly complicated system upstream, it’s actually a couple systems, that is responsible for collecting log data or telemetry or extracting data from upstream systems, databases, APIs, and fundamentally what it does is takes the raw data, wraps it in a JSON metadata layer that includes, for instance, the event type, the source of the data, we get timestamps from that system so we can track latencies. But fundamentally, it’s metadata. And then there’s a raw field in that that includes the actual log event or piece of telemetry or data in it. And the extract system plops that into s3.

从那里,我们实际上在Spark中使用s3-sqs源,这与s3桶上的通知绑定在一起,这就是我们如何消耗大量数据而不必在s3桶上执行昂贵的列表操作和缓慢的列表操作。这就是我们数据的来源。还记得我说过它是JSON对象每行格式吗?但这里需要注意的是,我们实际上并没有对Spark流使用JSON输入格式。实际上,我们使用文件格式将其捕捉为单个文本行。我会多讲一点我们为什么要这么做以及它的优势。因此,当我们从s3加载数据时,这是我们的分段解析器,你可以看到我们的验证条件只是它有一个有效的时间戳。在那个additionalDetails中,我们捕获的东西之一实际上是输入文件名。这给了我们从这一行到s3文件的谱系。这只是为了帮助调试,修复类型的信息,但它至少为数据建立了一个基线历史。 And then our parser’s really simple. We’re basically just extracting the JSON from the text file input, creates a single column called value, so that’s been transposed into this origins struct. So we’re just using a from_json. As you noticed here, we’ve actually extended from JSON to include some fancy timestamp parsing stuff that we do and add some semantics that is consistent for our use case. So, we know the schema for this input data because it’s the standard metadata wrapper, so we’re just extracting those fields and then we’re basically transposing them again out of the resulting struct in the top-level columns and then we’re preserving that origin column. And so, we have the raw data as it came in in origin.value, and then we have the actual extracted data in those columns.

然后,解析器在内部执行验证并检查是否存在有效的时间戳。在我们的complete方法中,我们会检查时间戳是否为空。如果是,我们用解析时间来代替。如果不是,我们使用实际提取的时间戳,然后我们捕获所有这些标准列和解析的细节,以及起源,这让我们从这里继承。这里很重要的一点是,我们总能得到一个有效的记录即使我们无法提取JSON,我们有原始字符串作为沿袭,然后我们有解析细节,这样我们就能知道是否被正确解析,如果加载有问题,我们有很多信息来帮助我们。我们在这个阶段使用文本文件输入的原因是如果你只是尝试使用JSON阅读器它遇到了一个错误的行,或者更糟,它遇到了一个与提取模式不匹配的行,你会得到所有的null,你不会确切地知道发生了什么。我们要做的是捕捉整个字符串以防失败,所以我们有它的副本,我们有它的沿袭,这对处理上游数据形状改变有很大帮助当数据形状改变进入我们的表时,实际上允许我们对同一个表重新处理,如果我们需要重写。从修复问题的角度来看,它有真正的优势。

然后我们就到了变换阶段。在最后一个加载到的staging表之外,它是按日期和事件类型划分的。通常情况下,我们实际的数据解析器有一个流从staging表中读取,staging表上有一个谓词,寻找特定的事件类型,或者可能是戳到原始日志本身,寻找其中的某种修复标记。然后把它开进河里。在准备阶段,我们会从前一个表中删除原始结构体因为我们会用这个的传入数据创建一个新结构体。我们去掉原点,加上其他所有字段。它会创建一个新的原点。然后通过解析,很多数据集实际上是JSON本身,我只是给出了一个从JSON中提取的例子再次使用模式,然后转置。我们的一些方法更复杂,这个解析方法会有一堆代码来处理丑陋的、原始的、半结构化的事件。这取决于源的类型。 The output from this will then go to an actual clean data table. Somebody might call that silver or gold, you know, kind of depending on it. For our users, that data is first-class and super important. There are a lot of derivative data sets from those, but the events themselves actually have a lot of value in our environment.

合并逻辑调试

这就是提取,加载,变换。一旦我们有了这些表,我们就要开始做一些事情。Delta Lake的一些独特之处在于能够做一些事情,比如大规模的资产更新。在upsert中,有一个表,有新数据进入,根据(本质上类似于连接条件),某些列是否匹配或彼此之间是否有关系,您可能希望选择更新现有行、插入新行或删除行。这是非常非常强大的。我们将在一些用例中更多地讨论这个。但事实证明,有时候你的数据并不像你想象的那样,或者你的合并条件,本质上是on子句,不像它们需要的那样具体。新输入数据中的多行实际上会匹配现有表中的一行,这违反了归并的约束,它会抛出一个异常,你会想,嗯,我想知道这是否是我的on条件,或者我想知道我的输入数据中是否有重复的数据?然后你就被困住了。合并逻辑调试。通常情况下,如果你在流中进行归并,你是在for-each批处理写入器中进行的。 Or sorry, a for-each batch method. And a little trick that you can do is, the input data, so you get this microbatch data frame, you can just take that and you can just write it out to some other Delta table and just overwrite the data in it. And if then it throws an exception in your actual merge, you’ll be left with the actual input data that came in and violated the constraints, and you can just go manually run a join with the same on clause conditions, and then validate whether or not you had duplicate rows in your input, or whether or not your on conditions are actually too wide and matching multiple rows that aren’t duplicates. And it makes it super easy to debug when you hit that case. I wouldn’t recommend leading that intermediate write-in as it’ll add latency in production. If you’re not latency-sensitive, it’s not a terrible idea, it’s just after each batch, that table will represent the last batch that was processed. But in development, super good. And if you have a production stream and it breaks, you can just go and wedge that little bit of code in there, run it against this existing checkpoint. It’ll try to do that last batch, it’ll fail again, and you’ll have the data that you need to debug. So, super helpful thing.

有状态合并

我们用合并做的另一件事,这真的结合了Spark结构化流的强大功能,本质上是有状态处理,和合并,我们经常使用它的地方是创建像DHCP会话,VPN会话这样的东西,所以任何会话都很有意义,或者当你有一个有状态的改变或更新到你想做的事情。然后,这两个结合得很好因为你基本上可以将一个映射组运行到一个状态或者将一个平面映射组运行到一个状态。因此,在我们的例子中,当一台机器获得一个新的会话和IP绑定时,使用DHCP。我们想要尽快忽略它,这样任何其他事件数据,我们可以通过IP映射到正确的机器标识。我们想让这个开放非常快,但最终那个会话会关闭,它会有一个结束时间,这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要这对于处理旧数据非常重要。所以,我们要做的是省略一个开放,并通过for - each基本上得到的一批作家,被添加到表中,然后我们省略结束当我们看到在会话结束时,通过合并,我们发现公开会议,更新相应的公开会议,我们这一行要关闭会话结束的时间和其他一些州和属性信息。这样,会话表就很小了。我们要么有一个开盘,要么有一个收盘。我们不会以一个开放的和一个封闭的结束。它保留了处理逻辑,即当我们为了丰富其他数据而进行连接时。 It keeps that logic more simple. And it’s easy to reason about the data that you see in the table, as well. So, map groups with state, or flat map groups with state married up with Delta Lake merge, they’re really peanut butter and jelly.

聚合漏斗

另一个我们所做的,这是一种疯狂的事情,但是我们有几十个表,列一个IP或逗号划定的IP列表或数组的IPs,和对我们来说很常见,试图寻找我们周围的所有数据或遥测给定IP或一组“诱导多能性”或“诱导多能性”在一些滑块范围,这是一个非常昂贵的操作,即使是动态文件跳过和IP列的z-排序。因为其中一些不仅仅是单个IP,而是一组IP,你必须进行包含或reg检查,这很糟糕,你在做全表扫描。所以在过去,当我们拥有一组ip时,我们必须在所有这些数据集中审视我们的长期留存窗口,在一个非常大的集群中,这可能需要48小时。我们决定需要一个索引,但我们有38个表。顺便说一下,这些表非常大,我们每天要在这些表上写几百tb的数据,所以这是很多数据。第一件很简单的事是,从这些表中创建一个流取出IP地址,日期分区和表,将这些字符串联合在一起,对其进行聚合,并尝试将其写入到表中。你可以想象那有多顺利。即使在非常大的集群上,这也行不通。我们已经超出了计算资源的范围。 So we had to start decomposing the problem a bit, and the really interesting thing is that Delta Lake semantics and operations you can do on it, in some ways really complement structured streaming, or can be used instead of certain features on structured streaming. So what we did on this, is we basically hang one or more streams off of each sourced data set, where we’re doing a for-each batch grouped by source IP, dst IP, and the date partition. We also include the source table name and column information in that. So this means, for-each batch means we’re running this aggregation just on each microbatch on the stream but not in totality on the stream itself. And we take that and we append that to the step one table. So, we can have a bunch of source tables, they each have streams coming off of them, and we’ll just have all those streams append to one table within some reasonable bound. And that creates the step one table, and then off the step one table, we’ll do another for-each batch, actually two operations, one where we’re doing a group by source IP and the date, and another one by the dst IP and the date, because ultimately our index is gonna be about one IP to the table’s columns and date partition that it showed up in.

所以我们做了两个group-by,我们把它们结合在一起,然后我们附加到步骤二的表格中,然后从步骤二的表格中,我们基本上做了for-each batch,这里不是做group-by,我们实际上对它做了一个窗口聚合。结果更有性能,还没深入研究。

然后我们得到这个聚合的输出,我们实际上对索引表做了一个upsert。最终的索引表会是什么样子,我们实际上有列的原始值,我们从那个值中提取了一个单独的IP,我们有一个日期分区,然后我们有一个数据集名称的数组以及IP在那个日期分区的原始值中显示的列。然后我们还保留了一行数的总和,这给了我们一个很好的近似值,即那天该IP有多少活动。

在一天中,随着新数据的到来,这个数组可能会增长,总的计数也会增长,但我们不会创建一堆其他的行。这给了我们一个更小,更紧凑的表格,我们可以根据提取的值进行z轴排序,我们可以很快地搜索到ip。

然后,我们得到一些元数据关于它们所在的数据集日期分区和一些行数,我们可以用它做一些有趣的事情,或者我们使用这些信息动态地计算对源数据集的查询,现在我们有了一个日期分区谓词,以及原始值。我们对表中的列值进行相等搜索,我们用了48小时的方法从源表中获得真实数据,只花了一分钟。这是一个巨大的进步。所以我们在前期花费了一些时间来计算这个,但是每次我们去做这种类型的搜索,我们都节省了大量的集群资源,并且我们为用户提供了他们需要的数据更快的响应。

关于合并的表,有趣的是,这里有很多很棒的,我们建立了这些索引表,这些会话表,我们对它们做upserts。问题是,如果你真的想要这些数据你想要得到,比如说,一个缓慢变化的数据提要。你可以看到新的会话,或者看到会话的更新状态,或者看到从它里面出来的新索引值。

问题是,如果你只是尝试

为了将流挂起,一旦有更新或删除,流将抛出异常,因为通常这违反了标准语义。现在你必须在流上抛出ignore change选项,当你只做更新和插入时,本质上发生的是,如果一个底层的Parquet文件因为其中一行需要更新而被读取,那一行被更新,一个新的Parquet文件被写出来并替换表中旧的那个。通过设置忽略更改,Parquet文件的整个内容将沿着流播放。所以你可以想象你有大量的副本。因此,您不仅看到新的插入,也不仅仅看到更新的行,还看到来自相同Parquet文件的所有其他未更改的行。为了得到一个干净的,插入并更新的行提要,我们做了一个小技巧,我们实际上挂起了一个流,这个流使用忽略变化,然后它做另一个upsert,但是使用重复删除模式。重复数据删除模式本质上就是检查是否所有的列都匹配,如果匹配就忽略它,如果不匹配则它是一个你在表中没有看到的新值,即它被插入或者它是一个更新的记录,然后我们将它插入到一个干净的表中。

现在这个干净的表可以成为流的一个很好的源

它会给你一个SED更新类型的提要。这是一个额外的步骤,你增加了一点延迟,但我们现在已经解决了一个有趣的问题,通过理解Delta的语义,并以一种有趣的方式将它与流结合起来,并使用不同的语义将链接合并在一起。

所以,这是一个非常非常有趣和强大的模式,我们也在一些地方使用。

这就是模式。现在我要讲讲我们的一些见解。

高规模表的存储隔离

所以,很明显,我们运行着一些非常大的基础设施。我说过我们每天接收1pb的数据。我们也有一个很长的保存窗口,所以我们实际上有大约2pb的表,我们有大量的半pb的表,我们经常对它们进行操作。

根据我们之前的经验,我们知道如果在路径文件的顶部没有足够的熵,那么s3就会出现性能问题,这样s3桶就可以很好地分片和分配工作负载。当我们开始的时候,我们说,嘿,我们知道怎么做,所以我们可以在一个bucket上得到很高的IO,所以我们把所有的表都放在一个bucket里,你知道,这比较简单。事实证明,这并不容易,在某些情况下,s3有个糟糕的日子,我们可能会被节流,有时节流会影响同一bucket中的其他表。好吧,如果它们碰巧共享同一个碎片或类似的东西。我们发现最佳实践是,对于大多数表,除非它们非常小,有界限,我们只是把每个表和它对应的检查点放在它自己的存储桶里,我们在那个存储桶上启用随机前缀,或者在那个表上。这意味着写入表的数据会很好地分布在一个不错的散列或可分区的传递空间中,然后s3的IO会非常高,我们不会受到限制。它还有其他一些很好的优点,bucket成为表访问的代理,所以我们可以使用标准IAM accols,所以我们可以与非spark消费者共享单独的表。因此,例如,如果你想使用Delta Lake连接器的Hive,或使用Delta Lake连接器的Starburst Presto,你可以通过在桶上使用accols来单独暴露表,甚至可能在其他帐户中运行,这比在前缀上使用accols并试图跟踪整个事情要简单一些。只是多了一点保险而已。如果你想将一个表复制到另一个区域,你可以使用桶复制,它会复制表中的数据元数据和检查点数据。 So, under most conditions, that other table within a reasonably short, or sorry, the other bucket in another region will have a corresponding version of the table and even a checkpoint you can restart from. So, some nice properties there.

我们有了下一个见解,这显示在父类中,我们真正意识到Delta Lake和Spark中的结构化流是真正可组合的。如果你真正理解了语义,你可以通过各种配置选项,使用普通流或流中的for-each批处理。然后关于Delta Lake的语义,我们发现我们能够克服可能存在的规模障碍。例如,如果我们想在一个巨大的流上保持一个运行的聚合,在我们的例子中,通常我们流上的事件时间戳实际上有一个长尾分布,所以试图在水印上设置一个窗口要么是不可能的,要么是可能的,但资源消耗有点粗略,可能不是超级稳定,我们不能很好地保护数据的巨大峰值。但是我们发现我们可以在流上进行for-each batch聚合,它不必保持在我们的批处理状态,然后在Delta Lake上使用merge操作来不断更新那里的聚合输出,这样我们就有了流上的运行总数。所以现在我们能够使用Delta Lake语义来克服我们可能在Spark结构化流上遇到的伸缩性问题。当你使用for-each batch时,你放弃了恰好是一次的语义而得到至少一次,但是如果你在Delta Lake中包含了一个batch ID列并将其合并到其中,你实际上可以在on条件中包含它以确保你不会合并来自你已经看到的批处理的内部结果,现在你回到了恰好是一次。因此,Delta Lake和Structured Streaming功能之间的相互作用提供了一个超级可组合的构建模块。我们发现我们已经解决了很多问题,包括构建一个巨大的流倒索引,而实际上不需要在我们的堆栈中引入任何其他技术,只使用我们在这两个方面的专业知识。

模式命令

需要注意的一点是,Delta Lake文档中已经很好地介绍了这一点,但是模式排序和与统计数据的交互是我们以前遇到过的问题,尽管它已经被记录下来了。所以我又把它叫出来了。本质上,这是Delta中的stats collection,对于前32个字段,我马上会澄清字段和列,在Delta中生成了min和max这样的统计值。它在元数据中以每个Parquet文件为基础保存。当我说字段而不是列时,这是一个糟糕的术语,但Delta Lake试图在类型周围没有异常。例如,在结构体中,它认为结构体的每个成员本质上都是一列。因此,前32个顶级列并不具有统计集合。如果你在早期列索引中有一个深度结构体,那么你可能会吃掉前32个前几列的结构体,然后那个大结构体就留在那里了。所以你要确保你数完,然后再往下数。如果你想知道你是否在这个范围内,你可以修改32使用dataSkippingNumIndexedCols设置,你可以把它设置为- 1如果你想要它在任何东西上或任何你想要的值。 So, dynamic file pruning uses the min max column stats. When you have a predicate on a column value, if it falls outside the min and max for that file then the answer can’t be there and it can choose to not read that file, and reduced IO is increased performance. So z-ordering, also in the Databricks product, essentially maximizes the utility of min max by doing a clustered sort across one or more columns. So this gives you tighter min max ranges without inner file overlaps, and so this is really gonna maximize the number of files that you can skip based off of a predicate like that. So, the key is, make sure if you’re z-ordering or you’re sorting columns, that those columns have stats collection on, i.e. they’re within the first dataSkippingNumIndex or by default, 32 fields. Otherwise, happily go on in z-order and do nothing for you. It might catch that these days, but originally it didn’t. And happily would optimize on a field that didn’t have stats collection. Another little hint there is that it’s expensive to collect stats on long strings. I don’t have an exact number for what constitutes long, but I’d say, you know, when you’re talking about hundreds of bytes to a kilobyte plus, you’re crossing a threshold. If you’re under 100 bytes, you’re definitely fine.

但这样做代价很大,如果你能把长字符串移到模式的末尾,超过dataSkippingNumIndexedCols值,如果你的列宽不是32,如果它更小,你就能调低表的值把那些长字符串列放在它外面,这会让你写得更快。如果你真的需要它们的统计信息,尽管它们对字符串的帮助不大,但在写入时你要付出代价。这取决于你要调优和优化什么。

不要过度划分Delta表(笑)。

所以这是值得讨论一下的。当您向表中添加分区时,您通常会增加存储系统上表下面的对象或文件计数。这是因为如果你没有分区,你可以优化表让它有最少的1g,或者任何你优化对象的东西。因此,您可以最小化对象的数量,并获得较大的对象,从而从底层存储获得较大的吞吐量。一旦你引入分区,你可能会有更多的对象,甚至更小的对象进入这些分区。即使你对它进行了优化,分区也会迫使这些数据和数据混合的方式中断。所以你增加了对象数。当您增加对象计数时,它通常会对全表扫描、大型聚合、连接等等产生性能影响,除非您在谓词中自由而有效地使用分区值。

基本上,如果您的大多数查询都是连接或聚合操作,可以在操作中指定分区值作为谓词,那么您可能选择了一个相当好的分区对象。我认为Michael建议分区中应该至少有1g的数据作为一般情况。这样就可以给你,一个通过。但我们经常看到10g,甚至100g的表它们通常交互的方式,分区实际上没有多大意义,如果不分区,它会更快。需要注意的是,如果你必须有一个保留窗口,或者每次删除特定的数据,如果你可以将它设置为可以根据分区值删除的位置,比如日期或月份,那么这将使删除更清晰,并与表上的其他操作分离,但同样,这通常会付出更高的对象计数的代价,这可能会降低某些操作的性能。onc情况下,过度分区是有意义的,如果你试图创建一个表,它只提供对分区值的查找服务,而且通常是少量的分区值。如果只向表中追加数据,并且只按分区值删除数据。在这种情况下,如果一个分区最终只有非常少量的数据,但你故意只试图查找数据而不是分区,没关系,你不会真正付出吞吐量性能问题。这就是过度分区对高性能点查询有利的狭义情况。但当你需要混合,比如点查询加载和,聚合或连接之类的东西时,最好是适当大小的分区,甚至在分区下。 And then fall back on something like z-ordering or sorting on your primary key column to maximize the dynamic file printing off the min max stats. So one thing that you’ll run into with Delta Lake is sometimes having conflicting transactions, and this really comes about when you’re doing things like deletes and updates on a table.

处理冲突事务

如果您只做附加类型的操作,那么实际上就不会有冲突。我们真正遇到这种情况的地方是在我们做upserts或merge的表格上因为表格一直在变化。然后如果我们要在表上做z顺序优化,优化需要一点时间,它可能会碰到表从下面往外变化。我们的处理方法是当我们有两个相关的操作发生冲突时,你可以让所有的分区都不相交,这意味着它们在不同的分区时间上操作,但如果你有一些操作在整个表上,比如优化z轴顺序,我们所做的就是内联它就像做upsert的方法一样,然后,例如,在for-each batch中,我们只是对批次ID和每个批次的结束数量做一个mod,我们执行优化。这样它们就被序列化了,不会发生冲突,批处理的延迟会稍微长一点,但我们最终会降低延迟因为表是有序的,漂亮而紧凑。

最后一件事是我们有一些非常大的桌子。

大表元数据优化

我提到过,1.5拍字节,2拍字节,里面有数百万个对象。这意味着元数据更大。我们发现了一件事,你可以使用快照分区设置来使用更多分区或任务来读取元数据。因此,如果您有一个较大的集群,可以将其提高到数百,甚至1024,从而减少读取元数据的延迟,直到我们看到一些元数据操作的优化。然后,有时您可能想要向下旋转像您的日志保留。如果你在一个有很多元数据的表中添加大量的数据,你会建立更多的元数据和更多的底层元数据文件和变化,所以你可能想要缩短那些保存的间隔因为它们可能会受到影响,时间旅行和其他类似的东西。所以,你知道,挖掘那些细节,但那可以帮助你保持元数据的大小减小一点。所以它实际上是在读取元数据时添加了更多的并行操作,然后通过设置将元数据保持在合理的大小内。对于大多数表,我们什么都不碰而是在集群范围内设置快照分区,根据集群大小,我们需要多少资源来读取这些大型表元数据。以上就是我的模式和见解。 Thank you, and get in contact with me if you have any questions. I’m also on the Delta Lake Slack often and happy to answer questions or work through issues with you. Thank you very much, and take care.

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
关于多米尼克·布里津斯基

苹果

多米尼克·布雷辛斯基(Dominique Brezinski)是苹果公司信息安全领导团队的成员,也是威胁响应组织的首席工程师。他在安全工程领域有25年的经验,专注于入侵检测和事件响应系统的设计和开发。Dom自0.8发行版以来一直在生产环境中与Apache Spark合作。