技术指南
开始使用Delta Live表
简介
本指南将演示Delta Live Tables如何使您能够开发可伸缩的、可靠的数据管道,这些管道符合Lakehouse架构的数据质量标准。
让我们从描述一个常见场景开始。我们在云对象存储(如S3、ADLS或GCS)中拥有来自各种OLTP系统的数据。有些数据集定期更新,有些是源系统的历史快照。我们对数据的消费者和转换有了大致的了解,我们将遵循Lakehouse架构将数据质量划分为原始、精炼和聚合层:
这些Gold表中的每一个都可能服务于不同的消费者,从BI报告到训练机器学习模型,因此这些数据从源到Gold层的旅程将有不同的需求,我们作为数据工程师关心:
- 延迟:当我们摄取新数据时,它必须在5秒内在银表中可见。
- 成本:“我们无法让一个固定容量的集群全天候运行来支持这些更新”
- 精度:“我应该在实时数据源中对延迟到达的数据承担多少责任?”
乍一看,这些需求中的许多似乎很容易在上面的参考管道中满足。然而,尽管湖屋管道故意设计得优雅而简单,但在现实中,我们通常不是在处理直接的线性流。在现实中,它通常是这样的:
随着我们开始扩大规模,用额外的数据源来丰富我们的分析环境,以支持新的见解,ETL复杂性呈指数级增长,以下挑战导致这些管道变得极其脆弱:
- 由于表之间没有明确的依赖关系,错误处理和恢复非常费力
- 数据质量很差,因为强制执行和监视约束是一个手动过程
- 无法跟踪数据沿袭,或者充其量需要大量的实现
- 颗粒级、单个批处理/流级别的可观察性是不可能的
- 很难在统一的管道中处理批处理和流处理
注意:批处理和流式?
Spark提供了使用单个API的批处理和流处理范式的能力,Delta Lake支持在单个数据集上并发批处理和流操作,从而消除了在两层或两层数据中所需的权衡或再处理λ架构,在实现和监控流方面仍有很多工作要做,特别是在ETL过程中,它将流和批作业作为数据集之间的单独跳点组合在一起。
声明式ETL
在编写ETL管道时,数据转换通常是“程序性”执行的。这意味着将对数据执行的操作表示为ETL引擎要执行的一系列计算步骤。在许多情况下,即使您使用的是编排工具(如风流或Azure Data Factory),启动的作业也包含过程逻辑。尽管编排器可能必须知道作业之间的依赖关系,但它们对ETL转换和业务逻辑是不透明的。
另一方面,声明式ETL要求用户描述管道的预期结果,而不显式列出得到结果必须执行的有序步骤。声明性意味着关注我们期望的目标是什么,并利用像DLT这样的智能引擎来确定计算框架应该“如何”执行这些过程。
你可能会想到程序性和声明性的ETL定义,就像给某人一步一步的驾驶方向,给他们提供一个包括城市地图和交通流信息的GPS。
驾驶指南会提供司机到达目的地的步骤,但不能提供他们的预计到达时间,他们也不知道他们会在路上经过哪些社区。此外,如果需要绕行路线,分步指示现在是无用的,但GPS地图将能够绕行重新规划路线。
在这个比喻中,地图就是DLT管道。DLT引擎是GPS,可以解释地图并确定最佳路线,并为您提供诸如ETA等指标。关于在路由中遍历的邻居的详细信息就像数据沿袭一样,在事故(或错误)周围找到弯路的能力是依赖项解析和模块化的结果,这是DLT的声明性性质所提供的。
你的第一条管道
在本指南中,我们将实现一个遭受这些挑战的管道,并将此作为一个机会,教你DLT的声明式开发范式如何简化ETL开发,并提高整个湖屋的质量、沿袭和可观察性。
为了快速开始,我们将管道的完成结果托管在Delta Live Tables笔记本回购.你可以复制SQL笔记本到您的Databricks部署中作为参考,或者您可以按照指南进行操作。
本指南将重点介绍SQL管道,但如果您更愿意在Python中运行相同的管道,请使用这个笔记本.
先决条件
为了充分利用本指南,你应该基本熟悉以下内容:
- SQL
- 开发ETL管道和/或与大数据系统合作
- 数据库交互式笔记本和集群
- 您必须能够访问Databricks工作区,并具有创建新集群、运行作业和将数据保存到外部云对象存储或数据存储上的位置的权限DBFS.
数据集
在第一个管道中,我们将使用retail-org数据集databricks-datasets每个工作区都有。Delta Live Tables提供了处理Lakehouse中Bronze表(即原始数据)的细微差别的技术。您将使用自动加载程序从云对象存储增量加载数据的特性。
青铜数据集:使用云文件摄取数据集
青铜数据集代表最原始的质量。我们通常会从源头进行最小限度的调整,利用云存储的成本效益来创建一个原始源,我们可以在此基础上验证改进的数据,访问我们通常不报告的字段,或者创建新的管道。此阶段的常见模式是不断从云存储中的某个位置摄取新数据。
虽然其中一些术语在通用说法中可以互换使用,但它们在DLT中具有不同的含义。有Spark结构化流经验的读者可能还会注意到一些重载的术语。在这里,我们试图消除这些术语的歧义:
- 流媒体数据集被视为无界的处理范式吗
- 增量更新模式是否只对目标数据进行最小的更改
- 连续指始终运行直到在任意时间停止的管道,而不是在管道启动时基于源数据状态的时间停止
你可能会注意到无界流处理框架(如Spark Structured Streaming)和DLT中的流数据集之间有一些重叠。事实上,DLT的流数据集利用了Spark结构化流和Delta事务日志的基础,但抽象了大部分复杂性,允许开发人员专注于满足处理需求,而不是系统级的繁重工作。
我们将在本指南的黄金部分讨论DLT的流数据集和DLT的连续模式是如何相互作用的。
作为一个例子,让我们看一下我们将要摄取的一个Bronze表。
CREATE STREAMING LIVE TABLE sales_orders_raw COMMENT“原始销售订单,从/ databicks -datasets中摄取。”TBLPROPERTIES ("quality" = "bronze") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles. properties ")inferColumnTypes”、“真正的”);
SQL语句使用Auto Loader从json文件创建一个名为sales_orders_raw的流表。
cloud_files:调用Auto Loader,并以云存储路径和格式作为参数。(注意,该API与DLT之外的cloudFiles调用略有不同)。
现在,让我们创建一个Pipeline来从云对象存储中摄取数据。
打开你的工作区
- 创建你的第一个DLT管道笔记本
- 为你的DLT管道创建一个新的笔记本,如“dlt_retail_sales_pipeline”
- 将以下代码复制到第一个单元格中:
“客户购买成品,从/ databicks -dataset中摄取。”TBLPROPERTIES ("quality" = "mapping") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");CREATE STREAMING LIVE TABLE sales_orders_raw COMMENT“原始销售订单,从/ databicks -datasets中摄取。”TBLPROPERTIES ("quality" = "bronze") AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles. properties ")inferColumnTypes”、“真正的”);
注意:流水线笔记本
DLT流水线笔记本是特殊的,即使他们使用标准的Databricks笔记本。虽然我们目前不阻止您将集群附加到Pipeline Notebook,但DLT绝不会使用附加的集群来运行管道。作为最佳实践,我们建议您将管道笔记本保持在分离状态,并在开发过程中使用二级草稿笔记本运行任意命令。如果您在附加的集群上运行管道笔记本,您将看到类似这样的内容……
- 在新选项卡或窗口中打开作业,并选择“Delta Live Tables”
- 选择“Create Pipeline”创建一个新的管道
- 指定一个名称,例如“销售订单管道”
- 将Notebook Path指定为步骤2中创建的Notebook。这是必需的步骤,但将来可能会修改为引用非笔记本库。
- Target是可选的,但推荐使用,因为目标是目标数据库,其他授权成员可以从管道访问结果数据。
- 存储位置是可选的,但推荐使用。如果已配置外部blob存储位置,则可以指定外部blob存储位置。DLT将在这里为管道生成数据集和元数据日志。提示:如果没有指定storage,那么DLT Pipeline生成的所有数据和日志都将存储在DLT创建的DBFS根存储中的路径中。稍后您可以在Edit Setting JSON文件中找到该路径。要将数据和日志存储在外部(即非dbfs根目录)位置,必须为DLT管道指定存储位置。
- “管道模式”为触发
- 设置用于的最小和最大工人数集群规模
- 选择“开始”
- 您已经创建了您的第一个管道!
管道日志
现在,您将在图的下方看到包含管道运行日志的部分。下面是这个部分的样子。
第一个摄取代码解释
图标表示DLT数据集,在本例中是表。这两张桌子我们认为是青铜桌。具体来说,它们是增量活动表,我们使用Auto Loader特性使用cloud_files函数
在DLT中,视图类似于SQL中的临时视图,是某些计算的别名。视图允许您将复杂的查询分解为更小或更容易理解的查询。视图还允许您将给定的转换重用为多个表的源。视图只能从管道中获得,不能以交互方式查询。
在DLT中,表类似于传统的物化视图。Delta Live Tables运行时自动创建Delta格式的表,并确保使用创建表的查询的最新结果更新这些表。
消费者可以像使用标准Delta表一样从Data Lakehouse读取这些表和视图(例如用于SQL报告或Python数据科学),但它们是由DLT引擎更新和管理的。有关更多详细信息,请参见目标在下面。
银色数据集:期望和高质量数据
在本节中,我们将交给您开发端到端管道的控制权,如下面的DAG所示。我们已经创建了青铜数据集,现在为银,然后是金,概述在Lakehouse架构在2020年CIDR数据库会议上发表的论文,并使用每一层教你一个新的DLT概念。
银色层是关于高质量、多样化和可访问的数据集。这些表可能不会服务于特定的用例,例如以低延迟为生产报告提供服务,但它们已经被清理、转换和管理,因此数据科学家和分析师可以轻松、自信地使用这些表来快速执行预处理、探索性分析和特征工程,以便他们可以将剩余的时间用于机器学习和洞察收集。
对这些消费者来说,最大的生产力杀手不仅仅是数据访问和预处理,而是对他们所使用的数据质量的信心。因此,我们将使用DLT来确保这些数据集符合特定的质量水平,并清楚地注释数据集。数据消费者和决策者都可以使用通过正确使用约束和注释得到的结果编目和质量监控。
- 打开管道笔记本并创建一个新单元格。
- 复制以下代码到一个新单元格:
CREATE STREAMING LIVE TABLE sales_orders_cleaned(CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW) PARTITIONED BY (order_date) COMMENT“使用有效的order_number(s)清洗的销售订单,并使用order_datetime进行分区。”TBLPROPERTIES ("quality" = "silver") AS SELECT f.customer_id, f.customer_name, f.number_of_line_items, TIMESTAMP(from_unixtime((cast(f。order_datetime as long)))) as order_datetime, DATE(from_unixtime((cast(f。order_datetime as long)))) as order_date, f.order_number, f.ordered_products, c.state, c.c city, c.lon, c.lat, c.units_purchased, c.loyalty_segment FROM STREAM(LIVE.sales_orders_raw) f LEFT JOIN LIVE。c ON c.customer_id = f.customer_id AND c.customer_name = f.customer_name
- 通过导航到左侧导航栏中的Jobs,选择“Delta Live Tables”并选择在上一步中创建的管道,返回到管道“Sales Order Pipeline”
- 选择开始/停止切换按钮旁边的下拉菜单,然后选择“完整的刷新”
约束:约束允许您定义数据质量期望。它们采用一个解析为任何Spark筛选器谓词的语句,以及在失败时要采取的操作。操作可以是保留、删除、失败或隔离。欲知详情在这里看到的.所有约束都被记录下来,以支持简化的质量监控。
Tblproperties:键值对列表,可以是Delta Lake属性、DLT管道属性或任意属性。任意tblproperties类似于可用于数据编目的标记。在本例中," quality ": " silver "是一个作为标记的任意属性。
评论:简要描述表用途的字符串,将来用于数据编目
黄金数据集:完整vs流/连续vs触发
许多聚合不能增量地执行,必须作为完整的再处理执行,即使新数据可以在聚合的上游青铜层和银层增量地处理。然而,访问尚未聚合的实时或“快速”数据具有重要价值。与传统Lambda架构需要复杂的两层基础设施来处理快数据和慢数据不同,Lakehouse架构支持单个管道,包括实时增量“快速”青铜层和银层,以及批量更新的黄金层(由Delta Lake存储的强一致性保证实现)。
在实践中,这种模式在过程式ETL中可能具有挑战性,因为它需要部署单独的流作业和批处理作业,并分别维护每个作业。为了解决这个问题,DLT允许您选择管道中的每个数据集是完整的还是增量的,对管道的其余部分进行最小的更改。这使得它很容易扩展管道,涉及青铜和白银实时数据与黄金聚集层的组合。
事实气泡:部分Spark聚合可以增量执行,如count、min、max、sum等。在一些简单的情况下,将gold数据集声明为增量可能是有意义的。然而,即使是简单的计数和和,这可能会变得效率低下,如果您正在使用多个分组(例如GROUP BY col1, col2, col3),则不建议这样做。
在本例中,我们通过按城市聚合银表中的数据来创建完整的金表:
- 打开管道笔记本并创建一个新单元格。
- 复制以下代码到一个新单元格:
创建活动表sales_order_in_la COMMENT“销售订单在LA。”TBLPROPERTIES ("quality" = "gold") AS SELECT city, order_date, customer_id, customer_name, ordered_products_explosion。curr, SUM(ordered_products_explosion .price)作为sales, SUM(ordered_products_explosion .qty)作为quantity, COUNT(ordered_products_explosion .id)作为product_count FROM (SELECT city, DATE(order_datetime)作为order_date, customer_id, customer_name, explosion (ordered_products)作为ordered_products_explosion FROM LIVE。sales_orders_cleaned WHERE city = 'Los Angeles') GROUP BY order_date, city, customer_id, customer_name, ordered_products_explosion .curr;创建活动表sales_order_in_chicago COMMENT“芝加哥的销售订单”。TBLPROPERTIES ("quality" = "gold") AS SELECT city, order_date, customer_id, customer_name, ordered_products_explosion。curr, SUM(ordered_products_explosion .price)作为sales, SUM(ordered_products_explosion .qty)作为quantity, COUNT(ordered_products_explosion .id)作为product_count FROM (SELECT city, DATE(order_datetime)作为order_date, customer_id, customer_name, explosion (ordered_products)作为ordered_products_explosion FROM LIVE。sales_orders_cleaned WHERE city = 'Chicago') GROUP BY order_date, city, customer_id, customer_name, ordered_products_explosion .curr;
- 通过导航到左侧导航栏中的Jobs,选择“Delta Live Tables”并选择在上一步中创建的管道,返回到管道“Sales Order Pipeline”
- 选择启动/停止开关旁边的下拉菜单,然后选择“完全刷新”
连续vs触发管道模式
在DLT中,虽然单个数据集可能是增量的或完整的,但整个管道可能是触发的或连续的。当连续管道启动时,它将启动基础设施并继续吸收新数据,直到手动或通过API停止管道。被触发的管道将一次性消耗源中的所有新数据,并自动关闭基础设施。被触发的管道通常会在生产中使用协调器或Databricks多任务作业的调度上运行。
要在触发模式和连续模式之间切换,打开管道并选择“编辑设置”。Continuous将是JSON中的布尔值。设置" continuous ": false "相当于将管道设置为触发模式。
这使您可以灵活地慢慢成熟到连续处理范式,而无需对代码进行重大重构。对于那些开始意识到实时洞察的价值而不需要持续运行云基础设施的更高成本的组织来说,这是一种常见的模式。有经验的Spark工程师可以使用下面的矩阵来理解DLT的功能:
读: | 写: | 连续模式 | 触发模式 |
---|---|---|---|
完整的 | 完整的 | 在预定义的时间间隔内重新处理 | 单次再处理(删除和替换) |
完整的 | 增量 | 不可能的 | 不可能的 |
增量 | 完整的 | 在预定义的时间间隔内重新处理 | 重新处理物化流结果 |
增量 | 增量 | 使用默认触发器的流 | Trigger.once()流 |
Productionization
现在我们已经定义了管道。我们可以通过以下步骤来总结:
管道可观测性与数据质量监测
事件日志
DLT将所有管道日志发送到管道存储位置中的预定义Delta Lake表,该表可用于监控、沿袭和数据质量报告。您可以导入这个通用日志分析笔记本来检查事件日志,或者使用dbutils访问Delta表{{您的存储位置}}/system/events。
最有用的信息在日志表的“details”列中。下面是导致DLT发出日志的不同类型的操作,以及您可以在“详细信息”中找到该事件的一些相关字段:
- user_action:在执行诸如创建管道之类的操作时发生事件
- flow_definition:当管道部署或更新并具有沿袭、模式和执行计划信息时发生事件
- output_dataset而且input_datasets-输出表/视图及其上游表/视图
- flow_type—这是一个完整流还是附加流
- explain_text- Spark解释计划
- flow_progress:当数据流开始运行或完成对一批数据的处理时发生事件
- 指标-目前包含num_output_rows
- Data_quality——包含这个特定数据集的数据质量规则的结果数组
- dropped_records
- 预期
- 名称,数据集,passed_records, failed_records
数据质量监控(需要Databricks SQL)
因为DLT日志是作为Delta表公开的,并且日志包含数据期望指标,所以很容易生成报告,用您选择的BI工具监视数据质量。我们建议使用砖的SQL因为它与Delta和Databricks平台紧密集成,并通过易于管理的计算端点提供极快的查询速度。bob体育客户端下载
使用Databricks SQL创建数据质量报告,请遵循以下步骤:
- 注意管道的“存储位置”,方法是导航到管道,选择Edit Settings,并复制的值“storage_location”
- 使用下面的例子和步骤1中的存储位置在metastore中注册日志表:
CREATE TABLE {{my_pipeline_logs}} AS SELECT * FROM delta。“{{管道存储位置}}”
- 在左上角的下拉菜单中,切换到“SQL”工作区(在开发DLT管道时,您应该在“数据科学与工程”工作区)
- 在左侧导航栏中,选择“查询”
- 选择“创建查询”
- 复制以下SQL查询,替换{{my_pipeline_logs}}用你在步骤2中创建的表的名字:
WITH all_expectations AS (SELECT爆炸(from_json(details:flow_progress:data_quality:expectations, schema_of_json("[{'name':'str', 'dataset':'str', 'passed_records':'int', 'failed_records':'int'}]"))) AS expectation FROM {{my_pipeline_logs}} WHERE details:flow_progress. log . FROM {{my_pipeline_logs}}SELECT expectation_name, X_Axis, SUM(Y_Axis) AS Y_Axis FROM (SELECT expect .name AS expectation_name, 'Passed' AS X_Axis, expectations .name)passsed_records AS Y_Axis FROM all_expectations UNION ALL SELECT expect .name AS expectation_name, 'Failed' AS X_Axis, expectation.namefailed_records AS FROM all_expectations) GROUP BY expectation_name, X_Axis
- 运行查询,你应该看到一个类似于下面的响应:
- 选择“添加可视化”
- 选择可视化类型为“图表”,图表类型为“饼”。设置X列和Y列,并设置grouping为expectation_name:
你现在可以尝试在Redash中使用不同的图表和/或可视化类型。通常,对于图表,您可以使用X_axis和Y_axis,并根据expectation_name进行分组,以创建用于不同质量监控目的的仪表板
结论
现在您已经完成了第一个Delta Live Tables管道,并在此过程中学习了一些关键概念,我们迫不及待地想要看到您创建的管道!有关Delta Live Tables的更多信息,请参阅我们的DLT文档,观看演示,或下载笔记本!