公司博客上

潜入三角洲湖:模式实施与进化

2019年9月24日 公司博客上

分享这篇文章

试试Databricks的这个笔记本系列

三角洲湖标志。

数据,就像我们的经验一样,总是在不断发展和积累。为了跟上时代的步伐,我们对世界的思维模式必须适应新的数据,其中一些数据包含了新的维度——用新的方式看待我们以前没有概念的事物。这些心理模型就像一张表的模式,定义了我们如何分类和处理新信息。

这就涉及到模式管理。随着业务问题和需求的发展,数据结构也在不断变化。对于Delta Lake,随着数据的变化,合并新的维度是很容易的。用户可以访问简单的语义来控制表的模式。这些工具包括模式执行,这可以防止用户意外地用错误或垃圾数据污染他们的表,以及模式演化,这使它们能够自动添加富数据的新列。在这篇博客中,我们将深入研究这些工具的使用。

理解表模式

Apache Spark™中的每个DataFrame都包含一个模式,一个定义数据形状的蓝图,比如数据类型和列,以及元数据。使用Delta Lake,表的模式以JSON格式保存在事务日志中。

什么是模式强制?

模式强制,也称为模式验证是Delta Lake中的一种保护措施,它通过拒绝对不匹配表模式的表进行写操作来确保数据质量。就像繁忙的只接受预订的餐厅的前台经理一样,它检查插入到表中的数据中的每一列是否都在它的预期列列表中(换句话说,是否每一列都有“预订”),并拒绝包含不在列表中的列的任何写入。

模式强制是如何工作的?

Delta Lake使用模式验证在写,这意味着所有新写入表的操作都将在写入时检查与目标表的模式的兼容性。如果模式不兼容,Delta Lake将完全取消事务(不写入数据),并引发一个异常,让用户知道不匹配。

要确定对表的写操作是否兼容,Delta Lake使用以下规则。要写入的数据帧:

  • 不能包含目标表模式中不存在的任何附加列。相反,如果传入的数据不包含表中的每一列也没关系——这些列将被简单地分配为空值。
  • 不能具有与目标表中的列数据类型不同的列数据类型。如果目标表的列包含StringType数据,但DataFrame中的对应列包含IntegerType数据,模式强制将引发异常并阻止写入操作的发生。
  • 不能包含仅以大小写不同的列名。这意味着你不能在同一个表中定义'Foo'和'Foo'这样的列。Spark可以在区分大小写或不区分大小写(默认)模式下使用,而Delta Lake保留大小写,但在存储模式时不区分大小写。Parquet在存储和返回列信息时区分大小写。为了避免潜在的错误、数据损坏或丢失问题(我们在Databricks亲身经历过),我们决定添加这个限制。

为了说明这一点,看看下面的代码中,当试图将一些新计算的列附加到尚未设置为接受它们的Delta Lake表时,会发生什么情况。

#生成一个贷款数据框架,我们将附加到Delta Lake表< / span >贷款= sql(< / span >”“”< / span >SELECT addr_state, CAST(rand(10)*count as count,< / span >CAST(兰特(10)* 10000 *计数为AS的两倍)AS金额< / span >从loan_by_state_delta< / span >”“”< / span >)< / span >#显示原始DataFrame的模式< / span >original_loans.printSchema ()”“”< / span >根< / span >|——addr_state: string (nullable = true)< / span >|——count: integer (nullable = true)< / span >”“”< / span >#显示新的DataFrame的schema< / span >loans.printSchema ()”“”< / span >根< / span >|——addr_state: string (nullable = true)< / span >|——count: integer (nullable = true)< / span >|——amount: double (nullable = true) #新列< / span >”“”< / span >尝试添加新的数据帧到现有的表中< / span >loans.write。< / span >格式< / span >(< / span >“δ”< / span >) \< / span >.mode (< / span >“添加”< / span >) \< / span >.save (DELTALAKE_PATH)”“”返回:< / span >< / span >写入Delta表时检测到模式不匹配。< / span >要启用模式迁移,请设置:< / span >”。选项(“mergeSchema”,“真正的”)\ '< / span >表模式:< / span >根< / span >——addr_state: string (nullable = true)< / span >——count: long (nullable = true)< / span >数据模式:< / span >根< / span >——addr_state: string (nullable = true)< / span >——count: long (nullable = true)< / span >——数量:double (nullable = true)< / span >如果启用了表acl,这些选项将被忽略。请使用ALTER TABLE命令更改模式。< / span >< / span >< / span >”“”< / span >

Delta Lake不是自动添加新列,而是强制执行模式并停止写入操作。为了帮助识别导致不匹配的列,Spark在堆栈跟踪中打印出两个模式进行比较。

模式强制如何有用?

因为它是一种严格的检查,所以模式强制是一种很好的工具,可以用作清洁的、完全转换的数据集的看门人,这些数据集可以用于生产或消费。它通常被强制执行在直接提供:

  • 机器学习算法
  • BI仪表板
  • 数据分析和可视化工具
  • 任何需要高度结构化、强类型、语义模式的生产系统

为了让他们的数据为这最后一关做好准备,许多用户采用了一种简单的“多跳”体系结构,逐步为他们的表添加结构。要了解BOB低频彩更多信息,请查看标题为Delta Lake生产机器学习

当然,模式强制可以在管道中的任何地方使用,但是请注意,例如,由于忘记向传入数据添加了一列而导致流写入表失败,这可能会有点令人沮丧。

防止数据稀释

在这一点上,你可能会问自己,有什么好大惊小怪的?毕竟,有时意外的“模式不匹配”错误可能会使您在工作流中陷入困境,特别是如果您是Delta Lake的新手。为什么不让模式根据需要改变,这样我就可以写我的DataFrame了?

俗话说:“预防为主,治疗为辅。”在某些情况下,如果不强制执行模式,数据类型兼容性问题就会出现——看似相同的原始数据源可能包含边缘情况、损坏的列、格式错误的映射或其他可怕的东西。一种更好的方法是在门口就阻止这些敌人——使用模式强制——并在白天处理它们,而不是等到它们潜伏在生产代码的隐蔽位置时再处理。

模式强制提供了一种安心,即表的模式不会改变,除非您做出积极的选择来改变它。它可以防止数据“稀释”,当频繁地添加新列时,可能会发生这种情况,因为数据泛滥,以前丰富、简洁的表失去了它们的意义和有用性。通过鼓励您有意识地、设置高标准并期望高质量,模式实施正在实现其设计目的——保持诚实,保持表干净。

如果,经过进一步审查,你认为你真的做了要添加新列,这是一个简单的,一行修复,如下所述。解决方案是模式进化!

什么是图式进化?

模式演化是一种允许用户轻松更改表的当前模式以适应随着时间变化的数据的特性。最常见的情况是,在执行追加或覆盖操作时使用它,以自动调整模式以包含一个或多个新列。

模式进化是如何工作的?

继续上一节的示例,开发人员可以轻松地使用模式进化来添加以前由于模式不匹配而被拒绝的新列。模式演化是通过添加激活的.option(“mergeSchema”,“真正的”)到你的.write.writeStream火花命令。

#添加mergeSchema选项< / span >loans.write。< / span >格式< / span >(< / span >“δ”< / span >) \< / span >.option (< / span >“mergeSchema”< / span >,< / span >“真正的”< / span >) \< / span >.mode (< / span >“添加”< / span >) \< / span >.save (DELTALAKE_SILVER_PATH)

要查看绘图,请执行以下Spark SQL语句。

#< / span >创建< / span >一个阴谋< / span >与< / span >的< / span >新< / span >列< / span >来< / span >确认写入成功< / span >%< / span >sql< / span >选择< / span >addr_state,< / span >总和< / span >(“金额”)< / span >作为< / span >量< / span >从< / span >loan_by_state_delta< / span >集团< / span >通过< / span >addr_state< / span >订单< / span >通过< / span >总和< / span >(“金额”)< / span >DESC< / span >限制< / span >10< / span >

柱状图显示了成功使用模式强制和模式演进后每个州的贷款数量。

或者,您可以通过在Spark配置中添加Spark . databicks .delta.schema. automerge = True来为整个Spark会话设置该选项。请谨慎使用,因为模式强制将不再警告您意外的模式不匹配。

通过包括mergeSchema选项,任何出现在DataFrame中但不在目标表中的列都会自动添加到模式的末尾,作为写事务的一部分。还可以添加嵌套字段,这些字段也将被添加到各自结构列的末尾。

数据工程师和科学家可以使用这个选项向他们现有的机器学习生产表添加新列(可能是一个新跟踪的指标,或者本月的销售数据的列),而不会破坏依赖于旧列的现有模型。

在表追加或覆盖期间,以下类型的模式更改符合模式演化的条件:

  • 添加新列(这是最常见的场景)
  • 从NullType ->任何其他类型更改数据类型,或从ByteType -> ShortType -> IntegerType向上转换

不符合模式演化条件的其他更改要求模式和数据是否被添加覆盖.option(“overwriteSchema”,“真正的”).例如,在列“Foo”最初是一个的情况下整数数据类型和新模式将是一个字符串数据类型,那么所有的Parquet(数据)文件将需要重写。这些变化包括:

  • 删除一列
  • 更改现有列的数据类型(原地)
  • 重命名仅因大小写不同的列名(例如" Foo "和" Foo ")

最后,在即将发布的Spark 3.0中,显式DDL(使用ALTER TABLE)将得到全面支持,允许用户对表模式执行以下操作:

  • 添加列
  • 更改列注释
  • 设置定义表行为的表属性,例如设置事务日志的保留持续时间

模式进化如何有用?

模式演化可以在任何时候使用意愿来改变你的表的模式(而不是你不小心添加列到你的DataFrame,不应该在那里)。这是迁移模式的最简单方法,因为它自动添加正确的列名和数据类型,而无需显式地声明它们。

总结

模式强制拒绝与表不兼容的任何新列或其他模式更改。通过设置和坚持这些高标准,分析师和工程师可以相信他们的数据具有最高水平的完整性,并能够清晰地进行推理,从而使他们能够做出更好的业务决策。

另一方面,模式进化通过使执行更容易来补充执行目的模式更改将自动发生。毕竟,添加一列并不难。

模式强制是模式进化的阳与阴。当一起使用时,这些功能比以往任何时候都更容易屏蔽噪音,并调谐到信号。

我们也要感谢Mukul Murthy和Pranav Anand对这个博客的贡献。

对开源的Delta Lake感兴趣?bob下载地址
参观三角洲湖在线中心要了解BOB低频彩更多,请下载最新代码并加入Delta Lake社区。

相关的

本系列文章:
潜入三角洲湖#1:解压事务日志
潜入三角洲湖#2:模式实施与演进
潜入三角洲湖#3:DML内部(更新,删除,合并)

Delta Lake生产机器学习
什么是数据湖?

免费试用Databricks

相关的帖子

看到所有公司博客上的帖子