工作与三角洲湖表历史
修改Delta Lake表的每个操作都会创建一个新的表版本。您可以使用历史信息来审计特定时间点的操作或查询表。
检索Delta表历史记录
方法可以检索关于每次写入Delta表的操作、用户、时间戳等信息历史
命令。操作将按时间顺序反向返回。默认情况下,表历史记录将保留30天。
描述历史' /数据/事件/ '——获取这张表的全部历史描述历史δ.' /数据/事件/ '描述历史' /数据/事件/ '限制1——只获取最后一个操作描述历史eventsTable
有关Spark SQL语法的详细信息,请参见描述历史.
看到Delta Lake API文档查看Scala/Java/Python语法细节。
数据浏览提供Delta表的详细表信息和历史的可视化视图。除表模式和示例数据外,还可以单击历史选项卡查看所显示的表历史描述历史
.
历史模式
的输出。历史
操作包含以下列。
列 |
类型 |
描述 |
---|---|---|
版本 |
长 |
表版本生成的操作。 |
时间戳 |
时间戳 |
当这个版本被提交时。 |
用户标识 |
字符串 |
执行操作的用户ID。 |
用户名 |
字符串 |
执行操作的用户名。 |
操作 |
字符串 |
操作名称。 |
operationParameters |
地图 |
操作的参数(例如,谓词)。 |
工作 |
结构体 |
运行操作的作业的详细信息。 |
笔记本 |
结构体 |
运行操作的笔记本的详细信息。 |
clusterId |
字符串 |
执行操作的集群ID。 |
readVersion |
长 |
为执行写操作而读取的表的版本。 |
isolationLevel |
字符串 |
此操作使用的隔离级别。 |
isBlindAppend |
布尔 |
该操作是否追加数据。 |
operationMetrics |
地图 |
操作的度量(例如,修改的行数和文件数)。 |
userMetadata |
字符串 |
用户定义提交元数据(如果指定了) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+ | 版| |时间戳userId |操作用户名| | operationParameters | |工作笔记本| clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+ | 5 | 2019-07-2914:07:47| ###| ###| DELETE|[谓词->["(…|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...| | 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...| | 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...| | 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...| | 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...| | 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
请注意
如果您使用以下方法写入Delta表,则其他一些列不可用:
将来添加的列将始终添加在最后一列之后。
操作度量键
的历史
操作中返回操作度量的集合operationMetrics
列映射。
下表按操作列出了映射键的定义。
操作 |
指标名称 |
描述 |
---|---|---|
写入,创建表作为选择,替换表作为选择,复制到 |
||
numFiles |
写入的文件数。 |
|
numOutputBytes |
写入内容的字节大小。 |
|
numOutputRows |
写入的行数。 |
|
流媒体更新 |
||
numAddedFiles |
添加的文件数量。 |
|
numRemovedFiles |
删除的文件数量。 |
|
numOutputRows |
写入的行数。 |
|
numOutputBytes |
写入大小(以字节为单位)。 |
|
删除 |
||
numAddedFiles |
添加的文件数量。删除表的分区时不提供。 |
|
numRemovedFiles |
删除的文件数量。 |
|
numDeletedRows |
删除的行数。删除表的分区时不提供。 |
|
numCopiedRows |
删除文件过程中复制的行数。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描匹配文件所花的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
截断 |
||
numRemovedFiles |
删除的文件数量。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
合并 |
||
numSourceRows |
源数据帧中的行数。 |
|
numTargetRowsInserted |
插入到目标表中的行数。 |
|
numTargetRowsUpdated |
目标表中更新的行数。 |
|
numTargetRowsDeleted |
目标表中删除的行数。 |
|
numTargetRowsCopied |
复制的目标行数。 |
|
numOutputRows |
写入的总行数。 |
|
numTargetFilesAdded |
添加到接收器(目标)的文件数。 |
|
numTargetFilesRemoved |
从接收器(目标)中删除的文件数。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描匹配文件所花的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
更新 |
||
numAddedFiles |
添加的文件数量。 |
|
numRemovedFiles |
删除的文件数量。 |
|
numUpdatedRows |
已更新的行数。 |
|
numCopiedRows |
在更新文件过程中刚刚复制的行数。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描匹配文件所花的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
FSCK |
numRemovedFiles |
删除的文件数量。 |
转换 |
numConvertedFiles |
已转换的Parquet文件数。 |
优化 |
||
numAddedFiles |
添加的文件数量。 |
|
numRemovedFiles |
优化的文件数。 |
|
numAddedBytes |
优化表后添加的字节数。 |
|
numRemovedBytes |
删除的字节数。 |
|
minFileSize |
表优化后最小文件的大小。 |
|
p25FileSize |
表优化后的第25百分位文件的大小。 |
|
p50FileSize |
表优化后的文件大小中位数。 |
|
p75FileSize |
表优化后的第75个百分位文件的大小。 |
|
maxFileSize |
优化表后最大文件的大小。 |
|
克隆 |
||
sourceTableSize |
克隆版本的源表大小(以字节为单位)。 |
|
sourceNumOfFiles |
克隆版本的源表中的文件数。 |
|
numRemovedFiles |
如果替换了以前的Delta表,则从目标表中删除的文件数。 |
|
removedFilesSize |
如果替换了先前的Delta表,则从目标表中删除的文件的总大小(以字节为单位)。 |
|
numCopiedFiles |
复制到新位置的文件数。0表示浅克隆。 |
|
copiedFilesSize |
复制到新位置的文件的总大小(以字节为单位)。0表示浅克隆。 |
|
恢复 |
||
tableSizeAfterRestore |
恢复后的表大小(以字节为单位)。 |
|
numOfFilesAfterRestore |
恢复后表中的文件数。 |
|
numRemovedFiles |
恢复操作删除的文件数。 |
|
numRestoredFiles |
由于恢复而添加的文件数。 |
|
removedFilesSize |
恢复删除的文件的字节大小。 |
|
restoredFilesSize |
恢复添加的文件大小(以字节为单位)。 |
|
真空 |
||
numDeletedFiles |
删除文件数量。 |
|
numVacuumedDirectories |
抽真空目录个数。 |
|
numFilesToDelete |
要删除的文件数量。 |
查询一个表的旧快照(时间旅行)
Delta Lake时间旅行允许您查询Delta表的旧快照。时间旅行有很多用例,包括:
重新创建分析、报告或输出(例如,机器学习模型的输出)。这对于调试或审计非常有用,特别是在受监管的行业中。
编写复杂的时态查询。
修正数据中的错误。
为一组快速更改表的查询提供快照隔离。
Delta Lake时间旅行语法
Delta Lake支持根据时间戳或表版本(记录在事务日志中)查询以前的表版本。
timestamp_expression
可以是以下任意一个:2018 - 10 - 18 t22:15:12.013z
也就是说,可以转换为时间戳的字符串铸造(' 2018-10-1813:36:32c '作为时间戳)
“2018-10-18”
,即日期字符串current_timestamp ()-时间间隔12小时
date_sub(当前日期(),1)
可以转换为时间戳的任何其他表达式
版本
的输出是可以得到的长值描述历史table_spec
.
既不timestamp_expression
也不版本
可以是子查询。
只接受日期或时间戳字符串。例如,“2019-01-01”
而且“2019 - 01 - 01 t00:00:00.000z”
.请看下面的语法示例代码:
选择*从people10m时间戳作为的2018 - 10 - 18 t22:15:12.013z选择*从δ.' /tmp/δ/people10m`版本作为的123
df1=火花.读.选项(“timestampAsOf”,“2019-01-01”).表格(“people10m”)df2=火花.读.选项(“versionAsOf”,123).负载(“/ tmp /δ/ people10m”)
你也可以使用@
语法指定时间戳或版本作为表名的一部分。时间戳必须是inyyyyMMddHHmmssSSS
格式。之后可以指定版本@
通过将v
到版本。请看下面的语法示例代码:
选择*从people10m@20190101000000000选择*从people10m@v123
火花.读.表格(“people10m@20190101000000000”)火花.读.表格(“people10m@v123”)火花.读.负载(“/ tmp /δ/ people10m@20190101000000000”)火花.读.负载(“/ tmp /δ/ people10m@v123”)
什么是事务日志检查点?
中以JSON文件的形式记录表版本_delta_log
目录,它与表数据一起存储。为了优化检查点查询,Delta Lake将表版本聚合到Parquet检查点文件,从而避免读取表历史的所有JSON版本。Databricks根据数据大小和工作负载优化检查点频率。用户不需要直接与检查点交互。检查频率如有更改,恕不另行通知。
为时间旅行配置数据保留
要穿越到以前的版本,你必须保留这两个该版本的日志和数据文件。
支持Delta表的数据文件为从来没有自动删除;数据文件只有运行时才会被删除真空.真空
不删除Delta日志文件;日志文件在写入检查点后自动清理。
默认情况下,您可以时间旅行到30天前的Delta表,除非您有:
运行
真空
在Delta表上。使用以下方法更改数据或日志文件保留期表属性:
delta.logRetentionDuration=“间隔< >间隔”
:控制一个表的历史记录保存的时间。默认为时间间隔30.天
.每次写入检查点时,Databricks都会自动清理超过保留间隔的日志项。如果将此配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为对日志的操作是常数时间。对历史记录的操作是并行的,但随着日志大小的增加将变得更加昂贵。
delta.deletedFileRetentionDuration=“间隔< >间隔”
:控制删除文件的时间在成为候选人之前真空
.默认为时间间隔7天
.访问30天的历史数据,即使你运行
真空
在Delta表上,设置delta.deletedFileRetentionDuration=“间隔30.天”
.此设置可能会导致存储成本上升。
将Delta表恢复到以前的状态
请注意
在Databricks Runtime 7.4及以上版本中可用。
方法可以将Delta表恢复到其以前的状态恢复
命令。Delta表在内部维护表的历史版本,使其能够恢复到较早的状态。属性支持与较早状态对应的版本或创建较早状态时的时间戳作为选项恢复
命令。
重要的
您可以恢复已经恢复的表。
您可以恢复克隆表格
你一定有
修改
正在恢复的表的权限。将表恢复到手动或通过
真空
将会失败。部分恢复到此版本仍然是可能的,如果spark.sql.files.ignoreMissingFiles
设置为真正的
.恢复到较早状态的时间戳格式为
yyyy-MM-ddHH: mm: ss
.只提供日期(yyyy-MM-dd
)字符串也支持。
恢复表格db.target_table来版本作为的<版本>恢复表格δ.' /数据/目标/ '来时间戳作为的<时间戳>
有关语法的详细信息,请参见恢复.
重要的
恢复被认为是一个数据更改操作。添加的Delta Lake日志项恢复
命令包含dataChange设置为true。如果存在下游应用,例如结构化流作业处理Delta Lake表的更新,则恢复操作添加的数据更改日志条目被视为新的数据更新,处理它们可能会导致重复数据。
例如:
表版本 |
操作 |
Delta日志更新 |
在数据更改日志中记录更新 |
---|---|---|---|
0 |
插入 |
AddFile(/path/to/file-1, dataChange = true) |
(姓名=维克托,年龄= 29,(姓名=乔治,年龄= 55) |
1 |
插入 |
AddFile(/path/to/file-2, dataChange = true) |
(姓名乔治,年龄39岁) |
2 |
优化 |
AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) |
(没有记录,因为优化压缩不会改变表中的数据) |
3. |
恢复(version = 1) |
remove (/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) |
(姓名=维克托,年龄= 29),(姓名=乔治,年龄= 55),(姓名=乔治,年龄= 39) |
在上面的例子中,恢复
命令会导致读取Delta表版本0和1时已经看到的更新。如果一个流查询正在读取这个表,那么这些文件将被视为新添加的数据,并将再次处理。
恢复指标
请注意
在Databricks Runtime 8.2及以上版本中可用。
恢复
一旦操作完成,报告以下指标为单行数据帧:
table_size_after_restore
:恢复后的表大小。num_of_files_after_restore
:恢复后表中的文件数。num_removed_files
:从表中删除(逻辑删除)的文件数。num_restored_files
:回滚后恢复的文件数。removed_files_size
:从表中删除的文件的总大小(以字节为单位)。restored_files_size
:被恢复文件的总大小(以字节为单位)。
使用三角洲湖时间旅行的例子
为用户修复对表的意外删除
111
:插入成my_table选择*从my_table时间戳作为的date_sub(当前日期(),1)在哪里用户标识=111
修复表的意外错误更新:
合并成my_table目标使用my_table时间戳作为的date_sub(当前日期(),1)源在源.用户标识=目标.用户标识当匹配然后更新集*
使用实例查询过去一周的新客户数量。
选择数(截然不同的用户标识)-(选择数(截然不同的用户标识)从my_table时间戳作为的date_sub(当前日期(),7))
如何在Spark会话中找到上次提交的版本?
获取当前进程写入的最后一次提交的版本号SparkSession
跨所有线程和所有表,查询SQL配置spark.databricks.delta.lastCommitVersionInSession
.
集火花.砖.δ.lastCommitVersionInSession
火花.相依.得到(“spark.databricks.delta.lastCommitVersionInSession”)
火花.相依.得到(“spark.databricks.delta.lastCommitVersionInSession”)
方法未进行提交SparkSession
,查询该键返回空值。
请注意
如果你拥有同样的东西SparkSession
跨多个线程,这类似于跨多个线程共享一个变量;当配置值同时更新时,您可能会遇到竞争条件。