从三角洲湖开始

简化和扩展数据工程管道

丹尼·李。Databricks的开发者倡导者
Denny Lee是Databricks的开发者倡导者。他是一名实干的分布式系统和数据科学工程师,在为内部部署和云环境开发互联网规模的基础设施、数据平台和预测分析系统方面拥有丰富的经验。bob体育客户端下载他还拥有俄勒冈健康与科学大学(Oregon Health and Sciences University)的生物医学信息学硕士学位,并为企业医疗保健客户构建和实现了强大的数据解决方案。

系列的细节

本次会议是丹尼·李(Denny Lee)和三角洲湖团队“三角洲湖入门”系列的一部分。

会议摘要

常见的数据工程管道架构使用对应不同质量级别的表,逐步向数据添加结构:数据摄取(“青铜”表),转换/特征工程(“银”表),以及机器学习训练或预测(“金”表)。结合起来,我们将这些表称为“多跳”体系结构。它允许数据工程师建立一个管道,从原始数据开始,作为“单一的真相来源”,一切都从中流动。在本节课中,我们将展示如何使用Delta Lake构建可伸缩的数据工程数据管道。

Delta Lake是一个开源存储层,为数据湖带来可靠性。Delta Lake提供ACID事务,可扩展的元数据处理,并统一流和批量数据处理。它运行在您现有的数据湖之上,并与Apache Spark api完全兼容。

在本节课中,您将学习:

  • 数据工程管道体系结构
  • 数据工程管道场景
  • 数据工程管道最佳实践
  • Delta Lake如何增强数据工程管道
  • 采用Delta Lake构建数据工程管道的便便性

你需要:
注册社区版在这里并获得研讨会演示材料和样本笔记本。

视频记录

-[教练]大家好。欢迎使用Delta Lake简化和扩展数据工程管道。我的名字是Denny Lee,我是Databricks的开发者倡导者。

今天订阅!

我想让你们知道,我们也在我们的Databricks YouTube频道上直播。你可以去dbricks。有限公司/ youtube。所以如果你想去听这个直播如果你想提前一点离开或者如果你想今天晚一点听。这个和其他德尔塔湖在线技术讲座将在Databricks YouTube频道上播出。

今天的演讲者

在我们开始之前,请允许我先自我介绍一下。我叫丹尼·李。我是Databricks的开发者倡导者。我是一名实际操作的分布式系统和数据科学工程师,在互联网规模的基础设施、数据平台和预测分析系统方面拥有经验。bob体育客户端下载我曾经在微软工作,帮助建立了现在被称为HG Insights的公司,与SQL Server客户一起工作,现在我从2005年开始与Apache Spark一起工作。这就是我数据工程经验的一些背景知识。一些快速物流。这段录音和幻灯片将在这次技术讲座之后,在这次网络研讨会之后发布。如上所述,它最终也会发布到YouTube上,所以是Databricks YouTube频道。既然大家都静音了,请把问题放在问答区,而不是聊天区。 I’ll be looking for questions inside there. And we will also provide a link to anybody that logged in who saved a spot through Zoom or through the YouTube channel. So that will give you all the information you need. This does include, by the way, any of the notebooks that we’re using for this demo. It’s actually included in this presentation, the link, as well, we’re going to provide it inside the YouTube channel, okay?

让我们开始吧。

数据工程师的旅程…

如果你们中的一些人参加了之前的一些会议,我们确实讨论了一些数据工程之旅,或数据工程管道。

我将做一个快速的调出只是为了给人们提供上下文,但是如果你想深入研究它我们上周介绍了Delta体系结构,基本上超越了Lambda体系结构,介绍了Delta体系结构。前一周,我们还和Delta Lake和MLFlow一起为数据科学准备了数据,我们讨论了一些这个问题。但这里的背景是,如果你看左边的这个图,你会看到事件,它进入Apache Kafka或其他流机制,比如Kinesis或Azure event Hubs或Cosmos DB,与哪种方法无关,通常你必须构建一个Lambda架构,因为Lambda架构允许你在顶部处理实时处理,在底部处理批处理。顶部在Apache Spark或结构流中,它处理来自Kafka的数据,然后将这些数据推到数据库右侧的统一视图中。这就是人工智能报道的情况。但这是为了你不断流的东西本身。然后还有批处理数据,在你们左边。左边的数据基本上是向下流,并被连续写入某个表中,你可以对它进行正常的批处理。为了验证这些数据你需要建立一个统一的视图来在上面的流和左边的流之间进行验证在你在流中所做的处理,结构流和你最终进入表格的处理之间进行验证这样就有了一个协调过程来确保数据实际上是相同的。

一旦你把数据写入这个表,因为它是连续写入的,你就会运行批处理。批处理允许你将数据重新处理到另一个表中,它每小时都会被压缩,因为你的文件的大小,你有很多小文件是由流生成的,所以我们将继续把这些文件压缩在一起这样我们就能确保更好的性能。当任何分布式系统中有太多小文件时,系统的性能就会变慢。

在这里,你可以对数据进行更新和合并,如果有任何迟到的数据,你可以在中间这个底部表格中重新处理它。然后,另一个Spark批处理过程将从表中获取数据,将其放入这个统一视图中,这样您就可以进行AI报告。这就是传统的Lambda架构的样子。通常你想问的问题是,这个可以化简吗?这就是数据工程师的梦想,对吧?

数据工程师的梦想…

当新数据到达时,您希望能够以一种经济高效的方式连续且增量地处理数据,而不必在批处理或流处理之间做出选择。当我们创建Spark流,或结构化流,几年前,我们创建数据帧的整个前提,当我们称之为静态数据帧和动态数据帧时,实际上你不必考虑如何处理低延迟数据和批处理数据之间的区别。换句话说,无论您是针对流数据集还是针对批处理数据集运行查询,您都将以完全相同的方式对待和运行查询。这就是你真正想做的,这样无论你是流数据集还是批处理数据集你都能使用相同的api或相同的SQL语法。现在,它将显著地简化不仅仅是代码的维护,而且同样重要的是,涉及到的思维类型,你在精神上实际上不必改变你的思维过程,因为你在做批处理或流处理。所以这个梦想,基本上在左边,你的Kafka, Kinesis, Event Hub,你的数据湖,不管它是流媒体源还是批处理源,都没关系,Spark把它,把它放到某个存储中,这就是Delta Lake参与的地方,这里有个小提示,然后Spark再次处理它,然后你就有了人工智能报告。这就是你要做的。

缺失的是什么?

为了做到这一点,我们缺少了什么?现在,Spark已经很接近解决所有这些问题了,但是要真正实现这个目标,还需要做很多事情。我们来谈谈这里的问题。第一个是在写入数据时读取一致数据的能力。正如我们在前几次会议中提到的,我们实际上是在讨论ACID事务的概念。ACID,回到这个概念,即原子性、一致性、隔离性和持久性。这就是ACID所代表的含义,而事务的概念是,当数据写入存储或磁盘或任何写入到您的数据时,您希望能够相信这一想法,确保它确实发生了,并且没有损坏的机会。如果你看看传统的分布式系统,当你把它们写入磁盘时,无论是作为blob存储或ADLS或谷歌存储或S3,都是相同的概念。这些是BASSE和ACID的比较,这里面有个化学笑话,BASSE的概念实际上是最终一致的可用软状态,这里最重要的方面是最终一致这意味着,传统上,当我们有云存储甚至Hadoop系统时,默认情况下,有三份数据写入磁盘或存储中。也就是说,客户端1访问节点1客户端2访问节点2,也就是数据的三个副本。 Well eventually consistent in this environment basically means that I’ve written the data to node one but I haven’t written to note two yet because there’s some delay, but it’s possible that client one, client two hit node one, node two at exactly the same time. So node for client one that hits node one, they’ll see the data. For client two it hits node two, they don’t see the data. All right, this is this concept of eventual consistency. You’re not consistent in whether the data exists or not. Well this is important, this concept of consistency is extremely important if you want to be able to look at streaming and batch data concurrently. Because if you’re streaming data and you’re constantly writing to disk extremely quickly with all these small files, right, the distribute storage that you’re writing to, whether it’s on Prem Hadoop, whether it’s your own environment, whether it’s cloud storage, doesn’t really matter. There’s multiple copies of that data sitting somewhere else, and if, again back to the example I use, client one is hitting node one and client two is hitting node two, and client one sees that data and client two does not see the data, those clients are about to perform some action against the data in which you’ve got inconsistent data. So for example, if I’m gonna do an update or delete or something else, client one sees it, client two does not see it, their actions are gonna be quite different or the results of those actions are going to be quite different. So this ability to have consistent data is extremely important.

第二种呼出,能够以良好的吞吐量递增地从大型表中读取数据。这是一个非常重要的概念,对吧?表越大,占用的资源就越多。你需要有某种机制来一致地处理数据。你想要有回滚的能力,以防糟糕的写入。例如,我写入数据试图写入磁盘。在这个作业中有五个任务要写。第四个任务,第四个和第五个任务失败了。

在这种情况下通常会发生什么?其他三个,可能还有第五个,将写入磁盘,而第四个将回滚并失败。接下来会发生什么呢,这意味着5个任务中有4个已经写入存储而第4个任务,比如4个任务中的一个,5个任务中的一个,最终无法写入。写入失败,你得到了错误现在你的数据处于糟糕的状态。换句话说,你不确定你的数据发生了什么变化,如果你只是插入数据,那就已经很糟糕了,因为如果你只是插入数据,你可能会说,我失败了一个检查点,或者我失败了一个队列。我知道我是通过这个批处理ID运行的,或者在这个时间点,回滚所有东西,手动删除所有东西。这很酷,但会变得更复杂,首先维护这些已经很麻烦了,但即使对数据运行更新或删除也没问题,但一旦完成这些,系统中途崩溃,我该如何回滚更新呢?如何回滚删除?所以你需要有回滚的能力以防出现错误,无论是磁盘级别的错误还是源系统或业务日志中的错误。错误在哪里并不重要,但如果有某种类型的错误写入,你可以回滚。

您还希望能够重放历史数据和到达的新数据。换句话说,取所有历史数据,也就是说,有一个单独的表,它可以同时查看新数据和重放历史数据。同样,作为数据工程师,你要从这样的角度来看待数据,这是我在这个查询点上的所有数据,不管它是全新的还是旧的,而不是,让我把新数据和旧数据合并在一起,让我用它来写一个查询。与我们正在讨论的所有这些概念一样重要的是,能够处理延迟到达的数据,而不必实际延迟下游处理。

所以。答案是什么?

那么答案是什么呢?答案基本上就是把结构化流的概念和三角洲湖结合在一起,以创造这个三角洲建筑。上周我们讨论了很多,但正如我们所注意到的,这节课是关于简化和扩展数据工程管道的,所以在我们讨论如何做到这一点之前,我只想先简要介绍一下这个背景。但这里真正的调用基本上是统一的批处理和流处理,具有连续的数据流,无限保留以在需要时重放和重新处理历史事件,并且您实际上可以拥有独立的弹性计算和存储。他们的技能可以彼此独立,这样你就可以平衡成本。这就是我们要做的。所以让我们用三角洲湖来试试这个概念。

的一个

因此,我们在数据工程之旅中所隐含的内容基本上是在铜、银、金数据质量级别的概念中爆发的。所以当你看数据工程管道时,我们实际上展示了一个数据库图标,但实际上它被分解成不同的数据质量级别。现在,有些人会问,我真的需要三个独立的表吗?这是三个物理实现吗?快速的答案是,在某些情况下,是的,在某些情况下,不是。这取决于源数据的干净程度。如果您已经在上游处理和清理了数据,那么您可能不需要这三个级别。您可能可以直接使用gold,但如果您使用的是原始数据源,您确实需要摄取并保存数据,因为您调用的存储和REST API实际上是非持久性的。换句话说,它只保存短时间的数据。比如,Kafka会,假设你设置了你的Kafka, Kinesis,或Event hub,只保存数据一天或几个小时,你需要一个地方来存放这些数据。 So that’s what this concept of a bronze table is. It’s the raw ingestion. Make sure you have a source where that data resides. Then you go to this concept of silver. Now that you ensure that you’ve written the data to storage, to disk, let me go ahead and filter it. I don’t need all these login calls, or these IP traces or whatever else. Clean it, so in other words there’s data that actually has, with simple business logic I have to remove this description or I have to change this ID or I want to filter out all from this particular geo region. This cleansing concept, so that’s what silver data does Oh, and also augment it. So in other words if I have other sources of data that I want to join with this data, this is where you would normally do that. And then finally you’re at the gold level. The gold level is basically, now that I’ve filtered, cleansed it, and augmented it, I also will possibly aggregate it as well. So in other words I don’t need to look at the eight million transactions that just happened per hour. I can look at them by minute, as opposed to PI second, using that as an example. Because I only need minute latency for the purpose of my reporting. So if I only need minute latency for the purposes of reporting, I can shrink the minutes down to hours, yeah actually, the other way around. I only need hour reporting as opposed to minute reporting. And so, irrelevant of the whatever business logic requires, the idea is that this goal table is smaller, more compact, has exactly what you need for your streaming analytics or your AI reporting to go ahead and piggyback off of. And so this concept of basically these data quality levels for the Delta architecture, which is what you see in terms of these data quality levels, it allows you to incrementally improve the quality of your data until it is ready for consumption. That’s the important call-out.

如果你是数据工程师,我很想在网上问你,举手,你们中有多少人是数据工程师有多少人是企业数据仓库的?

数据生命周期

它让你想起了什么?这个概念让你想起标准数据生命周期。青铜表更像是你的数据湖,你把所有的数据都倒进去,然后你就可以了。然后构建一个登台数据库或登台表,它基本上允许您继续过滤、清理和增加数据。对不起,然后您构建了一个数据标记,它实际上具有那些业务级聚合。所以我们谈论的这个概念非常相似,非常接近数据生命周期的概念。只是对于Delta架构,它不仅仅是单个数据库中的单个表。

在分布式系统中,我们所讨论的以分布式方式应用的不同表的思想。这就是这个体系结构的意义所在,它还允许您同时处理流数据和批处理数据。那么,我们如何从传统的数据生命周期过渡到Delta Lake生命周期呢?

的一个

就像我们说的原始摄取一样,青铜是原始数据的垃圾场。你往往会有很长时间的挽留,错误,那往往是在岁月里。您希望避免容易出错的解析,所以换句话说,它实际上只是关于存储数据。这就是青铜的概念。

正如我们所知道的,对于这种银,您得到的是应用了一些清理的中间数据。它是可查询的,易于调试,因此,例如,如果您是一名数据科学家,您想继续工作,不一定要使用业务级别的聚合,但您想要更深层次或更详细,您可以只在银数据上运行分析。这在调试中也很常见。举个例子,我是一家航空公司,我有数据,我希望能够为登录到我的网站订购机票的人调试或提供客户支持。这是用来过滤和清除增强数据的。通常里面有足够的信息,所以你可以调试,所有的信息都在这里。

一旦你准备好了,你有了业务级别的聚合,你有了干净的数据消费,现在可以用Spark或Presto读取了。实际上,这里的星星,我应该把星星去掉,但是在Delta Lake 0.5.0版本中,我们包含了创建清单文件的功能有了这些清单文件,Athena和Presto实际上都能够读取Delta Lake表。所以我们的想法是Delta Lake不仅仅是为了Spark。它当然是从Spark开始的,我将向你们展示一个使用它的例子。但是其他系统完全可以利用Delta Lake因为它们可以读取清单文件。

好的,然后正如我们在这里注意到的,流通过Delta Lake移动数据无论是低延迟还是手动触发,它消除了计划和作业的管理。我忘了我有没有把这张幻灯片放在这张牌上,但我想说的一个很酷的例子是去年在旧金山的Spark和AI峰会上提到的,康卡斯特,康卡斯特继续把80多个作业减少到3个,因为即使他们在做一个会话化过程,不一定需要他们所有的东西都流化,因为他们能够运行低延迟的流化作业,他们能够替换所有80多个批处理作业,并将其缩减到3个作业。Delta Lake的ACID事务组合,他们将能够以同样的方式查看问题,无论是流处理还是批处理,允许他们降低工作结构的复杂性,现在他们能够以更容易的方式维护系统。这是一个很重要的方面。即使你不能从本质上利用流,因为你没有传统意义上的流作业,你当然可以把它分解成小的微批量,这样你就有了一个持续运行的作业来降低你正在构建的东西的复杂性。现在Delta Lake,为了能够提供所有这些容量,现在它基本上恢复了你的DML,插入,更新,删除,合并,覆盖的能力。现在,你当然可以在青铜层进行插入,金色和覆盖,所以这并不一定意味着是包罗一切的,它只是简单地说明,传统上在青铜层中,你要么插入,更新传统上,在银色层中,你删除数据,这样你就可以缩小它,而传统上在金色层中,你在合并或覆盖数据。但是那些dml,那些数据操作语言,那些语句实际上可以一直应用。它允许你保留,允许你更正,它甚至允许GDPR,一般的数据隐私法规。 And this is actually an important concept within newer systems where if you’re holding a lot of data CCPA, GDPR, these concepts of GRC or governance risk and compliance, risk management and compliance, it’s an important aspect of how to ensure your data is not just safe but you ensure the privacy of the individual behind that data. And so that’s an important aspect which we actually are going to cover next week, by the way. So in next week’s session of tech talk, we are going to be talking about how to address GDPR or CCPA by utilizing Delta Lake. So please do join us next week for that session, as well. So a little shout out to that session. But with Delta Lake the concept is that, you know, I can run, if I need to recompute, if the business logic changes, I can clear the silver, I can clear the gold, I could just run deletes, and then I could restart the streams or reprocess the data. I could scale the environment or scale the systems to go ahead and process more data and then you’re good to go. And so now, let’s talk about demos. The remaining 20 minutes or so, 25 minutes, are purely demo. So now that you’ve been patient enough to let me give you some of the context, let’s go right into the demo part. Okay, all right.

就像我说的,这个笔记本是可以下载和使用的。我实际上只会用到其中的一小部分,但只是给你们一些关于如何构建这些可伸缩管道的背景知识。我只讲流和处理。这就是我要做的。这个笔记本目前在Databricks社区版中运行,因为我们在这里做,这样更好,实际上数据集在Databricks数据集中,但你也可以像在Jupiter笔记本中那样自己运行,数据集实际上,我们在这里包含了获取数据的链接。你当然可以在自己的环境中运行这些数据。实际上你不需要在Databricks上运行这个,但是如果你这样做了,你可以在Databricks社区版上运行这个,它是免费的。好,现在我们有,我想开始,我们有一些数据它有一个这样的模式。这样您就有了装载ID、资金金额、支付金额和地址状态。这是我们从Kaggle下载的贷款数据。 Like I said, we included the link there. All right, and so how many records does it have? Right now this particular one has only 14,000 rows inside there. So it’ll be important to call this out later, but that’s a that’s a quick important call-out there. Okay, so I’m gonna go ahead and run this particular function here.

和刷新。

好了,开始吧。好,我要运行这个函数。我不会讲得太深,这里的重点是我要创建这个,生成并追加数据流。流的目的基本上是,我将从我拥有的贷款数据中获取数据,但我将把这个插入到相同的位置但我将在Parquet中做。最初在顶部这里,我回滚到这里给你们一些概念,我创建了这个Parquet路径。我创建了这个文件夹。所以我的表,这个Parquet表,是基于这里的数据,这个临时SAS EU 19示范贷款。我将运行这个generate函数,它会生成并追加数据流。它会把数据输入到Parquet的位置。好,现在当我运行它时一个结构化的流作业就会启动。 So we’re gonna wait a few seconds for it to kick off.

很好,我们要初始化一个流,这里发生的事情是我在Databricks Community Edition中展示这个的原因是它有一个很酷的东西,我实际上能够向你展示输入处理速率和批处理持续时间,我们每秒处理20.6条记录,所以我们正在输入数据。到目前为止,一切顺利。让我们看看是否有数据被添加进去。我将继续快速计算相同的Parquet路径,或者说借出的Parquet桌子在哪里。我将直接从这里运行它,当我运行这个时,你会注意到里面有170行。好吧,这个数字似乎有点偏差,我马上会讲到这个问题,但是如果我试图运行第二个流会发生什么呢?所以这是一个重要的方面,如果我试图继续扩展我的数据工程管道,我有不止一个源。

(打喷嚏)对不起,我希望你没听到我打喷嚏。很抱歉。我有不止一个源因为它是一个分布式多个REST api或者分布式源,所以我想运行第二个流去到同一个表。如果我对着拼花桌这样做会发生什么?

它会开始,它会尝试开始写入到相同的位置。除了你,你会注意到,批处理持续时间几乎是1秒,但每秒有0条记录。所以没有东西进去。所以第二个流不能写到与第一个流相同的位置。所以,数据仍然从第一个流进入。它确实跳到了570,从这里可以看出,数据仍然在输入,从这里可以看到,但这里没有。这是不可能发生的。为什么会这样呢?因为最终发生的事情是,当你从两个磁盘写数据时,有一个ACID事务的概念,每一次写都是受保护的,有一个围绕它们的事务保护这些信息。实际上发生的是本质上有一个锁,如果你想理解传统的EDW,本质上是一个对表的锁它正在写入磁盘,所以第一个流可以这样做,但第二个流不能这样做。 So what ends up happening more times than not, you end up creating multiple tables and then you have to merge those table together, I.e. increase the operational complexity of your system. But wait, remember how I started off? It said that there are 14,705 rows? And right now there’s certainly less than that. I mean, I’m streaming data in, so it’s probably a little higher now, so we’ll just run it, 870, but certainly not the 14,705 that we talked about. So if I look at the data and look at it, you’ll notice that in addition to loan ID, funded amount, paid amount, and address state, I also have timestamp and value inside here. Okay so the schema changed. Well that’s our problem here. So I’m gonna stop the streams as I explain this concept.

因为我们有一个流查询,其中一个因为第二个不行,写入Parquet表因为这是一个结构化流作业,自动结构化流作业,我们包含了额外的一列时间戳和值。举个例子,如果我回头看这里的代码,我打开这里,我的流数据实际上就在这里。

我读取这些数据,我要求贷款ID,资金金额,支付金额和地址状态。我做了一个写流,基本上,你会注意到一个流数据。写流格式表格格式,这是输入参数之一,它是Parquet。检查点位置只是一个位置,我们可以确保我们的流正确运行,所以我们现在可以跳过它。我们做一个点触发器,也就是每10秒我们会处理数据,也就是写入数据,然后它会进入那个表格路径,那个表格路径和我们最初列出的Parquet路径是一样的。这很好。您将注意到,我从未实际指定时间戳和值,因此它是作为流处理的一部分自动添加的。因为我自动添加了这些列,时间戳和值,所以我有了两个不同的模式。我有一个只有四列的旧模式,然后我们有一个有六列的新模式,因为我有六列,本质上我基本上覆盖了原来的表。这就是为什么当我查询数据时,我只看到850或小于1000。我没有看到我原来的14000。 So this is a problem of Parquet There’s no concept of schema enforcement to ensure that the data actually coming in is actually going to be the schema that’s already existing inside the table. And there there’s no interoperability, between not just back to streaming workloads, I can’t actually even have two streaming workloads write into the same table concurrently, which sort of sucks. So let me just restart this process over again. I’m just gonna go ahead and clear out the data, and I’m gonna republish the data, so I’m just gonna run that process.

一旦这个完成,我将继续运行这一步,用一个基本的创建这个为Delta表。换句话说,这是原始的Parquet文件。抱歉,这是Parquet文件。我现在要运行这个,不使用Parquet路径,而是使用Delta路径。我要把这个存储为表格。这就是这条线。

所以你可以看出,从Parquet转换到Delta是很容易的。换句话说,我从Parquet读取,然后用Delta写入。完全相同的格式,读。format Parquet,写。format Delta。如果我想读取一个表,它会被read。format。非常简单。现在我创建了这个表。我还碰巧创建了一个临时视图,以防万一。让我们再看一遍数据。

好的,我们应该回到14705,就像你在这里看到的,我们想快速地看一下模式。

这个模式实际上有四列。我回到了原来的地方,唯一的区别是我有了一个Delta table而不是Parquet table,我要做的一件事是我还要创建一个Delta loan still to stream。我有一个贷款Delta表,用于批量查询。我还会创建一个小的流,它是用于流查询的,但它们都去同一个位置,这个路径。所以不管我运行的是批量查询还是流查询,都是相同的文件系统。好,现在我得到了这个,这是我的快速计数,哇,14705,让我们继续,试着再次运行这个查询。同样的,但是记住我实际上有6列而不是4列。对于这个,我有六列,而不是四列,但你猜怎么着,因为我使用的是Delta表,所以我现在看到了模式不匹配。它告诉你,如果我想合并模式,我可以用。option, word, schemas, true。换句话说,如果我想要包含这些额外的列,我所要做的就是改变我所使用的代码来包含这个选项,我可以允许这两列进入我的数据。 But I don’t, for now, want to do that, but it’s a good call out. So in other words, if I have good business justification or good business reasons to do so, there you go. But it’s calling you out right here. Here’s your table schema, these four columns, loan ID funded amount, paid amount, address state, and here’s my current data schema. I’ve got the timestamp and value. So it’s warning me that there’s a problem here. All right, so let’s instead just simply go fix this. So in this case, what I’m doing is that the stream data, because I know the stream data automatically includes the timestamp and value because it’s streaming, let me just go ahead and specifically specify dot select where I’m only going to include the four columns. Now that I’ve only done that, when I write this data down to my Delta path location, it’ll only write it with four columns, not six.

这就是Delta Lake的优点,因为它既有模式强制的概念,也就是说,我们防止数据进入,破坏现有的数据,同时我们允许模式进化。所以如果你有很好的理由想要随着时间的推移而改变,我们可以合并新模式,特别是如果你想要包括时间戳和值,我们可以合并新模式和旧模式,但这不会破坏数据。旧的模式仍然存在。这就是很酷的地方。现在让我们运行这个。它会花几秒钟来启动,但是现在我们有了流查询启动。

让我们开始吧。现在我们有了一个仪表盘。它试图处理记录,所以现在它试图把数据放进去。它现在每秒运行54或55条记录,这很酷。回到上面这里,你会发现数字变了,15255。随着时间的推移,随着流的启动,它将能够放入更多的数据,显然它会增加。还有一点很酷,因为我用的是Delta Lake表,我可以继续运行这个。换句话说,我有第二个流。这是我的流查询3。相同的概念,相同的代码,为了论证,我将会运行,这代表两个不同的源,但是它们有相同的模式,相同的数据都试图同时写入这个东西。 And sure enough, right away, stream query 3 is jumping up to 147 records per second, and if you go back up you’ll notice the numbers keep on steadily increasing here. So, this is what we mean by simplifying what you create. It’s simplifying your data engineering pipelines. The fact is that, irrelevant of a single source, multiple sources, whatever, if I decide that I need to put everything down into a single table, which is what this example is, I can because I have multiple streaming jobs that can hit the table concurrently, and because I have ACID transactions, I’m protecting the data underneath this at the entire time. And let’s go ahead and run this. Remember I said that we were looking at loans underscore Delta underscore read stream, that’s actually telling me what the read stream looks like, but I can also look at the batch. So in other words, the loans for Delta. This is a batch table that’s looking at the exact same source. And again now I have a batch query read, which is telling me right now it’s 23,455. I’ve got two streaming writes all happening at the same time. So this is how powerful it is, that you can actually, because I can have multiple streams read and write, multiple batches read and write, now I can actually simplify everything because everything’s shown in a single view. I can then organize my jobs to be streaming jobs as opposed to just a bunch of batch jobs. Then instead of having 20 batch jobs, I could potentially run a bunch of single micro batch or streaming job to actually simplify things.

因此,让我们看一看隐藏在幕后的文件系统。记住,就像我说的,我们要停止所有的流,然后我们来看看它。这就是我们之前讨论过的路径。

如果你注意到的话,这里基本上就是一堆Parquet文件。我们开始吧。就像你以前和帕克特一起工作一样。主要的区别是有一个Delta log文件夹。否则,所有原拼花。所有这些小的Parquet文件都来自流,你可能会看到这个大一点的,这是原始的Parquet文件,里面有原始的数据。

如果你看这个对数,对数是多少?基本上,它向你展示了所有不同版本的数据,所以每一个插入都有一个JSON, JSON描述并告诉你在事务中发生了什么。这样挺酷的。事实上,为了好玩,我就选这个吧。

您打开JSON文件并查看它。它告诉你时间戳,提交信息。是谁做的,比如这是我,什么类型的信息,就像它是一个流更新,所有的交易,这是一个发生的交易,以及所有与之相关的值。都塞在这里面了。这就是JSON里面的内容。在幕后,我们创建这个ACID事务来保护幕后的数据。为了让它看起来更漂亮,我们创建了这个历史,你只需要描述历史,然后你就能看到所有版本的数据,所有的流版本,所有的都在这里。

所以在我继续讲之前,让我快速回顾一下幻灯片。我来做一下,哦,不好意思。

串连点点滴滴……

所以,让我们把这些点连起来,因为我们还有大约七分钟的演讲时间,我仍然想留一些时间来进行问答。让我把这些点连起来。我是否能够读取一致的数据?使用Delta Lake,我能够因为我在写入器和读取器之间实现了快照隔离,就像您看到的那样,我能够同时运行多个流写入和读取。我是否能够从一个大表中增量读取?是的,我可以。实际上,我已经用可伸缩的元数据处理优化了文件源。这个可伸缩元数据处理,让我回头快速给你们展示一下。抱歉,我得弄清楚。你会注意到这里有一个检查点Parquet。

这里每10个JSON中就有一个。Parquet。有个问题问我,你知道那个拼花是什么吗?他们设法知道了。所以他们看到了JSON,但仍然对Parquet感到疑惑。现在我来回答这个问题。这里的Parquet基本上是说,在每10个事务之后,我要把这个转换成Parquet。Spark不需要读取每个单独的JSON文件,Spark可以读取Parquet文件,这样以后,如果我关闭集群然后稍后重新启动集群,我就不需要读取每个单独的JSON文件,我只需要读取Parquet文件。这样事情就简单多了。它已经有格式了,它会流进内存,然后你就开始了。基本上,我将读取31个JSON文件,而不是3个Parquet文件然后我将读取一个额外的JSON以跟上当前事务的速度。

好极了。

好,那么我能回滚吗?是的,你可以。我其实有时间旅行的概念。在今天的课上我没有时间给你们展示时间旅行,但是如果你们看之前的YouTube视频。它现在在YouTube上,不好意思,之前的Tech Talk是关于为数据科学准备数据的,我们实际上展示了很多关于回滚和时间旅行的内容。因为你可以看到表的历史,你看到的不仅仅是历史,表的数据仍然存在。这意味着我可以回滚到之前的数据视图,这很酷。

例如,我可以回滚到数据的第19个版本。为了方便论证,我可以这样做,我忘记怎么写版本语句了。这就是现场表演的后果。幸运的是,这里有版本声明,我复制一下。

对不起。

好消息是,我在这里展示的是完整的笔记本,完整的笔记本实际上展示了回滚和这里的所有内容,这很酷。但是让我继续,让时间旅行变得正确。

你知道吗,可能剩下的时间,我可能会跳过这个,但让我继续,实际上完成我所说的,然后我会回到演示。酷。回到这里。很抱歉。

好的,我们还可以回放历史数据,通过相同的管道流回填充的历史数据,这很不错。换句话说,无论是历史数据还是流数据,我都可以毫无问题地将这两个概念合并在一起。这很重要,因为我希望能够毫无问题地查看数据。我希望能够看到数据,不管它是新数据还是旧数据,我希望能够看到它,而不是真正地进入并说。我必须建立一个单独的表流和一个单独的表批量。不,相反,我可以继续看同一个表它同时有我的批处理数据和流数据,这很好。这也是一个很重要的概念。同样重要的是,我希望能够流化任何迟到的数据并在添加时将其添加到表中。这基本上就是这样的概念,我有数据进来。当数据进来时,我希望能够对数据进行更新或者我希望能够对表进行删除。 Well guess what, with Delta Lake I’m actually able to do that now because the I have ACID transactions protecting this data the entire time. Because I’m able to protect the data without any problem, then sure enough I’m able to go ahead and reprocess it, and this goes back to why we talked about the silver, gold, blue, silver, sorry, bronze, silver, gold concept because that way I can go ahead and look at the data, reprocess the data from the original bronze concept, and delete it and reprocess it all over again if I had to. All right, so altogether this allows Delta Lake to basically put everything, allow us to put everything together. How we can build this Delta architecture, is because Delta Lake allows us to do all of these things to allow us to simplify our data engineering pipeline. So some quick questions. Who’s using Delta Lake?

被全世界上千个组织使用

有无数的组织,无数的客户。上个月,仅上个月就处理了2艾字节,不是1艾字节,是2艾字节。我讲了康卡斯特环球的案例。你可以在Spark和AI峰会上看到许多其他客户场景,它们实际上有这些信息。好了,我把幻灯片放进去了。

康卡斯特公司

例如,康卡斯特与Delta Lake进行会话。

千万亿级的任务因为为了提高可靠性他们有这些千万亿级的任务他们需要通过使用德尔塔湖使其更可靠。他们不仅将84个作业减少到3个,还降低了数据延迟,同样重要的是,如果不是更重要的话,他们实际上减少了10倍的计算量。从640个实例减少到64个,这非常酷。

那么如何使用德尔塔湖呢?对,要使用它,你可以添加一个Spark包,你可以使用Maven,或者像我说的,用dataframe而不是点格式Parquet,只需切换到文档格式Delta,现在你就在使用了。

使用Spark api开始了解Delta

所以如果你愿意,你现在就可以建造你自己的三角洲湖。然后我实际上想做一个时间旅行的小技巧现在我记得版本了。

如果我保存这个会更好。

如果我去三角洲会更好。

所以这实际上是对一个表运行的,所以我猜在这个例子中我又搞砸了。这是我的错。等我有机会切换到API的时候,我会给你们一个不同版本的代码库。看起来SQL上下文实际上是不工作的,所以我的坏在那一个,哈哈。

尽管如此,我们还是要回到这个环节。

今天的笔记

所以,你想用这个笔记本。这个笔记本实际上是相对简单的。你可以从dbricks.co/sais-eu19-delta下载。我们实际上会把这个链接放到YouTube频道链接中,如果有人注册了这个课程,他们就可以登录进去。所以不管怎样,我觉得你都没问题。这样做的话,我想你们就可以去尝试一下了。

Delta Lake连接器

好吧,我还想说一些其他的事情。Delta Lake,正如你在这里看到的,正在迅速成为我们的标准,因为它正在迅速成为标准,所以这里有一个问题,它能和Hive一起工作吗?是的,事实上,我们现在正在与社区合作,继续前进,并弄清楚如何与Hive一起工作。这是现在的私人预览。所以现在它在私人预览中,我们很快就会把它变成公共预览,但我们的想法是,我们将能够继续下去,让它在Hive中可用。我们也可以,给我一秒钟,就像我在Delta Lake 0.50博客中提到的,我们实际上可以继续和Presto和Athena一起展示它。我们也在与红移和雪花合作,这只是我们目前的数据。我们很快就会有更多,但正如你在这里看到的,实际上有很多连接器已经能够与Delta Lake一起工作了,这很好。

Delta Lake合bob体育外网下载作伙伴和供应商

还有Delta Lake的供应商。这些合作伙伴bob体育外网下载和供应商已经在与Delta Lake合作。你可以继续,正如你在这里看到的,使用Tableau。您还可以使用Privacera, Attunity, talend, Streamsets, Qlik, WANDisco, informatica,此外,谷歌Dataproc。Dataproc最近宣布,实际上能够继续与Delta Lake合作。我们期待很快会有更多的消息发布。

所以基本上,慢慢来,我们将继续展示它。

三角洲湖的使用者

然后是三角洲湖的用户。这只是一个小样本,一个小例子,但有很多很酷的客户正在使用Delta Lake。不管他们是否使用Databricks,他们都在使用Delta Lake,这是一件很酷的事情。

今天订阅!

说到这里,我要留几分钟时间来回答一些问题,如果你想听这节课,你可以今天就通过dbricks订阅。co/youtube或重新听,在所有这些之间,我确实想继续重新运行演示,因为我一直搞砸了。你们注意到,一开始我有,让我们看一下,在我的贷款表中,至少有23000,所以我要继续运行这个小代码片段。它会显示版本19的样子,现在我要运行它。

这是我们河流的第十九种版本。

所以实际上是22455。0版本的时候是什么样子?我再运行一遍。这是该表第一次创建时的值,是14705。这就是我刚才说的时间旅行的概念。所以我为这里的延迟道歉,但这或多或少是一个概念,你可以运行时间旅行来回滚,这对GDPR的目的也很方便,就像我说的,这也是我们继续讨论的一个很好的过渡,下周我们实际上有一个会议是关于如何使用Delta Lake解决GDPR和CCPA的问题?还有几个问题。让我继续,试着回答这些问题。如果你确实有问题,就像我说的,把它们放在问答环节。我要回答的第一个问题是,我可以将Delta Lake用于OLTP工作负载吗? And the answer quick answer is it is not designed for OLTP purposes, it’s really designed for the purpose of data warehousing or BI-type queries, the data warehousing and data workload.

如果你想把它用于OLTP,我建议你使用一个实际的TP系统。我们并没有试图简化这个过程。我们正在努力简化分析数据的过程,然后继续对其进行机器学习。我相信还有另一个问题,我们能用Hive创建这个吗?如上所述,Hive连接器目前处于私有预览。它将很快向公众开放。

我可以在内部使用Delta Lake吗?这是个好问题,你绝对可以使用Delta Lake on-prem。Delta Lake本身实际上,基本上,从所有意图和目的来看,只是一个添加到Spark进程中的罐子。例如,你去隐藏包,所以你基本上可以包含jar或包含Maven坐标,或者基本上使用Maven如果你在编译或SVT或其他,但重点是,一旦你做了这些,现在Delta jar被包含了。因此,如果你有一个内部部署的Spark环境,你完全可以这样做,只需要包含jar,最新的Delta Lake作业,它匹配你的Spark版本和Scala版本,然后你就可以开始了。

我将用这个答案来结束这个问题,我们在生产中使用这个的可能性有多大?事实上,三角洲湖这个项目在过去两年里已经开始生产了。当我们最初创建Delta时,Delta的背景故事是我们试图建立它来解决一些糟糕的问题,比如,我犯了一个错误,人们在他们的数据中犯了错误,和/或我们试图解决我们在流媒体中遇到的问题。因为我们遇到了这些问题,我们实际上把Delta最初作为一个Databricks项目,这是Spark的一个附加组件。但由于它的受欢迎程度,由于人们提出的所有问题,由于人们真正想看到的一切,我们意识到这个项目非常有意义,不仅对Spark社区或Databricks而言,而且不仅对Spark社区,而且对整个数据工程社区而言。这就是我们去年开源这个项目的原因。所以这是一个开源项目,bob下载地址你可以使用on-prem,你可以在你自己的环境中使用它,EMR, HG Insight,你只需要确保一切都匹配。只要你使用的是正确的Spark版本,你将Spark版本和Scala版本对齐,你就可以使用它。说了这么多,如果你想更多地了解我们是如何做到这一点的,比如我们如何在生产BOB低频彩中使用这些东西,我想让你做的另一件事是去dbricks。co/youtube,我们的youtube频道Databricks,了解三角洲湖的起源。这是我采访Burak Yavuz的地方,他是Databricks的高级软件工程师,他实际上是这个项目创建的一部分。 So you can learn a little bit more about the back story behind it, as well. Okay, well that’s it for today. I realized that there are other questions. I apologize I couldn’t get into all of them, but I want to be a cognizant of the timing here. So I do thank you for your time, for attending the session. Please do, number one, be patient with me for my mistake in trying to get that particular time travel query done for you, but at least I got it done, so thank goodness for that. Number two, you’ve got comments. Go ahead and ping me directly at my Twitter handle, @dennylee, or just as likely, go ahead and go to the Databricks YouTube channel and go ahead and chime in, and put your comments directly on the YouTube channel. I will regularly go login and answer those questions. And finally, also go to Delta.io, that’s the Delta Lake page. So the latest videos, the latest notebooks, tutorials, all of them are actually there.

高级:潜入三角洲湖

深入了解Delta Lake的内部结构,这是一种流行的开源技术,在您的数据湖之上支持ACID事务、时间旅bob下载地址行、模式强制等。

看现在