Getting Started with Delta Lake

Making Apache Spark Better with Delta Lake

Michael Armbrust, Principal Software Engineer at Databricks
Michael Armbrust is a committer and PMC member of Apache Spark and the original creator of Spark SQL. He currently leads the team at Databricks that designs and builds Structured Streaming and Databricks Delta. He received his PhD from UC Berkeley in 2013, advised by Michael Franklin, David Patterson, and Armando Fox. His thesis focused on building systems that allow developers to rapidly build scalable interactive applications, and specifically defined the notion of scale independence. His interests broadly include distributed systems, large-scale structured storage, and query optimization.

Series details

This session is part of the Getting Started with Delta Lake series with Denny Lee and the Delta Lake team.

Session abstract

Join Michael Armbrust, lead of the Delta Lake engineering team, to learn how his team built ACID transactions and other data reliability technologies from the data warehouse world on top of Apache Spark and brought them to cloud data lakes.

Apache Spark is the dominant processing framework for big data. Delta Lake adds reliability to Spark so your analytics and machine learning initiatives have ready access to quality, reliable data. This webinar covers how to use Delta Lake to enhance data reliability for Spark environments.

Topic areas include

  • The role of Apache Spark in big data processing
  • The use of data lakes as an important part of the data architecture
  • Data lake reliability challenges
  • How Delta Lake helps provide reliable data for Spark processing
  • Specific improvements that Delta Lake adds
  • The ease of adopting Delta Lake to power your data lake

What you'll need:
Sign up for Community Edition here to access the webinar presentation materials and sample notebooks.

Video transcript

- [Denny] Hello everyone. Welcome to our webinar today, Making Apache Spark Better with Delta Lake.

Before we begin today's presentation, we'd like to go over a few housekeeping items to make sure you have the best experience. Please note that your audio connection will be muted so everyone can watch comfortably. If you have any concerns or questions, please raise them in the Q&A panel or in the chat; we encourage you to use that panel throughout the session to ask as many questions as you can and clear up any doubts about today's topic. Our main presenter today, Michael Armbrust, is the original creator of Spark SQL and Structured Streaming and one of the primary creators of Delta Lake. He is a principal engineer at Databricks, so without further delay, take it away, Michael. - Thank you, Denny. I'm really excited to be here today to talk about how to make Apache Spark better by using Delta Lake. Before I start, however, I want to talk about the concept of a data lake, why so many people are excited about it, and why there are so many challenges when they try to set these things up.

The promise of the data lake

首先,什么是数据湖,它对我意味着什么?所以数据湖的承诺基本上是这样的,组织有很多数据。它可能是OLTP系统中精心策划的客户数据。它可能是来自web服务器的原始点击流,也可能是来自一堆传感器的非结构化数据。而数据湖的承诺就是你可以把所有的数据都倒进数据湖里。当你将它与传统数据库相比时,这实际上是非常强大的,因为在传统数据库中,你必须从想出一个模式开始,并做大量的清理工作。这通常被称为写时模式,数据湖允许你做的是它允许你放弃这个过程,只是开始收集所有东西,因为有时你直到很久以后才知道数据为什么有价值,如果你还没有存储它,那么你已经失去了它。数据湖就是文件系统中的一堆文件。它可以是S3或HDFS或Azure Blob存储,你可以把所有东西都转储到那里,然后再回来查看。我们的想法是,当你完成了,一旦你收集了所有的信息,你就可以从中得到真正的见解。 You can do data science and machine learning. You can build powerful tools for your business, like recommendation engines or fraud detection algorithms. You can even do crazy things like cure cancer using genomics and DNA sequencing. However, I’ve seen this story many, many times, and typically what happens is unfortunately the data at the beginning is garbage. And so the data that you store in your data lake is garbage, and as a result, you get garbage out from these kind of more advanced processes that you try to do at the end. And why does that happen? Why is it so difficult to get quality and reliability out of these data lakes? And what does this kinda typical project look like? So I wanna walk you through a story that I’ve seen kind of happen over and over again at many organizations, when they sit down and try to extract insights from their data.

The evolution of a cutting-edge data lake

It usually goes something like this, and this is considered state of the art today, but before Delta Lake. A very common pattern is you have a stream of events coming into a system like Apache Kafka, and your job is to do two things. You need to do streaming analytics, so you can know in real time what's happening in your business. You also want to do AI and reporting, where you can look over a longer period of time, do longitudinal analysis, and actually look at history and trends to make predictions about the future. So how do we do that? Step one, I sit down at my computer, and I know Spark has really good APIs for reading from Apache Kafka. You can use DataFrames, Datasets, SQL, and Spark SQL to process it, do aggregations and time windows and all kinds of things, and come up with your streaming analytics. So we start with that, and pretty quickly it works well, but that brings us to our first challenge, which is historical queries.

Challenge #1: Historical queries?

Kafka非常适合进行实时分析,但它只能存储一天或一周的数据。你不希望在Kafka中存储年复一年的数据。所以我们也要解决这个问题。实时技术对于当前正在发生的事情很有帮助,但是对于寻找历史趋势就不太好了。所以我读了很多博客文章,这里发生的一个非常常见的模式是有一种叫做Lambda架构的东西,据我所知,基本上你只需要做两次所有的事情。你有一个实时的东西,它在做一个近似,给你在这个时刻确切地发生了什么,你有另一个管道,它可能更有策划。它运行得稍微慢一点,但它会将所有数据存档到你的数据湖中。这是第一步。如果我们想解决这个历史查询问题,我们还要在Apache Spark之上建立Lambda架构,一旦我在数据湖中有了所有的数据,我的想法是现在我也可以在上面运行Spark SQL查询,现在你可以做人工智能和报告。这是一些额外的工作,一些额外的协调,但幸运的是Spark有统一的API用于批处理和流处理。 And so it’s possible to do it and we get it set up, but that brings us to challenge number two.
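Before moving on, here is a minimal PySpark sketch of the Lambda-style setup just described: one streaming query doing real-time analytics over Kafka and a second one archiving the raw events into the data lake. The broker address, topic name, and S3 paths are illustrative assumptions, not anything from the webinar.

```python
# Minimal sketch of the pre-Delta "Lambda" setup described above.
# Broker address, topic, and paths are made up for illustration.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("lambda-sketch").getOrCreate()

events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load())

# Speed layer: real-time counts per 5-minute window for streaming analytics.
live_counts = events.groupBy(window(col("timestamp"), "5 minutes")).count()
(live_counts.writeStream
    .outputMode("complete")
    .format("console")
    .start())

# Batch layer: archive the raw events into the data lake for AI and reporting.
(events.writeStream
    .format("parquet")
    .option("path", "s3a://my-bucket/events/")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events/")
    .start())
```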

Challenge #2: Messy data?

就像我之前说的,现实世界中的数据通常是混乱的。上游的一些团队在没有通知您的情况下更改了模式,现在您遇到了问题。我在这里看到的一个模式是你需要添加验证。所以你需要编写额外的Spark SQL程序来检查你对数据的假设是否正确。如果他们去了,如果他们错了,它会发送一封电子邮件让你改正。现在,当然,因为我们已经做了Lambda架构,我们必须在两个不同的地方做验证,但那是,那是我们可以做的事情。我们可以用火花来做。现在我们设置验证来处理混乱的数据。不幸的是,我们要面对第三个挑战,那就是错误和失败。这些验证很好,但有时你忘记了,或者你的代码中有错误,或者更困难的是你的代码在中间崩溃,因为你在EC2上运行,你的Spot实例死了,或者其他什么,现在你必须担心,“我如何清理它?” The real problem with using these kinds of distributed systems and kinda distributed file systems is if a job crashes in the middle, it leaves garbage results out there that need to be cleaned up. And so you’re kind of forced to do all of the reasoning about correctness yourself. The system isn’t giving you a lot of help here. And so a pretty common pattern is people, rather than working on an entire table at a time, because if something goes wrong, we need to recompute the entire table. They’ll instead break it up into partitions. So you have a different folder. Each folder stores a day or an hour or a week, whatever kind of granularity makes sense for your use case. And you can build a lot of, kinda scripting around it, so that it’s easy for me to do recomputation. So if one of those partitions gets corrupted for any reason, whether it was a mistake in my code or just a job failure, I can just delete that entire directory and reprocess that data for that one partition from scratch. And so kind of by building this partitioning and reprocessing engine, now I can handle these mistakes and failures. There was a little bit of extra code to write, but now I can kind of sleep safe and sound knowing that this is gonna work.
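For concreteness, the kind of hand-rolled validation job described above might look something like the sketch below. The path, column names, and the choice to fail the job are all assumptions made for illustration.

```python
# Hand-rolled validation sketch: check an assumption about a day's partition
# and fail loudly if it is violated. Path and columns are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("s3a://my-bucket/events/date=2019-07-01/")
bad_rows = df.filter(col("timestamp").isNull() | col("user_id").isNull()).count()
if bad_rows > 0:
    # In practice this is where you would send that "come fix it" email.
    raise ValueError(f"{bad_rows} rows failed validation for 2019-07-01")
```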

Challenge #4: Updates?

这就带来了第四个挑战,更新。做点更新是非常困难的。添加数据非常容易,但在这个数据湖中更改数据以及如何正确地进行更改非常困难。出于GDPR的原因,你可能需要这样做。你可能需要留住玩家。你可能需要做一些匿名化或者其他的事情,或者你可能只是在数据中有一些错误。因此,现在你必须编写一个完整的其他类的Spark作业来执行更新和合并。这是非常困难的,通常因为它是如此的困难,我看到人们做的不是单独的更新,这将是非常便宜的,他们实际上只要他们需要做一些事情,每当他们每月得到一组dsr,他们就会复制整个表,删除任何由于GDPR而被要求遗忘的人。他们可以这样做,但这是另一个要运行的Spark作业。这非常昂贵。 And there’s kind of a subtlety here that makes it extra difficult, which is if you modify a table while somebody is reading it, generating a report, they’re gonna see inconsistent results and that report will be wrong. So you’ll need to be very careful to schedule this to avoid any conflicts, when you’re performing those modifications. But these are all problems that people can solve. You do this at night and you run your reports during the day or something. And so now we’ve got a mechanism for doing updates. However, the problem here is this has become really complicated. And what that means is you’re wasting a lot of time and money solving systems problems rather than doing what you really want to be doing, which is extracting value from your data. And the way I look at this is these are all distractions of the data lake that prevents you from actually accomplishing your job at hand.
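To make the cost of that update pattern concrete, here is a sketch of the "copy the whole table" approach described above. Paths, column names, and the list of IDs are made up for illustration; the point is simply that forgetting a handful of users means rewriting everything.

```python
# Pre-Delta GDPR sketch: to forget a few users, rewrite the entire table.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
ids_to_forget = ["user-123", "user-456"]   # this month's data subject requests

users = spark.read.parquet("s3a://my-bucket/users/")
cleaned = users.filter(~col("user_id").isin(ids_to_forget))

# Write to a new location and swap it in later, so readers of the old location
# are not left looking at a half-written table.
cleaned.write.mode("overwrite").parquet("s3a://my-bucket/users_v2/")
```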

Data lake distractions

总结一下我的想法,这里最重要的一点是没有原子性。当你运行一个分布式计算时,如果作业在中间失败了,你仍然有一些部分的结果。不是全有或全无。因此原子性意味着当一个作业运行时,它要么完全正确地完成,要么如果出现任何错误,它完全回滚,什么都不会发生。这样你就不用再让数据处于损坏状态需要你费力地构建这些工具来进行手动恢复。另一个关键问题是没有质量执行。在每一项工作中,你都要根据你的假设手动检查输入的数据的质量。系统没有提供帮助,就像传统数据库中的变体一样,您可以说,“不,这一列是必需的”或“这必须是这种类型的模式”。所有这些东西都是留给程序员来处理的。最后,没有对一致性或隔离性的控制。 And this means you can really only do one right operation to any data lake table at a time, and it makes it very difficult to mix streaming and batch to do operations while people are reading from it. And these are all things that you kind of, you would expect from your data storage system. You would want to be able to do these things, and people should always be able to see a consistent snapshot automatically.

Now let's take a step back and look at what this process looks like with Delta Lake.

The challenges of the data lake

The idea of Delta Lake is that we take this relatively complicated architecture, where a lot of the correctness and other concerns were handled by you manually writing Spark programs, and we change it so that you think only about data flow: you bring in all of the data from your organization, let it flow, and keep improving its quality until it's ready for consumption.


The hallmarks of this architecture are, first of all, that Delta Lake brings full ACID transactions to Apache Spark. This means that every Spark job that runs now either completes the entire job or none of it. People who are reading and writing at the same time are guaranteed to see consistent snapshots. When something is written out, it is definitely written out and will not be lost. Those are the properties of ACID, and they let you focus on your actual data flow rather than thinking about all of these extra systems problems and solving these known problems over and over again. Another key aspect of Delta Lake is that it is based on open standards and it is open source. So it's a full Apache license, no silly Commons Clause or anything like that. You can use it completely for free for whatever application you want. Personally, that's really important to me if I'm going to be storing petabytes of data, right? Data has a lot of gravity. There’s a lot of inertia when you collect a lot of data and I wouldn’t want to put it in some black box where it’s very difficult for me to extract it. And this means that you can store that mass amount of data without worrying about lock-in. So both is it open source, but it’s also based on open standards. So I’ll talk about this in more detail later in the talk, but underneath the covers, Delta is actually storing your data in parquet. So you can read it with other engines and there’s kind of a growing community around Delta Lake building this native support in there. But worst case scenario, if you decide you want to leave from Delta Lake all you need to do is delete the transaction log and it just becomes a normal parquet table. And then finally, Delta Lake is deeply powered by Apache Spark. And so what this means is if you’ve got existing Spark jobs, whether they’re streaming or batch, you can easily convert those to getting all kinds of benefits of Delta without having to rewrite those programs from scratch. And I’m gonna talk exactly about what that looks like later in the talk. But now I want to take this picture and simplify it a little to talk about some of the other hallmarks I see of the Delta Lake architecture, and where I’ve seen people be very successful. So first of all, I wanna kind of zone in on this idea of data quality levels. These are not fundamental things of Delta Lake. I think these are things that people use a variety of systems, but I’ve seen people very successful with this pattern, alongside the features of Delta.

A Delta Lake

这些只是数据质量的一般类别,这里的想法是,当你把数据带入数据湖,而不是试图让它一次完美,你会逐步提高数据的质量,直到它可以消费。我会讲一下为什么我认为这实际上是一个非常强大的模式,可以帮助你更有效率。所以从最开始就是你的青铜等级数据。这是原始数据的倾倒场。它仍然在燃烧,我实际上认为这是一件好事,因为这里的核心思想是,如果您捕获了所有东西,而不进行大量的解析或解析,那么在解析和解析代码中就不可能有错误。你从一开始就保留了所有内容你通常可以保留一年的用户留存。我会讲一下为什么我认为这真的很重要,但这意味着你可以收集所有东西。你不需要提前花很多时间来决定哪些数据有价值,哪些数据没有价值。你可以在做分析的过程中弄清楚。从青铜开始,我们来看银的水平数据。 This is data that is not yet ready for consumption. It’s not a report that you’re gonna give to your CEO, but I’ve already done some cleanup. I filtered out one particular event type. I’ve parsed some JSON and given it a better schema or maybe I’ve joined and augmented different data sets. I kinda got all the information I want in one place. And you might ask, if this data isn’t ready for consumption, why am I creating a table, taking the time to materialize it? And there’s actually a couple of different reasons for that. One is oftentimes these intermediate results are useful to multiple people in your organizations. And so by creating these silver level tables where you’ve taken your domain knowledge and cleaned the data up, you’re allowing them to benefit from that kind of automatically without having to do that work themselves. But a more interesting and kind of more subtle point here is it also can really help with debugging. When there’s a bug in my final report, being able to query those intermediate results is very powerful ’cause I can actually see what data produced those bad results and see where in the pipeline it made sense. And this is a good reason to have multiple hops in your pipeline. And then finally, we move on to kind of the gold class of data. This is clean data. It’s ready for consumption at business-level aggregates, and actually talk about kind of how things are running and how things are working, and this is almost ready for a report. And here you start using a variety of different engines. So like I said, Delta Lake already works very well with Spark, and there’s also a lot of interest in adding support for Presto and others, and so you can do your kind of streaming analytics and AI and reporting on it as well.

现在我想谈谈人们是如何通过三角洲湖,通过这些不同的质量等级来移动数据的。我反复看到的一个模式是流媒体实际上是一个非常强大的概念。在我深入研究流媒体之前,我想纠正一些我经常听到的误解。当人们听到流媒体时,他们通常会想到一件事,他们认为它必须非常快。它必须非常复杂,因为你想要它非常快。Spark实际上是支持这种模式的如果你有这个应用的话。有持续的处理,你不断地拉服务器获取新数据,有点像保持核心,它支持毫秒延迟,但这实际上不是唯一的应用程序,流可以有意义。流媒体对我来说实际上是增量计算。它是关于一个我想在新数据到达时持续运行的查询。因此,与其把它看作是一堆离散的工作,并把这些离散工作的所有管理都放在我或某个工作流引擎上,流媒体可以消除这种情况。 You write a query once. You say, “I want to read from the bronze table, I’m gonna do these operations, I went right to the silver table,” and you just run it continuously. And you don’t have to think about the kind of complicated bits of what data is new, what data has already been processed. How do I process that data and commit it downstream transactionally? How do I checkpoint my state, so that if the job crashes and restarts, I don’t lose my place in the stream? Structured streaming takes care of all of these concerns for you. And so, rather than being more complicated, I think it can actually simplify your data architecture. And streaming in Apache Spark actually has this really nice kind of cost-latency tradeoff that you can too. So at the far end, you could use continuous processing mode. You can kind of hold onto those cores for streaming persistently, and you can get millisecond latency. In the middle zone, you can use micro-batch. And the nice thing about micro-batch is now you can have many streams on the cluster and they’re time-multiplexing those cores. So you run a really quick job and then you give up that core and then someone else comes in and runs it. And with this, you can get seconds to minutes latency. This is kind of a sweet spot for many people, ’cause it’s very hard to tell if one of your reports is up to date within the last minute, but you do care if it’s up to date within the last hour. And then finally, there’s also this thing called trigger once mode in Structured Streaming. So if you have a job where data only arrives once a day or once a week or once a month, it doesn’t make any sense to have that cluster up and running all the time, especially if you’re running in the cloud where you can give it up and stop paying for it. And Structured Streaming actually has a feature for this use case as well. And it’s called trigger once where basically rather than run the job continuously, anytime new data arrives, you boot it up. You say trigger once. It reads any new data that has arrived, processes it, commits a downstream transaction and shuts down. And so this can give you the benefits of streaming, kind of the ease of coordination, without any of the costs that are traditionally associated with an always running cluster. Now, of course, streams are not the only way to move data through a Delta Lake. Batch jobs are very important as well. Like I mentioned before, you may have GDPR or kind of these corrections that you need to make. You may have changed data capture coming from some other system where you’ve got a set of updates coming from your operational store, and you just want to reflect that within your Delta Lake and for this, we have UPSERTS. And of course, we also support just standard insert, delete, and those kinds of commands as well. And so the really nice thing about Delta Lake is it supports both of these paradigms, and you can use the right tool for the right job. And so, you can kind of seamlessly mix streaming and batch without worrying about correctness or coordination.
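As one concrete example of the options above, the trigger-once pattern might look roughly like this in PySpark, streaming from a bronze Delta table into a silver one. The table paths and checkpoint location are assumptions made for illustration.

```python
# Trigger-once sketch: process whatever new data has arrived in the bronze
# table, write it to silver, then shut down. Paths are made up for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

(spark.readStream
    .format("delta")
    .load("/delta/events_bronze")
    .writeStream
    .format("delta")
    .trigger(once=True)
    .option("checkpointLocation", "/delta/checkpoints/bronze_to_silver")
    .start("/delta/events_silver"))
```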

我想讲的最后一种模式是重新计算的概念。所以当你有了这个早期的表格保存了你所有的原始结果当你有了很长时间的保留,也就是几年的原始数据。当你在Delta Lake数据图的不同节点之间使用流时,你很容易进行重新计算。你可能想要重新计算因为你的代码中有一个错误,或者你可能想要重新计算因为你决定要提取一些新的东西。这里真正好的事情是由于流媒体的工作方式是非常简单的。为了让你们了解结构化流在Apache Spark中是如何工作的,我们基本上有这样一个模型,流查询应该总是返回与批查询相同数量数据的相同结果。这意味着当你对Delta表启动一个新流时,它会在流开始的那一刻对那个表进行快照。你做这个回填操作你处理快照中的所有数据,把它分解成漂亮的小块,沿途检查你的状态,向下游提交它。当您到达快照的末尾时,我们切换到跟踪事务日志,只处理自查询开始以来到达的新数据。这意味着您得到的结果与您在最后运行查询相同,但是比从头开始一遍又一遍地运行查询工作量要少得多。 So if you want to do recomputation under this model, all you need to do is clear out the downstream table, create a new checkpoint, and start it over. And it will automatically process from the beginning of time and catch up to where we are today.

This is actually a really powerful pattern for correcting mistakes and doing other things as well. Now that we've covered the high-level picture, I want to talk about some specific use cases where Delta Lake has played a big role in reducing costs and simplifying the management of Apache Spark on these data lakes, and I want to talk a little bit about the history of Delta Lake.

Used by thousands of organizations worldwide

Delta Lake is actually about two years old. It has been available inside Databricks for the past two years as a proprietary solution, and some of our largest customers have been using it. I'll call out Comcast in particular, and also Riot Games, Jam City, and NVIDIA, big names that you know. They've been using it for years. About two months ago, at Spark Summit, we decided to open source it, so that everybody, even people running on-prem or somewhere else, could get the power of Delta Lake. I want to walk through one particular use case that I think is pretty cool, and that's Comcast. Their problem is that they have set-top boxes all over the world, and in order to understand how people interact with their programming, they need to sessionize this information. You watch this TV show, you change the channel, you go over here, you come back to another TV show. And with this they can create better content by understanding how people consume it. And as you can imagine, Comcast has many subscribers, so there’s petabytes of data. And before Delta Lake, they were running this on top of Apache Spark. And the problem was the Spark job to do this sessionization was so big that the Spark job, the Spark scheduler would just tip over. And so, rather than run one job, what they actually had to do was they had to take this one job, partition it by user ID. So they kind of take the user ID, they hash it, they mod it by, I think, by 10. So they break it into kind of 10 different jobs, and then they run each of those jobs independently. And that means that there’s 10x, the overhead, in terms of coordination. You need to make sure those are all running. You need to pay for all of those instances. You need to handle failures and 10 times as many jobs, and that’s pretty complicated. And the really cool story about switching this to Delta was they were able to switch a bunch of these kinds of manual processes to streaming. And they were able to dramatically reduce their costs by bringing this down into one job, running on 1/10 of the hardware. So they’re now computing the same thing, but with 10x less overhead and 10x less cost. And so that’s a pretty kind of powerful thing here that what Delta’s scalable metadata can really bring to Apache Spark. And I’m gonna talk later in the talk exactly how that all works.

But before I get into that, I want to show you how easy it is to get started with Delta Lake if you're already using Apache Spark.

Getting started with Delta using the Spark APIs

Getting started is trivial. Delta Lake is published on Spark Packages, and all you need to do to install it on a Spark cluster is use Spark Packages. If you're using PySpark, you just pass --packages with the Delta coordinate. If you're using the spark-shell, it's the same thing. If you're building a Java or Scala jar and you want to depend on Delta, all you need to do is add a Maven dependency. And then changing your code is just as simple. If you're using the DataFrame readers and writers in Spark SQL, all you need to do is change the data source from parquet or JSON or CSV or whatever you're using today to Delta, and everything else should work the same. The only difference is that now everything is scalable and transactional, which, as we saw before, is very powerful.
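Concretely, that looks something like the sketch below. The exact package coordinate and paths are examples, so check Spark Packages for the current version.

```python
# Install: launch PySpark or spark-shell with the Delta package, e.g.
#   pyspark --packages io.delta:delta-core_2.11:0.2.0
#   spark-shell --packages io.delta:delta-core_2.11:0.2.0
# (the version coordinate above is an example; check Spark Packages for the latest)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Switching an existing job to Delta is just a change of format:
df = spark.read.format("json").load("/data/raw/events/")   # before: json/parquet/csv
df.write.format("delta").save("/delta/events")             # after: delta

# Reading it back works the same way.
events = spark.read.format("delta").load("/delta/events")
```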

Data quality

到目前为止,我讲的大部分都是关于正确性的系统问题。如果我的工作崩溃了,我不希望它破坏这个表。如果两个人同时向表写入数据,我希望他们都能看到一致的快照,但数据质量实际上不止于此。您可以编写正确运行的代码,但代码中可能存在错误并得到错误的答案。这就是为什么我们要扩展数据质量的概念,让你可以声明性地谈论质量约束。这是下个季度左右的工作,但这里的想法是,我们允许你,在一个地方,指定你的三角洲湖的布局和限制。首先我们可以看到一些重要的东西,比如数据存储在哪里。您可以选择启用严格的模式检查。Delta Lake在这里有两种不同的模式,我经常看到人们在进行数据质量测试时同时使用这两种模式。在之前的表格中,你会使用模式印记,你可能只是读取了一堆JSON,然后把它原样放入Delta Lake中。 We have nice tools here where we will automatically perform safe schema migrations. So if you’re writing data into Delta Lake, you can flip on the merge schema flag, and it will just automatically add new columns that appear in the data to the table, so that you can just capture everything without spending a bunch of time writing DDL. We, of course, also support kinda standard strict schema checking where you say, create table with the schema, reject any data that doesn’t match that schema, and you can use alter table to change the schema of a table. And often I see this use kind of down the road in kind of the gold level tables where you really want strict enforcement of what’s going in there. And then finally, you can register tables in the Hive Metastore. That support is coming soon, and also put human readable descriptions, so people coming to this table can see things, like this data comes from this source and it’s parsed in this way, and it’s owned by this team. These kind of extra human information that you can use to understand what data will get you the answers you want. And then finally, the feature that I’m most excited about is this notion of expectations. An expectation allows you to take your notion of data quality and actually encode it into the system. So you can say things like, for example, here, I said, I expect that this table is going to have a valid timestamp. And I can say what it means to be a valid timestamp for me and from my organization. So, I expected that the timestamp is there and I expect that it happened after 2012 because my organization started in 2012, and so if you see data from, say, 1970 due to a date parsing error, we know that’s incorrect and we want to reject it. So this is very similar to those of you who are familiar with a traditional database. This sounds a lot like a variant where you could say not null or other things on a table, but there’s kind of a subtle difference here. I think if you, so the idea of invariants are, you can say things about tables, and if one of those invariants is violated, the transaction will be aborted, will automatically fail. And I think the problem with big data, why invariants alone are not enough is if you stop processing every single time you see something unexpected, especially in those earlier bronze tables, you’re never going to process anything. And that can really hurt your agility. And so the cool thing about expectations is we actually have a notion of tuneable severity. So we do support this kind of fail stop, which you might want to use on a table that your finance department is consuming ’cause you don’t want them to ever see anything that is incorrect. But we also have these kinds of weaker things where you can just monitor how many records are valid and how many are failing to parse and alert at some threshold. Or even more powerful, we have this notion of data quarantining where you can say any record that doesn’t meet my expectations, don’t fail the pipeline, but also don’t let it go through. Just quarantine it over here in another table, so I can come and look at it later and decide what I need to do to kind of remediate that situation. 
So this allows you to continue processing, but without kind of corrupting downstream results with this invalid record. So like I said, this is a feature that we’re actively working on now. Stay tuned to GitHub for more work on it. But I think this kind of fundamentally changes the way that you think about data quality with Apache Spark and with your data lake.
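Of the pieces described above, schema merging is something you can already use through the DataFrame writer; a minimal sketch follows, with the paths being assumptions for illustration.

```python
# Safe schema evolution sketch: allow columns that are new in this batch to be
# added to the Delta table instead of failing the write. Paths are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A new batch whose schema may have gained columns upstream.
new_events = spark.read.json("/data/raw/2019-07-01/")

(new_events.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/delta/events"))
```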

Now that I've talked about what Delta Lake is and why you should care about it, I want to dive into how it works, because it sounds a little too good to be true that we can bring full ACID transactions to a distributed system like Apache Spark and still maintain good performance.

Delta on disk

First, let's look at what a Delta table looks like when it's stored on disk. For those of you who already have a data lake, this should look very familiar. It's just a directory stored in a file system, S3, HDFS, Azure Blob Storage, or ADLS, with a bunch of parquet files in it. The one additional, very important piece is that we also store a transaction log, and inside the transaction log there are different table versions. I'll talk about those table versions in a moment. We still store the data in partition directories, but that is actually mostly for debugging. There are also modes of Delta where we can work directly with the storage system in the most optimal way. For example, on S3, they recommend that if you're going to write a lot of data regularly, rather than creating date partitions, which create hot spots of temporal locality, you randomly hash partition instead, and because of the power of Delta's metadata, we can do that as well. And then finally, standard data files, which are just normal encoded parquet that can be read by any system out there.

Table = result of a set of actions

那么这些表格版本里到底有什么呢?我们如何推断表的当前状态是什么?每个版本的表都有一组操作应用于表并以某种方式改变它。在这个时刻,表的当前状态,是所有这些动作的和的结果。我说的是什么样的行为呢?举个例子,我们可以改变元数据。我们可以说,这是表的名字。这是表的模式。您可以向表中添加列或其他内容。您可以设置表的分区。 So one action you can take is change the metadata. The other actions are add a file and remove a file. So we write out a parquet file, and then to actually make it visible in the table, it needs to also be added to the transaction log. And I’ll talk about why that kind of extra level of indirection is a really powerful trick in a moment. And another kind of detail here is when we add files into Delta, we can keep a lot of optional statistics about them. So in some versions we can actually keep the min and max value for every column, which we can use to do data skipping or quickly compute aggregate values over the table. And then finally you can also remove data from the table by removing the file. And again, this is kind of a lazy operation. This level of indirection is really powerful. When we remove a file from the table, we don’t necessarily delete that data immediately, allowing us to do other cool things like time travel. And so the result here of taking all these things is you end up with the current metadata, a list of files, and then also some details, like a list of transactions that have committed, the protocol version for that.
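To make that concrete, the transaction log is just newline-delimited JSON files under `_delta_log/`, one file per version, with one action per line. Here is a rough sketch of peeking at the first commit of a hypothetical table; the exact field layout is simplified from memory, so treat it as illustrative rather than a format reference.

```python
# Peek at the actions recorded in the first commit of a (hypothetical) table.
# Each line of the file is one JSON action: metaData, add, remove, commitInfo, ...
import json

with open("/delta/events/_delta_log/00000000000000000000.json") as f:
    for line in f:
        action = json.loads(line)
        print(list(action.keys()), action)
```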

Implementing atomicity

那么这是如何让我们得到ACID的呢?来获得事务性数据库的这些优良属性?这里有一个细节,当我们创建这些表版本时,我们将它们存储为有序的原子单元,称为提交。我之前讲过这个。我们通过创建这个文件0.json创建表的0版本。这里的思想是,当Delta在文件系统上构造那个文件时,我们将使用底层原子原语。因此,在S3上,为了保证原子性,您所需要做的就是上传到系统。他们这样做的方式是你在上传开始时说,我希望上传这么多字节。除非你真的成功上传了那么多字节,否则S3不会接受写操作。所以你可以保证要么得到整个文件,要么一个文件都得不到。 On other systems like Azure or HDFS, what we’ll do is we’ll create a temporary file with the whole contents and then we’ll do an atomic rename, so that the entire file is created or not. So then you can kind of have successive versions. So version one, we added these two files or sorry, in version zero, we added these two files. In version one, we removed them and put in a third. So for example, you could be doing compaction here where you atomically take those two files and compact them into one larger file.
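As a conceptual illustration of that "create the commit file atomically or not at all" primitive, here is a small local-filesystem analogy. This is not Delta's actual code, just the shape of the trick: write a temp file, then publish it as version N in a way that fails if version N already exists.

```python
# Local-filesystem analogy of an atomic commit: write the actions to a temp
# file, then publish it as version N. os.link refuses to overwrite an existing
# target, so only one writer can ever create a given version.
import os

def try_commit(log_dir: str, version: int, actions_json: str) -> bool:
    tmp_path = os.path.join(log_dir, f".tmp-{version}")
    final_path = os.path.join(log_dir, f"{version:020d}.json")
    with open(tmp_path, "w") as f:
        f.write(actions_json)
    try:
        os.link(tmp_path, final_path)   # fails if someone else already committed N
        return True
    except FileExistsError:
        return False                    # lost the race for this version
    finally:
        os.remove(tmp_path)
```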

Ensuring serializability

Now, another important detail is that we want atomicity for each commit, but we also want serializability. We want everybody to agree on the order of changes to the table, so that we can correctly do things like MERGE INTO for change data capture and other operations that require this property. To agree on these changes when there are multiple writers, we need a property called mutual exclusion: if two people try to create the same version of a Delta table, only one of them can succeed. To make that a little clearer, user one can write version zero of the table, and user two can write version one, but if they both try to write version two, one of them can succeed and the other has to get an error message saying, sorry, your transaction didn't go through.

Solving conflicts optimistically

现在你可能会说,等一下,如果两个人同时做一件事,它就失败了。听起来我浪费了很多时间和很多工作。这听起来对我来说太复杂了。幸运的是,在这里我们使用了第三种很酷的技巧,称为乐观并发。乐观并发的思想是当你在表上执行一个操作时,你只是乐观地假设它将会工作。如果你有冲突,你就会看看这个冲突对你来说是否重要。如果没有,你可以乐观地再试一次。在大多数情况下,实际上这些事务并没有重叠你可以自动进行补救。这里给你一个具体的例子,假设我们有两个用户这两个用户都流进了同一个表。因此,当它们都开始流写时,它们从读取当时的表版本开始。 They both read in version zero. They read in the schema of the table. So they make sure that the data that they’re appending has the correct format. And then they write some data files out for the contents of the stream that are gonna be recorded in this batch. And they record what was read and what was written from the table. Now they both try to commit, and in this case, user one wins the race and user two loses. But what user two will do is they’ll check to see if anything has changed. And because the only thing they read about the schema, of the table with the schema and the schema has not changed, they’re allowed to automatically try again. And this is all kind of hidden from you as the developer. This all happens automatically under the covers. So they’ll both try to commit, and they’ll both succeed.
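In pseudocode terms, the optimistic protocol has roughly the shape sketched below. Every helper named here is hypothetical, invented only to show the loop; it is not Delta's internal API.

```python
# Shape of optimistic concurrency control (all helpers are hypothetical).
def optimistic_write(table, prepare_actions):
    while True:
        snapshot = table.latest_snapshot()          # read the current version
        actions = prepare_actions(snapshot)         # plan the writes against it
        if table.try_commit(snapshot.version + 1, actions):
            return                                  # we won the race
        new_changes = table.changes_since(snapshot.version)
        if conflicts(actions, new_changes):         # e.g. the schema we read changed
            raise RuntimeError("Concurrent modification, cannot retry automatically")
        # Otherwise nothing we depended on changed, so just try again on top
        # of the newer version.
```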

Handling massive metadata

现在,我们的最后一个技巧是表可以有大量的元数据。那些尝试过在Hive Metastore中放置数百万个分区的人可能对这个问题很熟悉。实际上,一旦你有了这些数据大小,元数据本身就会让系统崩溃。所以我们有一个技巧,实际上,我们已经有了一个分布式处理系统能够处理大量的数据。我们只使用Spark。因此,我们获取事务日志及其操作集。我们和斯帕克一起读的。我们可以把它编码为parquet中的检查点。检查点基本上是某个版本的表的整个状态。因此,在读取事务日志时,不必读取整个事务日志,只需从检查点开始,然后从检查点之后发生的任何后续更改。 And then this itself can be processed with Spark. So when you come to a massive table that has millions of files, and you ask the question like, “How many records were added yesterday?” What we’ll do is we’ll run two different Spark jobs. The first one queries the metadata and says, “Which files are relevant to yesterday?” And it’ll get back that list of files, and then you’ll run another Spark job that actually processes them and does the count. And by doing this in two phases, we can drastically reduce the amount of data that needs to be processed. We’ll only look at the files that are relevant to the query, and we’ll use Spark to do that filtering.

Roadmap

Before we wrap up and move into questions, I want to talk about the roadmap. Like I said before, while this project has been around for several years, it was only open sourced recently, so we have a pretty exciting roadmap for the rest of the year. Basically, our goal is for the open source Delta Lake project to be fully API compatible with what's available inside of Databricks, so our roadmap for the rest of the quarter is largely to open source a lot of the cool features we already have. We actually released version 0.2.0 a couple of weeks ago, which added support for reading from S3 and also from Azure Blob Store and Azure Data Lake. Then this month we're planning a 0.3.0 release, which will add Scala APIs for UPDATE, DELETE, MERGE, and VACUUM, with Python APIs to follow. For the rest of the quarter we're planning a couple of things. We want to add full DDL support, that is CREATE TABLE and ALTER TABLE. We also want to make it possible to store Delta tables in the Hive Metastore, which I think is very important for data discovery across organizations. And we want to take those DML commands from before, UPDATE, DELETE, and MERGE, and actually hook them into the Spark SQL parser, so you can use standard SQL to do those operations as well. And then moving forward kind of, let us know what you want. So if you’re interested in doing more, I recommend you to check out our website at delta.io, and it has kind of a high level overview of the project. There’s a quick start guide on how you can get started, and it also has links to GitHub where you can watch the progress and see what our roadmap is, and submit your own issues on where you think the project should be going. So I definitely encourage you to do that, but with that, I think we’ll move over to questions. So let me just pull those up and see what we got.

Okay. The first question is, will the materials and the recording be available afterwards? For that one, I'm hoping Denny can tell us. Denny, are you there? - [Denny] Not a problem. Yes, as a quick call-out, for everybody who registered for this webinar, we will send out both the slides and the recording. It usually takes about 12 to 24 hours, so you should receive that email later today or early tomorrow.

-太棒了,非常感谢。是的,所有这些都应该在这里,你们可以稍后再看。YouTube上也有视频。所以请继续关注三角洲湖的更多内容。接下来是其他问题。第一个问题是,Delta Lake是否增加了性能开销?这是一个非常有趣的问题。我想把它分解一下。首先,三角洲湖被设计成一个高通量系统。所以每一个单独的操作,都会有一些开销。 So you’d basically because rather than just write out the files, we need to write out the files and also write out the transaction log. So that adds a couple of seconds to your Spark job. Now, the important thing here is we designed Delta to be massively parallel and very high throughput. So you get a couple of seconds added to your Spark job, but that is mostly independent of the size of your Spark job. So what Delta Lake is really, really good at is ingesting trillions of records of data or petabytes of data or gigabytes of data. What Delta is not good at is inserting individual records. If you run one Spark job, one record per Spark job, there’ll be a lot of overhead. So kind of the trick here is you want to use Delta in the places where Spark makes the most sense, which are relatively large jobs spread out across lots of machines. And in those cases, the overhead is negligible.

下一个问题是,既然它具有ACID属性,那么我的系统是否也具有高可用性呢?这是一个非常好的问题,我想稍微解释一下。Delta是专门设计来利用云计算利用这些很好的特性。对我来说,云有几个很好的特性。一是云非常稳定。您可以将大量数据放入S3中,它可以任意地处理这些数据。它通常是高度可用的。你总是可以从S3读取数据,无论你在哪里。如果你真的很在乎,甚至还有像复制这样的东西,你可以把数据复制到多个区域,Delta在这方面做得很好。所以从Delta表中读取应该是高度可用的,因为它实际上只是底层存储系统的可用性。 Now, those of you who are familiar with the CAP theorem might be saying, “But wait a second.” So for writes, when we think about consistency, availability, and partition tolerance, Delta chooses consistency. So we will, if you cannot talk to kind of the central coordinator, depending on whether you’re on S3, that might be kind of your own service that you’re running on Azure. They’ve taken kind of the consistency approach (indistinct) we use an atomic operation there. The system will pause. But the nice thing here is because of that kind of optimistic concurrency mechanism, that doesn’t necessarily mean you lose that whole job that you might’ve been running for hours. It just means you’ll have to wait until you’re able to talk to that service. So I would say in terms of reads, very highly available, in terms of writes, we choose consistency, but in general, that actually still works out pretty well.

The next question is whether you keep the data at all of the levels. Well, I want to clarify the concept behind bronze, silver, and gold. Not everybody keeps the raw data, and not everybody keeps all of the data. You may have retention requirements that say you're only allowed to keep two years of data, so I think it's up to you to decide which data makes sense to keep. The only thing I would say is that the advantage of data lakes, and the way Delta applies to them, is that you have the ability to keep the raw data and keep as much of it as you want. There's no technical limitation preventing you from keeping all of your data, and as a result, many of the organizations I've worked with actually keep everything they're legally allowed to keep for a very long time, and only remove it when they have to.

The next question is, what do you use to write this logic? Can we write the logic in Scala? Delta Lake plugs into all of the existing APIs of Apache Spark, which means you can use any of them. If you're a Scala programmer, you can use Scala. If you're a Java programmer, that works too. We also have bindings in Python, and if you're an analyst and you don't want to program at all, we support pure SQL as well. The idea is that the underlying engine is written in Scala, and Delta is written in Scala, but your logic can be written in whatever language you like. This is another case where I think you want the right tool for the right job. Personally, I do a lot of things in Scala, but when I need to make graphs, I switch to Python. Delta still gives me the ability to filter down massive amounts of data to something that fits in Pandas, and then I do some graphing with it.

The next question is, is Presto part of Delta Lake, or is it only Spark? That's a great question, and it's actually something that's evolving very quickly right now. There are a couple of different answers, so I'll tell you both where we are today and where we're going. Today, there's a feature inside of Databricks, which we're working on open sourcing, that allows the Delta writers to write out things called manifest files, which let you query a Delta table in a consistent way from Presto or Athena or any other Presto-based system. However, we're also working deeply with Starburst, one of the companies behind Presto, to build a native connector for Presto. We've had active interest from the Hive community and the Scalding community as well, so there are a lot of people interested in building connectors. So today, the core of Delta is built into Spark, but I think the real power of open source and open standards is that anybody can integrate with it, and on this project we're committed to growing the ecosystem and working with anyone. So if you’re a committer on one of those projects, please join our mailing list, join our Slack channel, check it out, and let us know how we can help you build these additional connectors.

Next question: can we experiment with Delta Lake on Databricks Community Edition? Yes, you can. Delta Lake is available in Community Edition, so check it out. Everything should be there, and let us know what you think.

The next question is, can Delta tables be queried with Hive? Basically the same answer as for Presto: there's interest in the community in building that support. It doesn't exist today, but it's definitely something we'd like to see developed. Next question: how does Delta Lake handle slowly changing dimensions from raw to gold?

That's a great question, and there's actually a blog post that walks through how to do slowly changing dimensions with Delta and gives you all of the details. But I think the real answer is that the MERGE operator, plus the power of Spark, makes it pretty easy to build all of the different types of slowly changing dimensions. The magic that Delta adds on top of Spark is the transactions. Modifying a table without transactions is very dangerous, and Delta makes it possible, so in that sense it supports this use case.
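For reference, the MERGE-based upsert that underlies slowly changing dimensions looks roughly like the sketch below. At the time of this talk the SQL DML was the Databricks-managed flavor, and the table and column names here are invented for illustration.

```python
# Sketch of a MERGE-based upsert, the building block for slowly changing
# dimensions. Table and column names are invented for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    MERGE INTO customers AS target
    USING customer_updates AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET address = source.address
    WHEN NOT MATCHED THEN INSERT (customer_id, address)
                          VALUES (source.customer_id, source.address)
""")
```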

The next one is, we mostly work with Azure, and we'd like to know whether Delta Lake behaves any differently when running against Azure Event Hubs instead of Kafka. I'll answer that question a little more generally. I talked about how one of the strengths of Delta is its integration with Spark, and one big reason for that is that I think of Spark as the narrow waist of the big data ecosystem. Almost every big data system in the world has a Spark connector, and if Spark can read the data, it works with Delta Lake. Event Hubs has a native connector that plugs in through Spark data sources, and it also has a Kafka API that works with Spark's Kafka support. So you can easily read from Event Hubs and do everything I talked about today using Event Hubs instead of Kafka, and that really applies to any system Spark can read from.

In general, to answer the Azure question, Delta is fully supported on Azure, including ADLS. We just recently improved support for ADLS Gen 2. So you can download it yourself, and it's also part of Azure Databricks out of the box.

The next question is, what exactly does the Scala API look like for DML commands such as UPDATE? Does it look like Spark SQL, where you pass in a string to do the update? The answer is, we support both. If you go to the GitHub repository, I believe this code has already been merged, so you can see the Scala API, and if not, there's a design document that discusses the details of adding UPDATE. There's a Scala function called update that you can use programmatically without doing string interpolation, and there's also a SQL way to do it, where you construct a SQL string and pass it in. The idea is that you use whichever language you're most familiar with, whatever is already part of your toolbox, and Delta should just handle it.
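As those APIs later landed, the programmatic flavor looks roughly like the sketch below, shown in Python for consistency with the other sketches here. The module, signature, paths, and column values are drawn from the later-shipped delta package and postdate this talk, so treat the details as subject to change.

```python
# Programmatic update sketch, roughly as the API later shipped in the delta
# package (details may differ by version; paths and columns are invented).
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

events = DeltaTable.forPath(spark, "/delta/events")
events.update(
    condition="event_type = 'clck'",      # fix a mis-typed event name in place
    set={"event_type": "'click'"})
```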

The next question is, does Delta Lake work with HDFS? Yes, it fully works with HDFS. HDFS has all of the primitives we need, so no extra setup is required; by that I mean HDFS supports an atomic rename that fails if the destination already exists. As long as you're running a new enough version of HDFS, and it doesn't even have to be that new, it should just work. If you look at the getting started guide in the docs at delta.io, it lists all of the different storage systems we support and the details of what you need to do to set them up.

下一个问题,更新、删除是单行级还是记录级?这个问题有两个答案。是的,Delta允许你进行细粒度的,单独的行更新。因此,您不必在分区级别上进行更新或删除。如果您在分区级别上执行它们,那么它们就很重要。例如,如果在分区级别上执行删除操作,效率会显著提高,因为我们可以直接删除元数据。实际上我们不需要手动重写。但如果它们不在分区级别,如果你做的是细粒度的单行更新或删除,我们要做的是找到相关的parquet文件,重写它们,提交添加和删除以使操作发生,然后这是一种完成操作的事务。所以它确实支持它,但它涉及到重写单个文件。我在这里要说的是,Delta显然不是被设计成OLTP系统的。 You should not use it if you have lots of individual row updates, but we do support that fine granularity use case.

Do you know when the Scala APIs will be available for Delta Lake? There are a couple of answers to that question. Delta Lake reads and writes, both streaming and batch, already work in Scala; that's available today. If you're specifically talking about UPDATE, DELETE, and MERGE, I believe most of that code has already landed in the repository, so if you download it and build it yourself, it's there. We're hoping to do a release in July, so hopefully this month the next release will include those additional Scala APIs.

Let's take a look.

The next question is about data quality: can we have fields other than a timestamp for validation? Yes. The expectations we talked about earlier are just general SQL expressions, so any expectation you can encode in SQL is allowed. In that example it was a very simple comparison against a particular date, but it can be anything you want; it could even be a UDF that checks data quality. The important thing is that we let you attach these as properties of your data flow, rather than as manual validations you have to remember to run yourself, so they're enforced for everybody who uses the system.

Does Delta Lake support merging from a DataFrame rather than a temp table? Yes. Once the Scala and Python APIs are available, you'll be able to pass in a DataFrame. Today, inside of Databricks, the only thing available is the SQL DML, and in that case you do need to register it as a temp table. But like I said, stay tuned for the end of the month; we'll have a release with the Scala APIs, and then you'll be able to pass in DataFrames yourself.

I've seen this question a couple of times, so I'll answer it again: we support both ADLS Gen 1 and Gen 2, although Gen 2 will be faster because we have some extra optimizations there.

下一个是检查点示例,计算Delta Lake检查点的Spark作业是内部的还是需要手写的?这是个好问题。所以当你使用流来读取或写入一个Delta表时,如果你只是在两个不同的Delta表之间使用它,检查点是由结构化流处理的。因此,您不需要做任何额外的工作来构造检查点。这是引擎内置的。结构化流在Spark中的工作方式是每个源和每个同步,都有一个合约,允许我们自动地做检查点。因此源需要能够说,我正在处理从这里到这里的数据,而那些关于它们在流中的位置的概念,我们称之为偏移量,这些需要是可序列化的。我们只是把它们存储在检查点。我们基本上将检查点用作提前写日志。所以我们说,第10批将是这个数据。 Then we attempt to process batch number 10, then we write it to the sync, and the guarantee here is the sync must be idempotent. So it must only accept batch number 10 once, and if we try to write it twice due to a failure, it must reject that and kind of just skip over it. And by putting all of these kind of constraints together, you actually get exactly once processing with automatic checkpointing without you needing to do any extra work.

Good question. Why not use polyglot persistence and use an RDBMS to store the ACID transactions? That's a great question, and we actually tried it. An early version of Delta used MySQL, and the problem is that MySQL is a single machine, so just getting the list of files for a large table can become the bottleneck. When you instead store this metadata in a form that Spark itself can natively process, you can leverage Spark to do that processing. That said, there's nothing stopping you from implementing the Delta transaction protocol on top of another storage system. In fact, there's a fairly long conversation on the GitHub repository right now going back and forth about what it would take to build a FoundationDB version of Delta. It's certainly possible, but in our initial scalability testing we found Spark to be the fastest way, at least among the systems we tested, and that's why we decided to do it this way.

Another question: does this mean we don't need DataFrames and can do all of our transformations on Delta Lake instead? I would say no. I don't think you could do everything with only UPDATE, DELETE, and MERGE and no actual DataFrame code. You could use pure SQL, but really, I think it's about the right tool for the right job. Delta Lake is deeply integrated with Spark DataFrames, and personally I find them a very powerful tool for transformations. It's kind of like SQL++, because you have all of these relational concepts embedded in a full programming language. I think it's a very productive way to write data pipelines.

How does Delta Lake handle new versions of Spark? Delta Lake requires Spark 2.4.3, which is a fairly recent release, because earlier versions of Spark had bugs that prevented data sources from plugging in correctly. In general, we're committed to Spark compatibility. One of our core projects this quarter is making sure that everything in Delta plugs into Spark's good public, stable APIs, so that we can work with multiple versions going forward.

还有一个问题,Delta Lake支持ORC吗?是的,这是一个非常好的问题,我经常被问到。同样,在GitHub上有关于添加支持的讨论。如果这对你来说很重要,我鼓励你去查一下,并就这个问题投票。这个问题有两个答案。一个是Delta Lake事务协议。事务日志中实际存在的内容实际上支持指定所存储数据的格式。它可以用于任何不同的文件格式,txt, JSON, CSV。这已经是协议的一部分了。今天,我们不揭露这是一种选择。 When you’re creating a Delta table, we only do parquet. And the reason for that is pretty simple. I just think less tuning knobs is generally better, but for something like ORC, if there’s a good reason why your organization can switch, I think that support would be really, really easy to add and that’s something that we’re discussing in the community. So please go over to GitHub, find that issue, and fill it in. And then I’m going to take one final question since we’re getting close to time. And the question here is, what is the difference between the Delta Lake that’s included with Databricks versus the open source version? And that’s a question I get a lot. And I think, the way to think about this is I’d like to kind of talk about what my philosophy is behind open source. And that is that I think APIs in general need to be open. So any program you can run correctly inside of Databricks should also work in open source. Now that’s not entirely true today because Delta Lake is only, the open source version of Delta Lake is only two months old. And so what we’re doing is we are working hard to open source all of the different APIs that exist. So update, delete, merge, history, all of those kinds of things that you can do inside of Databricks will also be available in the open source version. Managed Delta Lake is the version that we provide. It’s gonna be easier to set up. It’s gonna integrate with all of the other pieces of Databricks. So we do caching, we have a kind of significantly faster version of Spark, and so that runs much faster, but in terms of capabilities, our goal is for there to be kind of complete feature parity here ’cause we’re kinda committed to making this open source project successful. I think open APIs is the correct way to do that. So with that, I think we’ll end it. Thank you very much for joining me today. And please check out the website, join the mailing list…

Advanced: Diving Into Delta Lake

Take a deeper look at the internals of Delta Lake, a popular open source technology that enables ACID transactions, time travel, schema enforcement, and more on top of your data lake.

Watch now