使用合并将插入到Delta Lake表

方法可以将源表、视图或DataFrame中的数据插入到目标Delta表中合并SQL操作。Delta Lake支持插入、更新和删除合并,并且它支持SQL标准之外的扩展语法,以方便高级用例。

假设您有一个名为people10mupdates或者源路径/ tmp /δ/ people-10m-updates的目标表的新数据people10m或者目标路径/ tmp /δ/ people-10m.其中一些新记录可能已经出现在目标数据中。要合并新数据,您需要更新人员所在的行id已经存在并插入没有匹配的新行吗id是礼物。可以执行如下查询:

合并people10m使用people10mupdatespeople10midpeople10mupdatesid匹配然后更新idpeople10mupdatesidfirstNamepeople10mupdatesfirstNamemiddleNamepeople10mupdatesmiddleNamepeople10mupdates性别people10mupdates性别生日people10mupdates生日ssnpeople10mupdatesssn工资people10mupdates工资匹配然后插入idfirstNamemiddleName性别生日ssn工资people10mupdatesidpeople10mupdatesfirstNamepeople10mupdatesmiddleNamepeople10mupdatespeople10mupdates性别people10mupdates生日people10mupdatesssnpeople10mupdates工资
delta.tables进口deltaTablePeopleDeltaTableforPath火花“/ tmp /δ/ people-10m”deltaTablePeopleUpdatesDeltaTableforPath火花“/ tmp /δ/ people-10m-updates”dfUpdatesdeltaTablePeopleUpdatestoDF()deltaTablePeople别名“人”合并dfUpdates别名“更新”),”的人。id=更新年代.id'whenMatchedUpdate“id”“updates.id”“firstName”“updates.firstName”“middleName”“updates.middleName”“姓”“updates.lastName”“性别”“updates.gender”“生日”“updates.birthDate”“ssn”“updates.ssn”“工资”“updates.salary”whenNotMatchedInsert“id”“updates.id”“firstName”“updates.firstName”“middleName”“updates.middleName”“姓”“updates.lastName”“性别”“updates.gender”“生日”“updates.birthDate”“ssn”“updates.ssn”“工资”“updates.salary”执行()
进口ioδ_进口orgapache火花sql功能_瓦尔deltaTablePeopleDeltaTableforPath火花“/ tmp /δ/ people-10m”瓦尔deltaTablePeopleUpdatesDeltaTableforPath火花“tmp /δ/ people-10m-updates”瓦尔dfUpdatesdeltaTablePeopleUpdatestoDF()deltaTablePeople作为“人”合并dfUpdates作为“更新”),”的人。id=更新年代.id"whenMatchedupdateExpr地图“id”->“updates.id”“firstName”->“updates.firstName”“middleName”->“updates.middleName”“姓”->“updates.lastName”“性别”->“updates.gender”“生日”->“updates.birthDate”“ssn”->“updates.ssn”“工资”->“updates.salary”))whenNotMatchedinsertExpr地图“id”->“updates.id”“firstName”->“updates.firstName”“middleName”->“updates.middleName”“姓”->“updates.lastName”“性别”->“updates.gender”“生日”->“updates.birthDate”“ssn”->“updates.ssn”“工资”->“updates.salary”))执行()

看到Delta Lake API文档Scala和Python语法的详细信息。有关SQL语法的详细信息,请参见合并成

使用merge修改所有不匹配的行

在Databricks Runtime 12.1及以上版本中,您可以使用匹配通过条款更新orgydF4y2Ba删除目标表中的记录在源表中没有对应的记录。Databricks建议添加一个可选的条件子句,以避免完全重写目标表。

下面的代码示例展示了用于删除的基本语法,用源表的内容覆盖目标表,并删除目标表中不匹配的记录。有关源更新和删除受时间限制的表的更可伸缩模式,请参见增量同步增量表与源

targetDF合并sourceDF”源。关键=目标.关键"whenMatchedUpdateAll()whenNotMatchedInsertAll()whenNotMatchedBySourceDelete()执行()
targetDF合并sourceDF”源。关键=目标.关键"whenMatched()updateAll()whenNotMatched()insertAll()whenNotMatchedBySource()删除()执行()
合并目标使用关键目标关键匹配更新匹配插入匹配通过删除

属性中添加条件匹配通过子句,并指定要在不匹配的目标行中更新的值。

targetDF合并sourceDF”源。关键=目标.关键"whenMatchedUpdate“target.lastSeen”“source.timestamp”whenNotMatchedInsert“target.key”“source.key”“target.lastSeen”“source.timestamp”“target.status”“活跃”whenNotMatchedBySourceUpdate条件”目标。lastSeen> =(当前日期()-时间间隔“5”一天)"“target.status”“不活跃”执行()
targetDF合并sourceDF”源。关键=目标.关键"whenMatched()updateExpr地图“target.lastSeen”->“source.timestamp”))whenNotMatched()insertExpr地图“target.key”->“source.key”“target.lastSeen”->“source.timestamp”“target.status”->“活跃”whenNotMatchedBySource”目标。lastSeen> =(当前日期()-时间间隔“5”一天)"updateExpr地图“target.status”->“不活跃”))执行()
合并目标使用关键目标关键匹配然后更新目标lastSeen时间戳匹配然后插入关键lastSeen状态关键时间戳“活跃”匹配通过目标lastSeen> =当前日期()-时间间隔“5”一天然后更新目标状态“不活跃”

合并操作语义

下面是一个详细的描述合并编程操作语义。

  • 可以有任意数量的whenMatched而且whenNotMatched条款。

  • whenMatched子句在源行与基于匹配条件的目标表行匹配时执行。这些子句具有以下语义。

    • whenMatched子句最多只能有一个更新和一个删除行动。的更新行动合并仅更新指定的列(类似于更新操作)匹配的目标行。的删除操作删除匹配的行。

    • 每一个whenMatched子句可以有可选条件。如果此子句条件存在,则更新orgydF4y2Ba删除仅当子句条件为true时,才对任何匹配的源-目标行对执行操作。

    • 如果有多个whenMatched子句,则它们将按照指定的顺序求值。所有whenMatched子句,除了最后一个,都必须有条件。

    • 如果没有whenMatched对于匹配合并条件的源和目标行对,条件计算为true,则目标行保持不变。

    • 若要使用源数据集的相应列更新目标Delta表的所有列,请使用whenMatched(…).updateAll ().这相当于:

      whenMatched(…)。updateExpr地图“col1”->“source.col1”“col2”->“source.col2”…))

      为目标Delta表的所有列。因此,此操作假定源表与目标表中的列具有相同的列,否则查询将抛出分析错误。

      请注意

      当启用自动模式迁移时,此行为将更改。看到自动模式演化获取详细信息。

  • whenNotMatched当源行与基于匹配条件的任何目标行不匹配时,将执行子句。这些子句具有以下语义。

    • whenNotMatched从句只能有the插入行动。根据指定的列和相应的表达式生成新行。您不需要指定目标表中的所有列。对于未指定的目标列,被插入。

    • 每一个whenNotMatched子句可以有可选条件。如果存在子句条件,则仅当源行条件为真时才插入源行。否则,源列将被忽略。

    • 如果有多个whenNotMatched子句,则它们将按照指定的顺序求值。所有whenNotMatched子句,除了最后一个,都必须有条件。

    • 若要将目标Delta表的所有列与源数据集的相应列插入,请使用whenNotMatched(…).insertAll ().这相当于:

      whenNotMatched(…)。insertExpr地图“col1”->“source.col1”“col2”->“source.col2”…))

      为目标Delta表的所有列。因此,此操作假定源表与目标表中的列具有相同的列,否则查询将抛出分析错误。

      请注意

      当启用自动模式迁移时,此行为将更改。看到自动模式演化获取详细信息。

  • whenNotMatchedBySource当目标行与基于合并条件的任何源行不匹配时,将执行子句。这些子句具有以下语义。

    • whenNotMatchedBySource子句可以指定删除而且更新行动。

    • 每一个whenNotMatchedBySource子句可以有可选条件。如果存在子句条件,则仅当该条件为真时才修改目标行。否则,目标行保持不变。

    • 如果有多个whenNotMatchedBySource子句,则它们将按照指定的顺序求值。所有whenNotMatchedBySource子句,除了最后一个,都必须有条件。

    • 根据定义,whenNotMatchedBySource子句没有可以从中提取列值的源行,因此源列不能被引用。对于要修改的每一列,您可以指定一个文字,也可以在目标列上执行操作,例如target.deleted_counttarget.deleted_count+1

重要的

  • 一个合并如果源数据集的多行匹配,并且merge尝试更新目标Delta表的同行,则操作可能失败。根据merge的SQL语义,这样的更新操作是不明确的,因为不清楚应该使用哪个源行来更新匹配的目标行。可以对源表进行预处理,以消除多个匹配的可能性。看到变更数据捕获示例它展示了如何预处理更改数据集(即源数据集),以便在将更改应用到目标Delta表之前仅保留每个键的最新更改。

  • 您可以应用SQL合并只有当视图被定义为时,才能对SQL VIEW进行操作创建视图viewName作为选择deltaTable

写入Delta表时重复数据删除

一个常见的ETL用例是通过将日志附加到一个表中来收集到Delta表中。但是,源通常会生成重复的日志记录,需要后续的重复数据删除步骤来处理它们。与合并时,可避免插入重复的记录。

合并日志使用newDedupedLogs日志uniqueIdnewDedupedLogsuniqueId匹配然后插入
deltaTable别名“日志”合并newDedupedLogs别名“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId"whenNotMatchedInsertAll()执行()
deltaTable作为“日志”合并newDedupedLogs作为“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId"whenNotMatched()insertAll()执行()
deltaTable作为“日志”合并newDedupedLogs作为“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId"whenNotMatched()insertAll()执行();

请注意

包含新日志的数据集本身需要重复数据删除。通过merge的SQL语义,它将新数据与表中的现有数据进行匹配并重复删除,但如果新数据集中有重复数据,则插入它。因此,在合并到表之前,要对新数据进行重复数据删除。

如果您知道可能只会在几天内得到重复的记录,那么您可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围,从而进一步优化查询。

合并日志使用newDedupedLogs日志uniqueIdnewDedupedLogsuniqueId日志日期>当前日期()-时间间隔7匹配newDedupedLogs日期>当前日期()-时间间隔7然后插入
deltaTable别名“日志”合并newDedupedLogs别名“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天"whenNotMatchedInsertAll“newDedupedLogs。日期>当前日期()-时间间隔7天"执行()
deltaTable作为“日志”).合并newDedupedLogs作为“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天"whenNotMatched“newDedupedLogs。日期>当前日期()-时间间隔7天"insertAll()执行()
deltaTable作为“日志”).合并newDedupedLogs作为“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天"whenNotMatched“newDedupedLogs。日期>当前日期()-时间间隔7天"insertAll()执行();

这比前一个命令更有效,因为它只查找最近7天的日志中的副本,而不是整个表。此外,您可以使用这种仅插入的合并和结构化流来执行连续的重复数据删除日志。

  • 在流查询中,可以使用合并操作foreachBatch使用重复数据删除将任何流数据连续写入Delta表。参见以下内容流的例子欲知更多有关foreachBatch

  • 在另一个流查询中,您可以连续地从这个Delta表中读取重复数据删除后的数据。这是可能的,因为仅插入合并仅向Delta表追加新数据。

缓慢改变数据(SCD)对Delta表的类型2操作

Delta Live Tables具有跟踪和应用SCD Type 2的本机支持。看到使用Delta Live表更改数据捕获

将更改数据写入Delta表

与SCD类似,另一个常见的用例(通常称为更改数据捕获(CDC))是将从外部数据库生成的所有数据更改应用到Delta表中。换句话说,应用于外部表的一组更新、删除和插入需要应用于Delta表。你可以用合并如下。

瓦尔deltaTableDeltaTable...// DeltaTable with schema (key, value)// DataFrame有以下列的变化// - key:修改的键// - time:更改之间排序的更改时间(可以用其他排序id代替)// - newValue:如果key未被删除,则更新或插入值// - deleted:如果密钥被删除,则为true;如果密钥被插入或更新,则为false瓦尔changesDFDataFrame...//根据时间戳查找每个键的最新更改//注意:对于嵌套结构,max on struct计算为//第一个结构字段的最大值,如果相等则返回到第二个字段,依此类推。瓦尔latestChangeForEachKeychangesDFselectExpr“关键”"struct(time, newValue, deleted) as otherCols"groupBy“关键”gg马克斯“otherCols”).作为“最新”))selectExpr“关键”“最新*”。deltaTable作为“t”合并latestChangeForEachKey作为“s”),"s.key = t.key"whenMatched"s.deleted = true"删除()whenMatched()updateExpr地图“关键”->“s.key”“价值”->“s.newValue”))whenNotMatched"s.deleted = false"insertExpr地图“关键”->“s.key”“价值”->“s.newValue”))执行()
deltaTable...# DeltaTable with schema (key, value)# DataFrame与以下列的变化# - key:修改的键# - time:更改之间排序的更改时间(可以替换为其他排序id)# - newValue:如果key没有被删除,则更新或插入值# - deleted:如果键被删除,则为true;如果键被插入或更新,则为falsechangesDF火花表格“改变”#根据时间戳查找每个键的最新更改#注意:对于嵌套结构,max on struct计算为# Max在第一个结构字段,如果相等则返回到第二个字段,依此类推。latestChangeForEachKeychangesDFselectExpr“关键”"struct(time, newValue, deleted) as otherCols"groupBy“关键”gg马克斯“otherCols”别名“最新”))选择“关键”“最新*”。deltaTable别名“t”合并latestChangeForEachKey别名“s”),"s.key = t.key"whenMatchedDelete条件"s.deleted = true"whenMatchedUpdate“关键”“s.key”“价值”“s.newValue”})whenNotMatchedInsert条件"s.deleted = false"“关键”“s.key”“价值”“s.newValue”执行()

使用MERGE笔记本记录变更数据

在新标签页打开笔记本

增量同步增量表与源

在Databricks Runtime 12.1及以上版本中,您可以使用匹配通过创建任意条件,以自动删除和替换表的一部分。当您有一个源表,其中的记录可能在初始数据输入后的几天内更改或删除,但最终保持最终状态时,这可能特别有用。

下面的查询显示使用此模式从源中选择5天的记录,在目标中更新匹配的记录,从源向目标插入新记录,并删除目标中过去5天的所有不匹配的记录。

合并目标作为t使用选择在哪里created_at> =当前日期()-时间间隔“5”一天))作为年代t关键年代关键匹配然后更新匹配然后插入匹配通过created_at> =当前日期()-时间间隔“5”一天然后删除

通过在源表和目标表上提供相同的布尔过滤器,您可以动态地将更改从源表传播到目标表,包括删除。

请注意

虽然可以在没有任何条件子句的情况下使用此模式,但这将导致完全重写目标表,代价可能会很高。