使用Delta Live表更改数据捕获

请注意

本文描述如何根据源数据的更改更新Delta Live tables管道中的表。要了解如何记录和查询Delta表的行级更改信息,请参见在Databricks上使用Delta Lake更改数据提要

预览

Delta Live Tables支持SCD类型2公共预览

可以在Delta Live Tables中使用更改数据捕获(CDC)根据源数据的更改更新表。Delta Live Tables SQL和Python接口支持CDC。Delta Live Tables支持更新慢变维(SCD)类型1和类型2的表:

  • 使用SCD类型1直接更新记录。更新的记录不保留历史记录。

  • 使用SCD类型2来保留对记录的所有更新的历史。在使用SCD类型2时,还可以保留对指定列的更新历史记录。看到仅跟踪具有SCD类型2的指定列的历史记录

为了表示变更的有效期间,SCD Type 2将每个变更与生成的变更一起存储__START_AT而且__END_AT列。的列指定序列通过使用SQL或sequence_by来生成__START_AT而且__END_AT列。

请注意

  • 的数据类型__START_AT而且__END_AT列的值与指定的数据类型相同序列通过字段。

  • 要查询APPLY CHANGES目标表,必须发布你的表。

  • 当您发布APPLY CHANGES表时,以__apply_changes_storage_也是在包含APPLY CHANGES命令的底层内部状态的metastore中创建的。

SQL

使用应用变化语句使用Delta Live Tables CDC功能:

将更改应用到活动中。表格_name FROM source KEYS (keys) [WHERE condition] [IGNORE NULL UPDATES] [APPLY AS DELETE WHEN condition] [APPLY AS TRUNCATE WHEN condition] SEQUENCE BY orderByColumn [COLUMNS {columnList | * EXCEPT (exceptColumnList)}] [STORED AS {SCD TYPE 1 | SCD TYPE 2}] [TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

条款

在源数据中唯一标识一行的列或列的组合。这用于确定哪些CDC事件应用于目标表中的特定记录。

这一条款是必需的。

在哪里

应用于源和目标的条件,以触发优化,如分区修剪。此条件不能用于删除源行;源中的所有CDC行必须满足此条件,否则将抛出错误。使用WHERE子句是可选的,应该在处理需要特定优化时使用。

这个条款是可选的。

忽略空更新

允许摄取包含目标列子集的更新。当CDC事件匹配现有行并且指定了IGNORE NULL UPDATES时,带有将其现有值保留在目标中。这也适用于值为的嵌套列

这个条款是可选的。

的默认值是覆盖现有列值。

应用为delete时

指定CDC事件何时应作为事件处理删除而不是上插。为了处理乱序数据,被删除的行被临时保留为底层Delta表中的一个墓碑,并在metastore中创建一个视图以过滤掉这些墓碑。保留间隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds表属性

这个条款是可选的。

应用为截断时

指定CDC事件何时应作为满表处理截断.因为这个子句触发了目标表的完整截断,所以它应该只用于需要此功能的特定用例。

应用作为截断子句仅支持SCD类型1。SCD类型2不支持截断。

这个条款是可选的。

序列由

指定源数据中CDC事件逻辑顺序的列名。Delta Live Tables使用此排序来处理无序到达的更改事件。

这一条款是必需的。

指定要包含在目标表中的列的子集。你可以:

  • 指定要包含的列的完整列表:(用户标识,的名字,市)

  • 指定要排除的列列表:除了(操作,sequenceNum)

这个条款是可选的。

属性时,默认值是在目标表中包含所有列子句未指定。

存储为

是否将记录存储为SCD类型1或SCD类型2。

这个条款是可选的。

默认为SCD类型1。

航迹历史记录

pipelines.enableTrackHistory设置后,指定输出列的一个子集,以便在对这些指定列进行任何更改时生成历史记录。你可以指定:

  • 指定要跟踪的列的完整列表:(用户标识,的名字,市)

  • 指定要排除跟踪的列的列表:除了(操作,sequenceNum)

这个条款是可选的。当有任何更改时,默认为所有输出列的跟踪历史记录,相当于跟踪历史

要使用此子句,必须设置pipelines.enableTrackHistory在管道设置中。否则,抛出异常。当pipelines.enableTrackHistory未设置时,将为每个输入行生成历史记录。

的默认行为插入而且更新事件是插入来自源的CDC事件:更新目标表中与指定键匹配的任何行,或者在目标表中不存在匹配记录时插入新行。处理的删除属性可以指定事件应用作为删除条件。

Python

使用apply_changes ()函数中使用Delta Live Tables CDC功能。Delta Live Tables Python CDC接口还提供create_streaming_live_table ()函数。类所需的目标表可以使用此函数创建apply_changes ()函数。看到示例查询

应用更改函数

apply_changes目标“<目标表>”“<数据源>”“key1”“key2”“keyN”],sequence_by“< sequence-column >”ignore_null_updatesapply_as_deletes没有一个apply_as_truncates没有一个column_list没有一个except_column_list没有一个stored_as_scd_type<类型>track_history_column_list没有一个track_history_except_column_list没有一个

参数

目标

类型:str

要更新的表的名称。您可以使用create_streaming_live_table ()函数在执行apply_changes ()函数。

必选参数。

类型:str

包含CDC记录的数据源。

必选参数。

类型:列表

在源数据中唯一标识一行的列或列的组合。这用于确定哪些CDC事件应用于目标表中的特定记录。

你可以指定:

  • 字符串列表:["标识",“orderId”)

  • Spark SQL的列表坳()功能:[坳(“标识”),坳(“orderId”)

参数坳()函数不能包含限定符。例如,你可以使用坳(标识),但你不能使用坳(source.userId)

必选参数。

sequence_by

类型:str坳()

指定源数据中CDC事件逻辑顺序的列名。Delta Live Tables使用此排序来处理无序到达的更改事件。

你可以指定:

  • 一个字符串:“sequenceNum”

  • Spark SQL坳()功能:坳(“sequenceNum”)

参数坳()函数不能包含限定符。例如,你可以使用坳(标识),但你不能使用坳(source.userId)

必选参数。

ignore_null_updates

类型:保龄球

允许摄取包含目标列子集的更新。当CDC事件与现有行和ignore_null_updates真正的,列中将其现有值保留在目标中。这也适用于值为的嵌套列.当ignore_null_updates,现有值将被覆盖值。

可选参数。

默认为

apply_as_deletes

类型:strexpr ()

指定CDC事件何时应作为事件处理删除而不是上插。为了处理乱序数据,被删除的行被临时保留为底层Delta表中的一个墓碑,并在metastore中创建一个视图以过滤掉这些墓碑。保留间隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds表属性

你可以指定:

  • 一个字符串:”操作“删除”

  • Spark SQLexpr ()功能:expr(“操作“删除”)

可选参数。

apply_as_truncates

类型:strexpr ()

指定CDC事件何时应作为满表处理截断.因为这个子句触发了目标表的完整截断,所以它应该只用于需要此功能的特定用例。

apply_as_truncates参数只支持SCD类型1。SCD类型2不支持截断。

你可以指定:

  • 一个字符串:”操作“截断”

  • Spark SQLexpr ()功能:expr(“操作“截断”)

可选参数。

column_listexcept_column_list

类型:列表

要包含在目标表中的列的子集。使用column_list指定要包含的列的完整列表。使用except_column_list指定要排除的列。您可以将值声明为字符串列表或Spark SQL坳()功能:

  • column_list["标识",“名称”,“城市”)

  • column_list[坳(“标识”),坳(“名字”),坳(“城市”)]

  • except_column_list["操作",“sequenceNum”)

  • except_column_list[坳(“操作”),坳(“sequenceNum”)

参数坳()函数不能包含限定符。例如,你可以使用坳(标识),但你不能使用坳(source.userId)

可选参数。

默认情况下是在目标表中包含所有列column_listexcept_column_list参数被传递给函数。

stored_as_scd_type

类型:strint

是否将记录存储为SCD类型1或SCD类型2。

设置为1适用于SCD类型1或2适用于SCD类型2。

这个条款是可选的。

默认为SCD类型1。

track_history_column_listtrack_history_except_column_list

类型:列表

要在目标表中跟踪历史记录的输出列的子集。当pipelines.enableTrackHistory已设置,使用track_history_column_list指定要跟踪的列的完整列表。使用track_history_except_column_list指定要排除在跟踪之外的列。您可以将值声明为字符串列表或Spark SQL坳()功能:-track_history_column_list["标识",“名称”,“城市”).-track_history_column_list[坳(“标识”),坳(“名字”),坳(“城市”)]-track_history_except_column_list["操作",“sequenceNum”)-track_history_except_column_list[坳(“操作”),坳(“sequenceNum”)

参数坳()函数不能包含限定符。例如,你可以使用坳(标识),但你不能使用坳(source.userId)

可选参数。

默认情况下是在目标表中包含所有列track_history_column_listtrack_history_except_column_list参数被传递给函数。

要使用这些参数,必须设置pipelines.enableTrackHistory在管道设置中。否则,抛出异常。当pipelines.enableTrackHistory未设置时,将为每个输入行生成历史记录。

的默认行为插入而且更新事件是插入来自源的CDC事件:更新目标表中与指定键匹配的任何行,或者在目标表中不存在匹配记录时插入新行。处理的删除属性可以指定事件apply_as_deletes论点。

为输出记录创建一个目标表

使用create_streaming_live_table ()方法创建目标表apply_changes ()输出记录。

请注意

create_target_table ()函数已弃用。Databricks建议更新现有代码以使用create_streaming_live_table ()函数。

create_streaming_live_table的名字“<表名称>”评论“< >评论”spark_conf“< >键”“<价值”“<键”“< >价值”},table_properties“< >键”“< >价值”“< >键”“< >价值”},partition_cols“<划分字段>”“<划分字段>”],路径“< storage-location-path >”模式“模式定义”

参数

的名字

类型:str

表名。

必选参数。

评论

类型:str

表的可选描述。

spark_conf

类型:dict

用于执行此查询的可选Spark配置列表。

table_properties

类型:dict

可选的表属性在桌子上。

partition_cols

类型:数组

用于对表进行分区的一个或多个列的可选列表。

路径

类型:str

表数据的可选存储位置。如果不设置,系统将默认为管道存储位置。

模式

类型:strStructType

表的可选模式定义。模式可以定义为SQL DDL字符串,也可以定义为PythonStructType

类的模式时apply_changes目标表中,还必须包含__START_AT而且__END_AT类具有相同数据类型的列sequence_by字段。例如,如果目标表有列键,字符串值,字符串,测序,

create_streaming_live_table的名字“目标”评论"目标为疾病控制中心摄入。"partition_cols“价值”],路径“tablePath美元”模式StructTypeStructField“关键”StringType()),StructField“价值”StringType()),StructField“排序”LongType()),StructField“__START_AT”LongType()),StructField“__END_AT”LongType())

请注意

  • 方法之前,必须确保已创建目标表应用变化查询或apply_changes函数。看到示例查询

  • 目标表的指标(如输出行数)不可用。

  • SCD类型2更新将为每个输入行添加历史记录行,即使没有列发生更改。

  • 的目标应用变化查询或apply_changes函数不能用作流式直播表的源。对象的目标中读取的表应用变化查询或apply_changes函数必须是活动表。

  • 类中不支持期望应用变化查询或apply_changes ()函数。要对源数据集或目标数据集使用期望:

    • 通过定义具有所需期望的中间表,在源数据上添加期望,并使用此数据集作为目标表的源。

    • 使用从目标表读取输入数据的下游表在目标数据上添加期望。

表属性

添加以下表属性以控制墓碑管理的行为删除事件:

表属性

pipelines.cdc.tombstoneGCThresholdInSeconds

将此值设置为匹配无序数据之间的最高预期间隔。

缺省值:5分钟

pipelines.cdc.tombstoneGCFrequencyInSeconds

控制墓碑清理检查的频率。

缺省值:60秒

仅跟踪具有SCD类型2的指定列的历史记录

SCD类型2支持指定输出列的子集,仅在这些列上生成历史记录;对其他列的更改将就地更新,而不是生成新的历史记录。

要在Delta Live Tables SCD type 2中使用跟踪历史,必须通过将以下配置添加到Delta Live Tables管道设置中来显式启用管道中的功能:

“配置”“pipelines.enableTrackHistory”“真正的”

如果pipelines.enableTrackHistory是未设置还是设置为, SCD类型2查询使用为每个输入行生成历史记录的默认行为。

数据库上的SCD类型1和SCD类型2

这些示例演示了Delta Live Tables SCD类型1和类型2查询,这些查询基于以下源事件更新目标表:

  1. 创建新的用户记录。

  2. 删除用户记录。

  3. 更新用户记录。在SCD类型1的示例中,最后一个更新操作延迟到达并从目标表中删除,演示了无序事件的处理。

以下是这些例子的输入记录:

用户标识

的名字

城市

操作

sequenceNum

124

劳尔

瓦哈卡

插入

1

123

伊莎贝尔

蒙特雷

插入

1

125

梅塞德斯

提华纳

插入

2

126

莉莉

坎昆

插入

2

123

删除

6

125

梅塞德斯

瓜达拉哈拉

更新

6

125

梅塞德斯

墨西卡利

更新

5

123

伊莎贝尔

吉娃娃

更新

5

运行SCD type 1示例后,目标表包含以下记录:

用户标识

的名字

城市

124

劳尔

瓦哈卡

125

梅塞德斯

瓜达拉哈拉

126

莉莉

坎昆

属性的附加记录包含以下输入记录截断操作,可与SCD类型1示例代码一起使用:

用户标识

的名字

城市

操作

sequenceNum

124

劳尔

瓦哈卡

插入

1

123

伊莎贝尔

蒙特雷

插入

1

125

梅塞德斯

提华纳

插入

2

126

莉莉

坎昆

插入

2

123

删除

6

125

梅塞德斯

瓜达拉哈拉

更新

6

125

梅塞德斯

墨西卡利

更新

5

123

伊莎贝尔

吉娃娃

更新

5

截断

3.

在运行带有附加的SCD类型1示例之后截断记录,记录124而且126会被截断,因为截断操作在sequenceNum = 3,目标表包含以下记录:

用户标识

的名字

城市

125

梅塞德斯

瓜达拉哈拉

在没有附加的情况下运行SCD类型2的示例之后截断记录,目标表包含以下记录:

用户标识

的名字

城市

__START_AT

__END_AT

123

伊莎贝尔

蒙特雷

1

5

123

伊莎贝尔

吉娃娃

5

6

124

劳尔

瓦哈卡

1

125

梅塞德斯

提华纳

2

5

125

梅塞德斯

墨西卡利

5

6

125

梅塞德斯

瓜达拉哈拉

6

126

莉莉

坎昆

2

运行后的SCD类型2跟踪历史的例子没有额外的截断记录,目标表包含以下记录:

用户标识

的名字

城市

__START_AT

__END_AT

123

伊莎贝尔

吉娃娃

1

6

124

劳尔

瓦哈卡

1

125

梅塞德斯

瓜达拉哈拉

2

126

莉莉

坎昆

2

生成测试数据

为这个例子创建测试记录:

  1. 转到Databricks登录页并选择创建一个笔记本,或按新图标在侧栏中选择笔记本.的创建笔记本对话框出现了。

  2. 创建笔记本对话,给你的笔记本起个名字;例如,生成测试CDC记录.选择SQL默认的语言下拉菜单。

  3. 如果有正在运行的集群,则集群下拉显示。选择要将笔记本附加到的集群。你也可以创建创建笔记本后要附加到的新集群。

  4. 点击创建

  5. 复制以下查询并将其粘贴到新笔记本的第一个单元格中:

    创建模式如果存在cdc_data创建表格cdc_data用户作为选择col1作为用户标识col2作为的名字col3作为城市col4作为操作col5作为sequenceNum——初始负载。124“劳尔”“瓦哈卡”“插入”1),123“伊莎贝尔”“蒙特雷”“插入”1),——新用户。125“梅赛德斯”“提华纳”“插入”2),126“莉莉”“坎昆”“插入”2),伊莎贝尔被从系统中移除,梅赛德斯被转移到瓜达拉哈拉。123“删除”6),125“梅赛德斯”“瓜达拉哈拉”“更新”6),—这批更新是不按顺序来的。上述位于sequenceNum 5的批处理将是最终状态。125“梅赛德斯”“墨西卡利”“更新”5),123“伊莎贝尔”“吉娃娃”“更新”5—取消注释以测试TRUNCATE。——,(null, null, null, "TRUNCATE", 3));
  6. 要运行笔记本并填充测试记录,请在单元格操作菜单中细胞的行为在最右侧,单击运行图标并选择运行单元,或按shift + enter

创建并运行SCD类型1示例管道

  1. 转到Databricks登录页并选择创建一个笔记本,或按新图标在侧栏中选择笔记本.的创建笔记本对话框出现了。

  2. 创建笔记本对话,给你的笔记本起个名字;例如,DLT CDC示例.选择PythonSQL默认的语言下拉菜单,根据您的首选语言。你可以离开了集群设置为默认值。Delta Live Tables运行时在运行管道之前创建一个集群。

  3. 点击创建

  4. 复制Python或SQL查询然后粘贴到笔记本的第一个单元格中。

  5. 创建一个新的管道并将笔记本添加到笔记本库字段。要发布管道处理的输出,可以在目标字段。

  6. 启动管道.如果您配置了目标价值,你可以查看并验证结果查询的。

示例查询

进口dltpyspark.sql.functions进口上校expr@dlt视图def用户():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目标”dltapply_changes目标“目标”“用户”“标识”],sequence_by上校“sequenceNum”),apply_as_deletesexpr"operation = 'DELETE'"),apply_as_truncatesexpr"操作= 'TRUNCATE'"),except_column_list“操作”“sequenceNum”],stored_as_scd_type1
——创建并填充目标表。创建刷新流媒体生活表格目标应用变化生活目标cdc_data用户用户标识应用作为删除操作“删除”应用作为截断操作“截断”序列通过sequenceNum除了操作sequenceNum存储作为镜头分割类型1

创建并运行SCD类型2示例管道

  1. 转到Databricks登录页并选择创建一个笔记本,或按新图标在侧栏中选择笔记本.的创建笔记本对话框出现了。

  2. 创建笔记本对话,给你的笔记本起个名字;例如,DLT CDC示例.选择PythonSQL默认的语言下拉菜单,根据您的首选语言。你可以离开了集群设置为默认值。Delta Live Tables运行时在运行管道之前创建一个集群。

  3. 点击创建

  4. 复制Python或SQL查询然后粘贴到笔记本的第一个单元格中。

  5. 创建一个新的管道并将笔记本添加到笔记本库字段。要发布管道处理的输出,可以在目标字段。

  6. 启动管道.如果您配置了目标价值,你可以查看并验证结果查询的。

示例查询

进口dltpyspark.sql.functions进口上校expr@dlt视图def用户():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目标”dltapply_changes目标“目标”“用户”“标识”],sequence_by上校“sequenceNum”),apply_as_deletesexpr"operation = 'DELETE'"),except_column_list“操作”“sequenceNum”],stored_as_scd_type“2”
——创建并填充目标表。创建刷新流媒体生活表格目标应用变化生活目标cdc_data用户用户标识应用作为删除操作“删除”序列通过sequenceNum除了操作sequenceNum存储作为镜头分割类型2

带有跟踪历史的示例查询

进口dltpyspark.sql.functions进口上校expr@dlt视图def用户():返回火花readStream格式“δ”表格“cdc_data.users”dltcreate_streaming_live_table“目标”dltapply_changes目标“目标”“用户”“标识”],sequence_by上校“sequenceNum”),apply_as_deletesexpr"operation = 'DELETE'"),except_column_list“操作”“sequenceNum”],stored_as_scd_type“2”track_history_except_column_list“城市”
——创建并填充目标表。创建刷新流媒体生活表格目标应用变化生活目标cdc_data用户用户标识应用作为删除操作“删除”序列通过sequenceNum除了操作sequenceNum存储作为镜头分割类型2跟踪历史除了城市