使用合并将插入到Delta Lake表
方法可以将源表、视图或DataFrame中的数据插入到目标Delta表中合并
SQL操作。Delta Lake支持插入、更新和删除合并
,并且它支持SQL标准之外的扩展语法,以方便高级用例。
假设您有一个名为people10mupdates
或者源路径/ tmp /δ/ people-10m-updates
的目标表的新数据people10m
或者目标路径/ tmp /δ/ people-10m
.其中一些新记录可能已经出现在目标数据中。要合并新数据,您需要更新人员所在的行id
已经存在并插入没有匹配的新行吗id
是礼物。可以执行如下查询:
合并成people10m使用people10mupdates在people10m.id=people10mupdates.id当匹配然后更新集id=people10mupdates.id,firstName=people10mupdates.firstName,middleName=people10mupdates.middleName,姓=people10mupdates.姓,性别=people10mupdates.性别,生日=people10mupdates.生日,ssn=people10mupdates.ssn,工资=people10mupdates.工资当不匹配然后插入(id,firstName,middleName,姓,性别,生日,ssn,工资)值(people10mupdates.id,people10mupdates.firstName,people10mupdates.middleName,people10mupdates.姓,people10mupdates.性别,people10mupdates.生日,people10mupdates.ssn,people10mupdates.工资)
从delta.tables进口*deltaTablePeople=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdates.toDF()deltaTablePeople.别名(“人”)\.合并(dfUpdates.别名(“更新”),”的人。id=更新年代.id')\.whenMatchedUpdate(集={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性别”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工资”:“updates.salary”})\.whenNotMatchedInsert(值={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性别”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工资”:“updates.salary”})\.执行()
进口io.δ.表._进口org.apache.火花.sql.功能._瓦尔deltaTablePeople=DeltaTable.forPath(火花,“/ tmp /δ/ people-10m”)瓦尔deltaTablePeopleUpdates=DeltaTable.forPath(火花,“tmp /δ/ people-10m-updates”)瓦尔dfUpdates=deltaTablePeopleUpdates.toDF()deltaTablePeople.作为(“人”).合并(dfUpdates.作为(“更新”),”的人。id=更新年代.id").whenMatched.updateExpr(地图(“id”->“updates.id”,“firstName”->“updates.firstName”,“middleName”->“updates.middleName”,“姓”->“updates.lastName”,“性别”->“updates.gender”,“生日”->“updates.birthDate”,“ssn”->“updates.ssn”,“工资”->“updates.salary”)).whenNotMatched.insertExpr(地图(“id”->“updates.id”,“firstName”->“updates.firstName”,“middleName”->“updates.middleName”,“姓”->“updates.lastName”,“性别”->“updates.gender”,“生日”->“updates.birthDate”,“ssn”->“updates.ssn”,“工资”->“updates.salary”)).执行()
看到Delta Lake API文档Scala和Python语法的详细信息。有关SQL语法的详细信息,请参见合并成
使用merge修改所有不匹配的行
在Databricks Runtime 12.1及以上版本中,您可以使用当不匹配通过源
条款更新
orgydF4y2Ba删除
目标表中的记录在源表中没有对应的记录。Databricks建议添加一个可选的条件子句,以避免完全重写目标表。
下面的代码示例展示了用于删除的基本语法,用源表的内容覆盖目标表,并删除目标表中不匹配的记录。有关源更新和删除受时间限制的表的更可伸缩模式,请参见增量同步增量表与源.
(targetDF.合并(sourceDF,”源。关键=目标.关键").whenMatchedUpdateAll().whenNotMatchedInsertAll().whenNotMatchedBySourceDelete().执行())
targetDF.合并(sourceDF,”源。关键=目标.关键").whenMatched().updateAll().whenNotMatched().insertAll().whenNotMatchedBySource().删除().执行()
合并成目标使用源在源.关键=目标.关键当匹配更新集*当不匹配插入*当不匹配通过源删除
属性中添加条件当不匹配通过源
子句,并指定要在不匹配的目标行中更新的值。
(targetDF.合并(sourceDF,”源。关键=目标.关键").whenMatchedUpdate(集={“target.lastSeen”:“source.timestamp”}).whenNotMatchedInsert(集={“target.key”:“source.key”,“target.lastSeen”:“source.timestamp”,“target.status”:“活跃”}).whenNotMatchedBySourceUpdate(条件=”目标。lastSeen> =(当前日期()-时间间隔“5”一天)",集={“target.status”:“不活跃”}).执行())
targetDF.合并(sourceDF,”源。关键=目标.关键").whenMatched().updateExpr(地图(“target.lastSeen”->“source.timestamp”)).whenNotMatched().insertExpr(地图(“target.key”->“source.key”,“target.lastSeen”->“source.timestamp”,“target.status”->“活跃”,)).whenNotMatchedBySource(”目标。lastSeen> =(当前日期()-时间间隔“5”一天)").updateExpr(地图(“target.status”->“不活跃”)).执行()
合并成目标使用源在源.关键=目标.关键当匹配然后更新集目标.lastSeen=源.时间戳当不匹配然后插入(关键,lastSeen,状态)值(源.关键,源.时间戳,“活跃”)当不匹配通过源和目标.lastSeen> =(当前日期()-时间间隔“5”一天)然后更新集目标.状态=“不活跃”
合并操作语义
下面是一个详细的描述合并
编程操作语义。
可以有任意数量的
whenMatched
而且whenNotMatched
条款。whenMatched
子句在源行与基于匹配条件的目标表行匹配时执行。这些子句具有以下语义。whenMatched
子句最多只能有一个更新
和一个删除
行动。的更新
行动合并
仅更新指定的列(类似于更新
操作)匹配的目标行。的删除
操作删除匹配的行。每一个
whenMatched
子句可以有可选条件。如果此子句条件存在,则更新
orgydF4y2Ba删除
仅当子句条件为true时,才对任何匹配的源-目标行对执行操作。如果有多个
whenMatched
子句,则它们将按照指定的顺序求值。所有whenMatched
子句,除了最后一个,都必须有条件。如果没有
whenMatched
对于匹配合并条件的源和目标行对,条件计算为true,则目标行保持不变。若要使用源数据集的相应列更新目标Delta表的所有列,请使用
whenMatched(…).updateAll ()
.这相当于:whenMatched(…)。updateExpr(地图(“col1”->“source.col1”,“col2”->“source.col2”,…))
为目标Delta表的所有列。因此,此操作假定源表与目标表中的列具有相同的列,否则查询将抛出分析错误。
请注意
当启用自动模式迁移时,此行为将更改。看到自动模式演化获取详细信息。
whenNotMatched
当源行与基于匹配条件的任何目标行不匹配时,将执行子句。这些子句具有以下语义。whenNotMatched
从句只能有the插入
行动。根据指定的列和相应的表达式生成新行。您不需要指定目标表中的所有列。对于未指定的目标列,零
被插入。每一个
whenNotMatched
子句可以有可选条件。如果存在子句条件,则仅当源行条件为真时才插入源行。否则,源列将被忽略。如果有多个
whenNotMatched
子句,则它们将按照指定的顺序求值。所有whenNotMatched
子句,除了最后一个,都必须有条件。若要将目标Delta表的所有列与源数据集的相应列插入,请使用
whenNotMatched(…).insertAll ()
.这相当于:whenNotMatched(…)。insertExpr(地图(“col1”->“source.col1”,“col2”->“source.col2”,…))
为目标Delta表的所有列。因此,此操作假定源表与目标表中的列具有相同的列,否则查询将抛出分析错误。
请注意
当启用自动模式迁移时,此行为将更改。看到自动模式演化获取详细信息。
whenNotMatchedBySource
当目标行与基于合并条件的任何源行不匹配时,将执行子句。这些子句具有以下语义。whenNotMatchedBySource
子句可以指定删除
而且更新
行动。每一个
whenNotMatchedBySource
子句可以有可选条件。如果存在子句条件,则仅当该条件为真时才修改目标行。否则,目标行保持不变。如果有多个
whenNotMatchedBySource
子句,则它们将按照指定的顺序求值。所有whenNotMatchedBySource
子句,除了最后一个,都必须有条件。根据定义,
whenNotMatchedBySource
子句没有可以从中提取列值的源行,因此源列不能被引用。对于要修改的每一列,您可以指定一个文字,也可以在目标列上执行操作,例如集target.deleted_count=target.deleted_count+1
.
重要的
一个
合并
如果源数据集的多行匹配,并且merge尝试更新目标Delta表的同行,则操作可能失败。根据merge的SQL语义,这样的更新操作是不明确的,因为不清楚应该使用哪个源行来更新匹配的目标行。可以对源表进行预处理,以消除多个匹配的可能性。看到变更数据捕获示例它展示了如何预处理更改数据集(即源数据集),以便在将更改应用到目标Delta表之前仅保留每个键的最新更改。您可以应用SQL
合并
只有当视图被定义为时,才能对SQL VIEW进行操作创建视图viewName作为选择*从deltaTable
.
写入Delta表时重复数据删除
一个常见的ETL用例是通过将日志附加到一个表中来收集到Delta表中。但是,源通常会生成重复的日志记录,需要后续的重复数据删除步骤来处理它们。与合并
时,可避免插入重复的记录。
合并成日志使用newDedupedLogs在日志.uniqueId=newDedupedLogs.uniqueId当不匹配然后插入*
deltaTable.别名(“日志”).合并(newDedupedLogs.别名(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId")\.whenNotMatchedInsertAll()\.执行()
deltaTable.作为(“日志”).合并(newDedupedLogs.作为(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId").whenNotMatched().insertAll().执行()
deltaTable.作为(“日志”).合并(newDedupedLogs.作为(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs.uniqueId").whenNotMatched().insertAll().执行();
请注意
包含新日志的数据集本身需要重复数据删除。通过merge的SQL语义,它将新数据与表中的现有数据进行匹配并重复删除,但如果新数据集中有重复数据,则插入它。因此,在合并到表之前,要对新数据进行重复数据删除。
如果您知道可能只会在几天内得到重复的记录,那么您可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围,从而进一步优化查询。
合并成日志使用newDedupedLogs在日志.uniqueId=newDedupedLogs.uniqueId和日志.日期>当前日期()-时间间隔7天当不匹配和newDedupedLogs.日期>当前日期()-时间间隔7天然后插入*
deltaTable.别名(“日志”).合并(newDedupedLogs.别名(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天")\.whenNotMatchedInsertAll(“newDedupedLogs。日期>当前日期()-时间间隔7天")\.执行()
deltaTable.作为(“日志”).合并(newDedupedLogs.作为(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天").whenNotMatched(“newDedupedLogs。日期>当前日期()-时间间隔7天").insertAll().执行()
deltaTable.作为(“日志”).合并(newDedupedLogs.作为(“newDedupedLogs”),“日志。uniqueId = newDedupedLogs。uniqueId和日志。日期>当前日期()-时间间隔7天").whenNotMatched(“newDedupedLogs。日期>当前日期()-时间间隔7天").insertAll().执行();
这比前一个命令更有效,因为它只查找最近7天的日志中的副本,而不是整个表。此外,您可以使用这种仅插入的合并和结构化流来执行连续的重复数据删除日志。
在流查询中,可以使用合并操作
foreachBatch
使用重复数据删除将任何流数据连续写入Delta表。参见以下内容流的例子欲知更多有关foreachBatch
.在另一个流查询中,您可以连续地从这个Delta表中读取重复数据删除后的数据。这是可能的,因为仅插入合并仅向Delta表追加新数据。
缓慢改变数据(SCD)对Delta表的类型2操作
Delta Live Tables具有跟踪和应用SCD Type 2的本机支持。看到使用Delta Live表更改数据捕获.
将更改数据写入Delta表
与SCD类似,另一个常见的用例(通常称为更改数据捕获(CDC))是将从外部数据库生成的所有数据更改应用到Delta表中。换句话说,应用于外部表的一组更新、删除和插入需要应用于Delta表。你可以用合并
如下。
瓦尔deltaTable:DeltaTable=...// DeltaTable with schema (key, value)// DataFrame有以下列的变化// - key:修改的键// - time:更改之间排序的更改时间(可以用其他排序id代替)// - newValue:如果key未被删除,则更新或插入值// - deleted:如果密钥被删除,则为true;如果密钥被插入或更新,则为false瓦尔changesDF:DataFrame=...//根据时间戳查找每个键的最新更改//注意:对于嵌套结构,max on struct计算为//第一个结构字段的最大值,如果相等则返回到第二个字段,依此类推。瓦尔latestChangeForEachKey=changesDF.selectExpr(“关键”,"struct(time, newValue, deleted) as otherCols").groupBy(“关键”).gg(马克斯(“otherCols”).作为(“最新”)).selectExpr(“关键”,“最新*”。)deltaTable.作为(“t”).合并(latestChangeForEachKey.作为(“s”),"s.key = t.key").whenMatched("s.deleted = true").删除().whenMatched().updateExpr(地图(“关键”->“s.key”,“价值”->“s.newValue”)).whenNotMatched("s.deleted = false").insertExpr(地图(“关键”->“s.key”,“价值”->“s.newValue”)).执行()
deltaTable=...# DeltaTable with schema (key, value)# DataFrame与以下列的变化# - key:修改的键# - time:更改之间排序的更改时间(可以替换为其他排序id)# - newValue:如果key没有被删除,则更新或插入值# - deleted:如果键被删除,则为true;如果键被插入或更新,则为falsechangesDF=火花.表格(“改变”)#根据时间戳查找每个键的最新更改#注意:对于嵌套结构,max on struct计算为# Max在第一个结构字段,如果相等则返回到第二个字段,依此类推。latestChangeForEachKey=changesDF\.selectExpr(“关键”,"struct(time, newValue, deleted) as otherCols")\.groupBy(“关键”)\.gg(马克斯(“otherCols”).别名(“最新”))\.选择(“关键”,“最新*”。)\deltaTable.别名(“t”).合并(latestChangeForEachKey.别名(“s”),"s.key = t.key")\.whenMatchedDelete(条件="s.deleted = true")\.whenMatchedUpdate(集={“关键”:“s.key”,“价值”:“s.newValue”})\.whenNotMatchedInsert(条件="s.deleted = false",值={“关键”:“s.key”,“价值”:“s.newValue”}).执行()
增量同步增量表与源
在Databricks Runtime 12.1及以上版本中,您可以使用当不匹配通过源
创建任意条件,以自动删除和替换表的一部分。当您有一个源表,其中的记录可能在初始数据输入后的几天内更改或删除,但最终保持最终状态时,这可能特别有用。
下面的查询显示使用此模式从源中选择5天的记录,在目标中更新匹配的记录,从源向目标插入新记录,并删除目标中过去5天的所有不匹配的记录。
合并成目标作为t使用(选择*从源在哪里created_at> =(当前日期()-时间间隔“5”一天))作为年代在t.关键=年代.关键当匹配然后更新集*当不匹配然后插入*当不匹配通过源和created_at> =(当前日期()-时间间隔“5”一天)然后删除
通过在源表和目标表上提供相同的布尔过滤器,您可以动态地将更改从源表传播到目标表,包括删除。
请注意
虽然可以在没有任何条件子句的情况下使用此模式,但这将导致完全重写目标表,代价可能会很高。