用Spark Streaming SQL和Delta Lake简化CDC管道

下载幻灯片

变更数据捕获CDC是实时数据仓库中的一个典型用例。它跟踪关系数据库[OLTP]的数据更改日志-binlog-,并及时将这些更改日志重放到外部存储进行实时OLAP,如delta/kudu。为了实现一个健壮的CDC流管道,需要考虑许多因素,如如何确保数据的准确性,如何处理OLTP源模式的变化,是否易于用较少的代码构建各种数据库。本次演讲将分享使用SparkStreaming SQL和Delta Lake简化CDC管道的实践。用户只需要编写一个简单的Merge Into Streaming SQL来构建一个CDC管道,从关系数据库到delta lake。在这个简单的流式SQL背后,我们涵盖了检测到的数据准确性/自动数据模式更改,还使用了大量的delta lake改进,数据跳过以提高合并性能,流式作业事务提交与压缩冲突。

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

-大家下午好。我很高兴有机会做这个报告,谢谢你们的到来。今天的主题是用Spark流SQL和Delta Lake简化CDC管道。

使用Spark Streaming SQL和Delta Lake简化CDC管道

首先,请允许我自我介绍一下。我是阿里云E-MapReduce产品团队的一名工程师。

我是专注于SparkSQL的Spark贡献者,我也是HiveOnDelta的贡献者。我的演讲分为三个部分。首先,我将介绍什么是CDC,其次,我将使用Spark Streaming SQL和Delta Lake提出我们的CDC解决方案,最后,我将介绍一些未来的工作。

首先,我们来看看什么是疾控中心。

变更数据捕获

如果您是一名数据工程师,您可能已经遇到过CDC这个术语。CDC是Change Data Capture的缩写。它是一种数据集成方法,基于对数据源接口的更改的检查、捕获和交付。CDC可以帮助将源表加载到数据仓库或Delta Lake中。这是我们的CDC数据库管道。数据库或应用程序源中存储了大量数据,我们想分析这个表。不太可能直接对数据库运行查询,因为这会影响应用程序的性能。因此,我们可以使用CDC将表加载到外部数据仓库或Delta Lake,我们可以对存储在数据仓库中的目标表执行ETL或Ad hoc应用程序。

有很多CDC解决方案,包括增量导入作业或实时作业。Sqoop是一个开源工具bob下载地址,用于在Hadoop和关系数据库之间传输数据。您可以创建一个每日定时的Sqoop增量导入作业,将数据加载到我们的数据仓库中,但是Sqoop解决方案有一些缺点。例如,源数据库仍然有负载压力,这会影响应用程序的性能,每小时和每天安排的最佳作业不能满足实时分析的需要。此外,Sqoop增量解决方案也有一些限制源表,如它需要一个最后修改的时间戳列。

Sqoop可以处理删除的行,因为后续的增量作业只导入更新或插入比之前导入的更新的行,它可以捕获被删除的行。如果源表的模式发生变化,我们应该在HIVE表上手动执行一些DDL。

CDC的另一种解决方案是使用数据库的binlog。binlog是一组顺序日志文件,可以记录或插入、更新、删除操作。因此,流式CDC管道使用binlog,首先,我们可以使用一些外源给我们,像JSON, Maxwell同步binlog到Kafka,应用程序Spark流消费主题从Kafka排序。序列解析binlog记录,是对目标存储系统的权利。我们支持插入,更新,删除,像Kudu或数据或HBase。如果你想要重放binlog到HIVE,你应该自己做更复杂的归并逻辑。

这个解决方案也有一些缺点,比如HBase和Kudu都有很重的源代码,我们有很多的操作支持。如果数据太大,Kudu有一些请愿者可变性问题,HBase不能支持高吞吐量分析。

我们已经讨论了上述两种CDC解决方案,使用Sqoop的wise press模式和使用binlog的另一种流式模式,但它们都有缺点。在这里,我们提出了使用Spark Streaming SQL和Delta Lake的CDC解决方案。我们可以驱动流SQL来解析binlog并将它们合并到Delta Lake中。

Spark Streaming SQL

接下来,我将首先介绍Spark Streaming SQL。SQL是一种声明性语言。几乎所有的数据工程师都有SQL技能,特别是数据库和数据仓库,如MySQL, HIVESQL, SparkSQL等。SQL使数据分析更容易和有效。用户可以专注于您的业务逻辑,有很多流引擎提供SQL语言,如Create SQL, Stream SQL等。通过使用Stream SQL,即使用户不熟悉Spark Streaming,或者用户现在学习Java或Scala编程语言,也可以轻松地开发流处理。

此外,如果您想从最佳SQL作业迁移到流SQL作业,成本也很低。

在Spark社区中,ISO相关的JIRA讨论了在Spark中支持流式SQL,我们的团队也参与了这个SPIP。此外,我们已经实现了对Streaming SQL的监视,并将EMI集成到阿里云中。

左边的图显示了用于在Spark引擎堆栈上运行的流式SQL。它位于结构流的顶部。我们提供一些DDL和DML。对于DDL,我们支持创建表,创建表和选择,创建扫描和创建流。对于DML,我们支持insert into, Merge into,我们还支持一些其他操作,比如select, where, group by, join, union all。而且还支持UDF。最重要的是,结构流支持大量的同步和源。还有一种方法在源头上增加更多的同步,比如废船,阿里云的一些存储产品,如日志中心,磁带玩具等,其他的已经喜欢做了。

我们在DDL和DML中注入了一些新的关键词。接下来,我将选择其中的一些来解释为什么需要它们以及如何使用它们。

第一个建议,Create scan, Create scan语法定义如何读取表。为什么我们应该引入扫描来定义如何读取表,而不是直接读取创建的数据源表?因为Spark SQL创建的表只是一个数据源的定义。如果我们同时支持流式SQL和批处理SQL,喷口计数,确定,这个表相关的SQL是批量查询还是流式查询,所以我们引入了创建扫描语法,以Kafka为例,正如你所知道的,Kafka数据源表既可以用于流式处理,也可以用于批处理。

在批处理模式下,我们可以使用Spark SQL直接查询这些Kafka数据源表,但是对于流SQL,我们必须创建一个扫描来告诉Spark引擎这是一个流类型的查询。

下面是扫描详细信息索引,在我们的数据源表中创建名为alias的扫描表,并通过关键字using告诉查询类型,此外,我们还可以发送该扫描选项子句的长时间参数,例如每个触发器的max-offset。创建扫描是数据源表之上的临时运行时视图。它不保存在我们的主,它将在Spark会话退出后全部删除。创建扫描可以用来定义一个目标表。

因此,如果要使用批处理模式,可以直接在数据源表上查询,也可以在表上创建批处理扫描。如果我们想要使用流模式,我们必须在表的顶部创建一个流扫描。在Kafka数据源表上创建流扫描后,我们可以使用DML SQL来处理流数据源。例如,我们可以从数据源中进行选择,并将其像数据一样插入到目标表中。

Spark用于流,Spark用于流作业,还有长时间作业参数,如检查点,位置,输出模式等。如何通过SQL为这个作业设置这些参数?在这里,我们使用了创建流语法,一个流代表了我们命中的流作业,作业是,作业参数和DML SQL一起。下面是详细的语法,问题Kafka文本,带选项的流作业子句,用于设置运行时流作业参数,以及Kafka测试扫描上的DML操作。此外,它允许在一个应用程序中运行多个流作业,因此我们使用两者在一个SQL文件中创建多个流。每个创建流都有自己的作业参数在他自己的选项子句中,这是非常有效和清楚的。

Merge into语句是插入、更新和删除的组合,您可以指定一个条件来确定是更新数据还是向目标表插入数据。

这里是merge into语法,从using子句中定义的源数据合并到目标表中。源在using子句中可以创建流扫描或子查询,我们还应该提供合并条件,以确定是否更新,删除,插入到目标表。

我们可以使用merge into来重放binlog,目前,我们使用data和Kudu作为目标表。

该命令可以简化CDC在流模式下的实现,这是一个简单的示例。

UDF是数据分析的重要组成部分。它可以扩展更多的能力来处理数据使用SQL, Spark流SQL也支持Spark SQL UDF或HIVE UDF,它还支持一些窗口函数,窗口函数既流做聚合的事件时间窗口,我们使用这两种窗口类型,聪明的翻滚窗口,这意味着所有的窗口在流作业不重叠,否则,跳窗或滑动窗口,这意味着窗口在流作业可以重叠。

采用水印技术解决了数据延迟的问题。您可以通过指定事件时间列来定义查询的水印。股票代码显示数据预计会有多晚。

在帧API中,我们可以使用这个水印函数来设置它。在我们的流SQL中,我们可以在事件语句中放入一个延迟UDF。

下面是我们的例子,从Kafka中查询数据,对带有两分钟水印的滚动窗口进行平均运算。

关键现在,我已经介绍了Spark Streaming SQL,我们可以使用SQL来实现或打击流作业,而不是Scala,或者Java API,它更简单,更有效。

接下来,API将介绍Delta Lake以及我们在这方面的工作。Delta Lake是一个开源的bob下载地址存储层,fabric将其描述为一个湖屋,这是一个新的数据管理项目。

它结合了Delta Lake和数据仓库的优势。这张图显示了三角洲湖的主要特征。Delta Lake基于perquet格式构建,并提供了一些特性来支持更复杂的数据分析需求。例如,Delta Lake有自己的元数据管理,可以处理数十亿个分区的规模表,但不太好处理,而且容易出错。最重要的是它既是CDC交易,那么我们就可以把投注和流式统一起来,也就是我们可以用Spark流式把数据同步到数据表中,同时,我们可以查询,我们可以对同一个表进行批量查询。

因此,建立实时数据管道是很容易的。还可以对数据表进行比值模式的更新、删除。此外,您还可以支持模式强制和进化,从而提供更好的数据质量和数据管理。时间旅行提供了数据的快照,然后我们可以查询任何更早的数据恶化。转导,只有Spark可以通过数据写数据,包括批处理模式和流式模式,以及Presto Spark如何从数据中查询数据。

我们的团队也做三角洲湖泊的损失或改善。首先,为Delta Lake的更新、删除、优化、抽取等操作提供了SQL支持,并提供了相关的DDL和DML。

另外,我们支持HIVE和Presto对数据表进行查询,这与数据社区的实现不同。例如,一旦我们创建了一个表,一旦我们创建了我们自己的数据表,我们就会被社区中的Presto或HIVE查询。Presto和HIVE都有自己的表。

下面是我们的示例,展示如何使用Presto和HIVE来查询数据表。

使用Spark Streaming SQL和Delta Lak的CDC解决方案

首先,我介绍了我们在Spark Streaming SQL和Delta Lake上的工作。接下来,我将介绍我们使用Spark流SQL和Delta Lake的CDC解决方案。

我们可以从这个解决方案中得到一些好处。首先,binlog可以消除源数据库的负载压力。其次,三角洲湖只是一个静态的工作。其次,Spark Streaming SQL可以简化替换binlog的实现。

不需要编写Java或Scala代码,最后,CDC管道提供了低延迟的数据分析。要更改源表上的数据,可以以几分钟的延迟同步到Delta Lake,因为用户可以立即查询目标数据表。

接下来,我将详细介绍如何构建CDC管道。

首先,我们应该使用双倍缩放或其他类似的产品将表的binlog同步到Kafka。如果你在听我们讲,binlog格式如何不同,那么binlog解析器也不同,那么我们可以使用Spark Streaming SQL从Kafka中消费binlog,并解析binlog到Ganzel卷记录数据以及这条记录的操作类型,如插入、更新或删除,然后我们可以将这条过去的记录数据合并到Delta Lake。下面是一个示例,展示如何使用Spark Streaming SQL一步一步地构建管道。第一步,我们应该创建两个表,一个是源Kafka表,另一个是目标数据表。第二步,我们在Kafka表上创建一个流扫描,并在options子句中设置一些参数,比如研究偏移量,每个触发器的最大偏移量。第三步是CDC管道的主要逻辑。我们创建一个屏幕,将merge包装成语句和作业参数。

merge into包含一个using子句,它在Kafka源表上定义了一个子查询。例如,解析已经锁定,它还指定条件来确定是否向目标数据表插入、更新、删除数据。例如,当来自源表的记录在目标表中有合并的记录,并且操作类型在update中,那么我们应该用新的记录数据更新目标表。第四步,我们可以使用Streaming SQL命令启动SQL文件。该命令将启动一个年轻客户端模式流作业。在此之后,我们已经查看了CDC流管道,如果源数据库表中有一些数据变化,我们可以在外部链接中查询数据表。

这些图显示了merge into语句的在线逻辑。如您所见,对于每批流,我们调用数据的merge函数来将解析binlog记录合并到目标数据表中。

如何处理小文件?

在启动CDC流作业后,出现了一些影响作业长期运行稳定性的问题。第一个问题是如何处理小文件,因为我们的代码有很多功能,但是每个批处理,通常批处理间隔大约是一分钟或几分钟,然后我们会得到越来越多的小文件,在作业运行许多天之后,我们必须采取一些措施来处理这些小文件,例如,我们可以增加批处理间隔来减少数据合并功能的编码。压缩是将数据小文件合并为大文件的重要工具。压缩不仅改变了数据,还改变了数据布局。我们使用Spark SQL来运行优化的命令来进行压缩。另外,由于数据具有较多的功能,Spark中较大的作业要做join。因此,我们可以采用自适应的执行方式来减少减速器任务。这也可以显著减少小文件。

接下来,我将介绍如何对长时间运行的流作业进行压缩。有两种方法进行压缩,我们可以启动一个计划的压缩最佳作业,每小时或每天。这个作业只是一个简单的优化SQL命令,但是当调度的压缩作业运行时,可能会因为我们的数据事务完成而导致流作业失败。

左边的屏幕显示流批处理和压缩批处理作业之间的事务的时间轴。

首先,流批处理作业读取数据表,压缩作业通过事务提交。在此之后,流批处理作业将执行自己的事务提交。它将用COBOL压缩事务提交来调整所提交的完成。

如果完全检查失败,流作业将失败。该驱动器显示了数据如何通过逻辑事务提交完成检查。完整类型有三种,并发追加异常、并发删除读取异常、并发删除异常、删除异常。例如,根据左图,流式最佳作业和压缩作业,读取相同的源数据文件,而战斗作业将删除源数据文件并重新写入新文件,但不会更改数据。

使用我们的流式最佳作业合并逻辑,并删除相同的源数据文件。这样会造成并发删除、删除异常,因为被删除的数据被流最好的作业已经重写到新的文件中,这个数据并没有真正删除。

因此,为了防止流管道文件处理已提交的检查,我们还为此做了一些工作。为什么我们修正了一个错误,当流批处理只包含,插入binlog,它应该总是成功做事务提交。另一个是,如果完整的目标失败,我们添加了文件测试作业的重试。

另一种方法是自动压缩。我们在流批处理之间提取了压缩操作。因为顺序执行,所以没有完全。

目前,我们支持文件来策略,以确定我们是否应该在流运行期间进行压缩。例如,如果我们发现文件计数大于我们的共享代码设置,我们将触发压缩。

第三种减少小文件的方法是使用自适应执行,自适应执行可以自动合并小分区,以减少reducer的数量,减少输出文件的数量。

首先,我介绍了小文件问题,并提供了一些解决方案。接下来,我将介绍另一个问题,这是一个性能问题,因为实现数据模式如果启动join作业来做合并逻辑,如果目标数据表的大小变大,join操作将花费更多的时间。这将降低性能,影响长期运行的稳定性。长时间过滤器有助于提高连接的性能,特别是在流批量源数据较小的情况下。这是我们登顶的地方。

最后,我将介绍CDC解决方案的未来工作。

为什么自动检测模式更改。我们可以在作业运行期间自动检测模式更改并处理模式。因此,没有必要停止流工作来方便。另一个是性能的提升,我们要实现读模式特性,可以降低读侧的合并成本,防止批处理时间的增加。

我们还想通过sync语句简化用户体验。我们可以隐藏重放binlog逻辑,用户只需告诉我们源binlog格式类型和目标数据表。

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
关于宋俊

阿里巴巴

宋军,高级工程师,大数据专家@阿里巴巴,专注于Spark领域,尤其是Spark Core和Spark SQL。他也是Apache Spark的贡献者,也是2016年使用Spark作为计算引擎的CloudSort基准竞赛的获胜者。此外,他还向TPC-DS网站(http://www.tpc.org/5001)提交了基准测试报告,排名第一,这是通过对SparkSQL进行大量优化完成的。