使用结构化流的大规模Lakehouse实现

2021年5月27日下午05:00(太平洋时间)

下载幻灯片

业务主管、高管、分析师和数据科学家依靠最新的信息来做出业务决策、适应市场、满足客户需求或运行有效的供应链运营。

来听听Asurion是如何使用Delta、Structured Streaming、AutoLoader和SQL Analytics来将生产数据延迟从- 1天提高到接近实时的Asurion的技术团队将分享战斗测试的技巧和技巧,你只有在一定规模的情况下才能获得。Asurion数据湖执行超过4000个流作业,并在AWS的生产数据湖中托管超过4000个表。

在本节中请注意:
Tomasz Magdanski, Asurion工程总监

成绩单

托马斯·马格丹斯克:你好。我要告诉你的,你在任何书或博客里都找不到。我将与您分享您在尝试构建大规模数据湖时会遇到的实际障碍。我简单讲一下《亚苏里安》我们是怎么走到这一步的?如何经营规模大、性价比高的湖屋?最重要的是,我们在这一过程中吸取了教训。
我们是一家大型保险和支持公司。每天,我们有超过10,000名专家参与支持会议,帮助全球超过3亿人,我们的服务范围从技术支持到当天的设备维修和更换。为了让您了解这个项目的规模,我们的湖屋从100多个数据库中摄取了4000多个表。我们在数据湖中创建了7500个表,其中只有1 - 2个表,我稍后会讲到。我们从Kafka, Kinesis, SNS和SQS中摄取流数据,以及来自api的文件,平面文件和数据。
我们甚至从其他云提供商那里获取数据。我们的数据库来源从SQL server, Oracle, PostGreSQL, MySQL动态开始Redshift。在我们的数据仓库中,我们将数千个这样的表组合在一起,生成300多个数据模型和600多个数据集市。最后,在我们的消费层,我们有超过10,000个数据视图和超过2000个报告。我们之前的架构是基于Lambda架构的。如您所知,Lambda体系结构包含速度层和批处理层,这其中存在几个问题。首先,每件事都要处理两次。您还必须对数据进行两次验证,并且通常必须使用不同的技术来实现这一点。或者必须以不同的方式处理后期数据,并且必须担心在大部分不可变空间中重新处理数据。你必须担心调度,重写和查询。 Data updates are also really difficult, which makes the data compliance more difficult.
我们没有真正的计算机存储分离,这使得可伸缩性变得困难和昂贵。我们的数据延迟主要是D-1,我们还有一个非常广泛的技术堆栈。从红移到Lambda, Kinesis, Kinesis Firehose, EMR, RDSS, Hive, Spectrum。这很难管理。然后我们看了湖屋建筑,我们只有一个管道。我们拥有接近实时的数据延迟能力,Apache Spark的可扩展性,高度集成的生态系统,而且技术堆栈非常狭窄。这是非常非常有希望的。
另一个有助于开发的重要进步是直接利用生产数据。我们的目标是尽可能减少数据移动,并利用计算机存储分离。开发数据平台,获取真实的生产数据bob体育客户端下载来识别和解决许多微妙的问题,以及处理实际规模,这些都是我们在开发环境中无法重现的。所以这个结果对我们来说是非常理想的。我们使用IAM角色和挂载点以只读模式透明地将预产品计算集群连接到生产数据,同时还将数据写回生产数据包。因此,数据实际上从未长期驻留在环境中,处于真正的计算和存储分离部分。这张图很重要,我们会在某节课上再讲。
在我们之前的体系结构中,每个表都有一个ETL映射作业,这使得平台非常严格。bob体育客户端下载我们实际上有4000个映射,如果您需要在许多映射中进行更改,那么工作量很大,而且平台有点抗拒更改,因为必须更改的事情很复杂。bob体育客户端下载在Lake House中,我们希望创建一个单一的spark ingestion作业,它能够流处理、批处理和从所有源中读取数据,并且是完全可配置的,我们使用Scala编写了这个作业,使用了设计模式和大量依赖注入,允许非常丰富的可配置性。我们选择结构化流来利用检查点,精确或至少一种语义,我们已经统一了我们的接口,我们的着陆区域,到S3和Kafka。因此,数据库、CDC管道、api、平面文件都上传到S3、SNS和SQS, Kinesis都上传到Kafka。然后,我们使用Databricks作为股作业和临时集群来调度摄取作业。因此,这使得代码具有高度的可测试性、易于维护和非常灵活。
数据湖中的所有表都是delta表。作为数据湖的一部分,我们需要保持一个缓慢变化的维度类型,只附加表来跟踪所有行、所有列和分数的变化,我们称之为L1表,但我们也需要存储每一行的相关版本,就像搜索引擎[听不清楚]类型,通过合并到目标表的变化,我们称之为L2表。所以我们有一个选择。你是否将数据从着陆区流到L1,然后从L1流到L2?但这实际上会使我们的工作岗位数量翻倍,从4000个增加到8000个。因此,在经过大量测试后,我们决定利用每个批处理API的强大功能同时编写两个表。如果你……Intellect box被大大简化了,但我们实际上没有使用直接的Spark。我们有读者和作者围绕Spark。我们还通过在forge批处理中添加强制元列来大量装饰我们的数据。
我们要做的下一个选择是关于我们的触发选择一种原生的解决这个问题的方法是取出摄取作业,那是一个流作业并将它作为一个短暂的作业发送出去。现在的挑战是,我们有4000个表要运行,而Databricks只允许在碎片中有1000个作业。最重要的是,每个短暂的集群都必须有一个驱动程序和至少两个节点,这将使我们处于一个12,000个节点的环境中,这有点让人望而却步,我们不认为这是对资源的最佳利用。所以下一步我们考虑在笔记本电脑中把这些流作业组合在一起,并把它们作为一个临时作业来运行,我们找到了一个驱动程序处理大约40个流的最佳位置。
当然,集群必须更大才能处理所有流,但我们也注意到相当高的计算机浪费,因为在这40个流中,有些流可能并不经常有数据,但我们让它们一直运行。这种方法的另一个问题是,如果我们想禁用单个流,我们实际上必须停止作业,这将停止所有40个流。因此,我们希望找到一种更好的方法,能够在不需要停止所有其他作业的情况下,切换一个作业,或将它从一个笔记本移动到另一个笔记本,或从一个集群移动到另一个集群。最后,我们选择了触发方式,对于那些可能不知道触发方式的人它是结构化流,它会收集所有数据,直到你的最大流或文件或字节数,处理这些数据,然后检查进度,然后终止作业。
因此,当您希望微批运行时,您实际上负责调度它们。这意味着没有持续执行,这允许我们将笔记本中的数百个作业放在一个临时集群中。现在,因为这些作业是按计划运行的,我们实际上可以在两轮之间在笔记本电脑之间迁移它们。我们还可以在每一轮中刷新每一轮的冲突,这样我们所做的唯一配置更改就会立即生效,而不需要重新启动。同时,我们也在使用机器学习来振兴不同集群的工作,以确保我们满足我们的数据sli。我们有五种类型的笔记本作为临时作业运行,这些笔记本有一个分配给它们的组。他们进入数据库,配置数据库,收集所有属于该组的作业,然后他们以[听不清]系列运行这些作业,或者通过使用电源收集,我们可以让作业执行瘫痪,当然,在驱动器上,通常是同时运行16到32个作业。
我们有五种口味的笔记本。我们有一个用于非常频繁更新的东西,实际上我们可以在一个笔记本电脑上放大约60个表。我们的目标是在60分钟内完成所有的更新和合并。我们有更新频率较低的表,可以放入300到500个表。然后是不经常更新的表,我们可以放1000个。这种方法的好处是,我们可以获得一个频率不断增长的表,并在这些组之间无缝地移动它,并无缝地重新平衡这些工作,而不会对该组中运行的任何其他工作造成资源或影响。
我们还可以通过在y循环中运行作业的笔记本来实现所谓的伪流,我们已经测试过了,性能非常接近于运行一个处理触发器,让Spark处理微批的执行。例外的是,我们可以在每个y循环的顶部检查配置,这意味着如果我们有五个作业正在运行,我们想要禁用其中一个,我们只禁用其中一个,下一次迭代将不会运行,而其他四个不会重新启动。
好的。所以让我们继续讨论我们在建造如此大规模的湖泊环境时所学到的经验教训。首先,让我们谈谈云文件。你们可能不知道,云文件是Databricks提供的Auto Loader的一部分,当你使用AWS并试图从文件夹中读取时,Databricks会自动为该文件夹创建一个S3通知,使其成为一个SNS,然后它会有一个订阅了订阅了SNS的SQS的SNS。现在,首先,AWS只允许每个桶100个通知。因此,您将不得不编写某种自动化程序,以便知道您在后端已经有了饱和的通知,并且您可能需要在下一个bucket中部署一种新的作业类型。
其次,SQS和SNS在默认情况下没有标记,至少现在是这样。因此,如果你像我们一样,或者要求你所有的资源都被标记,你将不得不使用某种Lambda或某种函数来注意和检测Databricks在你的帐户上创建的资源,并适当地标记它们。最后,AWS SNS对api, ListSubscriptions和ListSubscriptionsByTopic有硬性限制,Databricks使用这些api来检查是否已经…SNS已经有SQS订阅。如果你运行足够多的工作,就像我们同时运行成千上万个工作一样,我们已经看到我们达到这些限制,我们的工作失败了。所以今天处理这个问题的唯一选择就是看看是否有变化不大的非常慢的表,我们可以禁用通知,或者只是及时地分散它们来避免这种情况。但在某些时候,我们会遇到这样的问题。
好的。我们从云文件中学到的另一个教训。我要回到我之前讲过的数据计算种族隔离的幻灯片。因此,当你从另一个帐户预戳计算,从生产帐户请求数据时,Databricks会设置一个通知,它会设置一个SNS,然后它会设置一个SQS。但是如果您注意的话,就会发现SNS和SQS位于计算的帐户中,而不是存储的帐户中。
这样就可以了,您的测试也可以了,很好,您已经准备好部署到生产环境中了。将作业部署到生产环境中。您对数据提出了相同的请求,但这次Databricks只创建了一个SQS队列,因为已经存在一个SNS,对吗?所以现在,是的,你的生产数据也可以工作,但通知和整个云文件都是通过预刺激环境运行的,这当然是有问题的。因此,在部署过程中,我们必须清理通知,首先运行生产作业,让生产作业设置SNS和SQS,然后启动预生产跳转以订阅该SNS主题。这是最后一步,但这是我们在部署过程中必须执行和自动化的额外步骤。
好的,下一个。我们通过AWS DMS从成千上万的数据库表中获取数据,我们使用CDC更改数据捕获流。所以当你第一次尝试在桌子上启用CDC时,你必须做两件事。你必须启动负载和疾控中心。load的意思是,它是给定时间表的快照,然后CDC是数据库记录的过程,在对每个角色进行更改后跳过。现在,我们在这个设置中发现的挑战是加载文件可能需要几个小时,CDC文件在你开始工作时就开始被跟踪。所以有可能在管道的CDC部分中有一个对象[听不清]它会有一个时间戳,在加载文件完成之前,而加载文件分配了现在的时间戳。
所以你有一个加载文件版本的行,它的时间戳在CDC更新版本的行之前。因此,我们将加载文件的DMS时间戳重置为零,以避免这种竞争条件。我们从DMS和CDC中学到的另一个经验是数据类型转换,由于DMS可以连接到几乎任何数据库,我们发现有时它不能正确地转换数据类型。例如,我们是一个SQL服务器,一个小整数转换为UINT,我们已经看到了一些溢出,我们必须应用你在幻灯片底部看到的规则来强制它返回一个整数。例如,在Oracle中,数值被转换为DECIMAL(38,10),你可以选择将其一直设置为38,38,但例如,在我们的Oracle数据库中,数值列的精度是50。所以我们没有办法把数据带来。我们必须将名为numberDataTypeScale的设置设置为负2,这有效地将其转换为字符串。
这又是一个教训。加载文件可能很大,读取时可能会导致数据倾斜。所以你可能需要加些盐。DMS文件不是分区的,所以要么考虑压缩,要么注意Spark会花费很长时间来读取DMS存储桶。如果你在几个月后重新启动,会有成千上万的小文件。我们将DMS设置为在开始任务时删除所有文件,只是为了重新开始,只是为了缓解这个问题和这个问题,并尽量减少数据重复。数据库源上的某些源可能有大型事务。
让我们回顾一下。当我们在一个微批处理中对单行有多个更新时,我们必须确定哪一个被延迟了。这个说我们想要合并到目标表中,最初我们认为我们可以使用时间戳,因为它们通常以毫秒或微秒为单位,但我们发现,如果您手动在数据库中打开一个大型事务,并在该事务中对同一行进行大量更新,然后关闭事务,所有这些更新都带有完全相同的时间戳。所以你不可能确定哪个是最新的。因此,我们必须通过DMS为所有数据库引入LSN,以确保我们有某种确定性的方式来合并并获得最新的行。
现在,我们学到的另一个教训是,如果数据库没有主键,但是有一个唯一的约束,你可以用来合并,这个约束包含空值,空值是很多的源,Databrick的merge Delta在匹配时不会识别null和null,它每次都会插入新行。所以我们要做的就是将所有的空值替换为字符串空值或者空字符串来进行更确定的归并。
关于卡夫卡的教训。所以如果你使用Kafka进行某种CDC管道模式,这也是可能的,你可能有一些没有很多流量的表,你不想为这个小表配置大量的分区和浪费大量的资源,但初始数据负载实际上可能会带来数百万行的数据。现在你有了一个没有大量分区的主题,但它有很多数据,你不一定要为了一次读取而重新分区该主题。因此,我们建议将每个触发器的最小分区和最大偏移量设置为较大的数字。在我们的例子中,我认为我们分区了4000个,抵消了10,000个,以迫使Spark麻痹这个主题的权重,从而在不需要重新分区的情况下加快它的速度。
我们还使用了我们编写的第一个L1表。我们为每个批处理优化里面的表,然后我们把它作为L2归并的源。那就是避免一个动作,第二个动作,回到原点,因为原点可能很慢。DMS可能因为这个文件数量而停止,或者Kafka在这种特殊情况下也可能很慢。所以一般来说,我们采用了这样的模式,即向数据帧中添加一个批处理ID列,写入L1,如果需要的话进行优化,然后使用相同的批处理ID从L1中筛选数据进行合并。我们发现这种模式比返回源要快得多,如果你有大量的数据,缓存也会更慢。
我们从卡夫卡身上学到的其他经验。好吧,我们希望在所有东西上都使用触发器一次,这样我们就有了选择,而Kinesis和其他来源不支持这个开箱操作。所以我们必须使用Kafka Connect将我们所有的SNS, SQS, Kinesis移动到Kafka中,然后我们可以使用支持,我们可以使用Kafka触发器一次,使这个工作以与我们在DMS中完全相同的方式工作。
从达美航空吸取的教训。当你第一次带来数据时,手动优化你的表因为你会得到更快的[听不清]然后启用Delta Optimized Writes因为合并会重写很多数据。Optimized Writes是很好的合并和计算文件大小,所以你不需要经常做压缩优化。移动您的批处理ID和合并列到您的数据框架的前面。
因此,Databrick收集数据帧的第一行的统计信息。如果你有一个非常宽的数据帧,你不会得到最后的列的统计信息,特别是如果你刚刚添加了批处理ID,那将是最后一列。例如,您需要该批处理ID进行筛选。所以把你要合并和搜索的所有东西都移到数据框架的前面。如果你使用增量合并列,比如自动更新数字,等等,你也可以使用排序来将数据放在一起,进一步减少需要读取的文件数量,它有数据跳过。对于Delta,我们总是建议使用分区和带有IO缓存的i3实例类型。
还有其他的经验教训。如果你有其他需要读取Delta的工具,比如我们用Presto,当你在Hive中注册Delta表时,你必须写S3路径。因为Presto不会理解DBFS,它不会理解[听不清],所以我们必须手动创建表定义并将S3路径放入其中。如果你想让你的Delta文件被诸如Athena、Hive或者Spectrum这样的东西读写,你需要生成清单文件,并启用对这些清单文件的自动更新。这样,非delta或非delta事务,本地[听不清]技术仍然可以显示文件来读取parquet文件。
Presto和Spark视图目前不兼容。这是需要注意的,因为如果你在Spark中创建一个视图,Presto将不能利用它,反之亦然。我们还发现,将像行数、最后修改的行数这样的增量统计信息提取到热缓存中实际上是非常可取的,因为我们的工作负载需要更新许多先前的表,许多依赖的表需要首先更新。假设您有一个TL,它需要首先更新30,40个表。拥有一个正在运行的集群并发出40个已描述的命令,仅仅是为了确定是否可以运行作业,这是很慢的。通过在每个任务结束时将这些统计数据提交给热缓存,下一个任务只需读取现金并确定所有表都已更新,我就可以开始调度任务了。
好的。最后,Delta和Spark一起工作,将Delta从Delta表中输出,但目前,到今天为止,它只对append有效。这是有意义的,因为当您向Delta表追加内容时,您将向表和事务日志中插入新文件,并且侦听该表的任何人都知道有新文件,并且这些新文件被读取。这很有道理。至于归并,这有点复杂因为归并会重写所有数据。是的,创建了一个新文件这个文件可能有一百万行,但我们只更新了1000行。所以如果你在文件中沿着管道走,你必须从百万行中筛选到实际发生变化的一千行。我们要怎么做呢?
同样,我们使用了我们之前添加的批处理ID来知道新的批处理是27,我只需要提取27的数据,因此将这个庞大的文件过滤成一个小文件。它不是最有效的,但它运行得很好。最后是SQL分析,简单介绍一下我们如何使用SQL分析。因此,我们的数据集市是由超过1000个SQL查询和语句的集合构建的。我们需要一种方法将数据集市从以前的实验室平台提升并转移到这个平台,因此我们需要一个良好的可伸缩SQL执行引擎。bob体育客户端下载当然,Spark就是这样,我们希望利用现有的框架,通过JDBC连接器将SQL语句提交到Spark集群。第一个选择是使用交互式集群,但它们相当昂贵。
所以我们更倾向于使用开源Spark和Delta的EMR。当SQL Analytics产品进入我们的范围时,它仍然支持JDBC连接,所以它非常适合我们能够发送那些SQL查询并创建那些数据集市。到目前为止,我们学到的一些经验教训是,因为它是一个早期的产品,我们必须自己从api中收集所有的指标,并将它们放在Delta表中,以进行一些监控和性能。每个SQL工作区只允许使用一个元存储,这意味着如果你有多个不同的SQL端点,它们都将共享一个元存储,这与集群不同,我们可以在每个集群中配置不同的元存储。因此,我们在分离计算和存储以及跨帐户和使用元存储将它们结合在一起方面受到了一些限制。
它也不支持UDF。因此,如果你需要一个[听不清],你仍然必须依靠交互式Spark SQL集群,在那里你可以附加jar,这次它将没有jar来进行SQL分析。最后,您还必须学习如何排除Spark故障。因此,您仍然需要学习如何理解dag以及Spark UI上的Spark job和SQL视图,因为这仍然只是一个底层的Spark job。因此,为了有效地找到查询中的瓶颈,您仍然需要这样做。
好的,非常感谢。现在是问答时间。谢谢你,请提供你的反馈。我们想要你的回复。我们希望提高我们与大家分享的内容的质量,如果你有任何问题,如果你想离线联系我,请随时在领英上与我联系。谢谢你!

托马斯Magdanski

Tomasz是一位经验丰富的技术领导者,专注于大数据、实时应用和机器学习技术的现实世界实现。他部署和管理生产应用程序…
阅读更多