管理数据质量与达美住表
你使用的期望来定义数据质量约束的数据集的内容。期望让你保证数据到达表满足数据质量要求,并提供洞察为每个管道更新数据质量。你期望适用于使用Python修饰符或SQL查询约束条款。
δ住表期望是什么?
期望是可选条款添加到三角洲生活表数据集声明应用数据质量检查每个记录通过一个查询。
一个期望包括三个方面:
一个描述,它充当一个惟一的标识符,并允许您跟踪指标的约束。
一个布尔值声明,总是返回真或假基于一些条件。
一个动作记录失败时期望,即布尔返回false。
下面的矩阵显示了三个动作你可以适用于无效的记录:
行动 |
结果 |
---|---|
警告(默认) |
无效记录写入到目标;失败是报道作为数据集的度量。 |
无效的记录数据写入到目标前下降;失败是报告的指标数据集。 |
|
无效记录防止更新成功。加工之前需要手动干预。 |
您可以查看数据质量标准的记录数等违反一个期望通过查询三角洲生活事件日志表。看到管道监控三角洲生活表。
三角洲的完整参考生活表数据集声明语法,看三角洲生活表Python语言参考或三角洲生活表SQL语言参考。
请注意
虽然您可以包括多个条款在任何期望,只有基于多个Python支持定义操作的期望。看到多个预期。
保留无效记录
使用预计
运营商当你想记录违反的期望。记录违反期望被添加到目标数据集以及有效的记录:
@dlt。预计(“有效时间戳”,“上校(“时间戳”)> 2012-01-01”)
约束valid_timestamp预计(时间戳>“2012-01-01”)
减少无效的记录
使用预计或下降
操作符,以防止进一步的处理无效记录。记录违反预期下降的目标数据集:
@dlt。expect_or_drop(“valid_current_page”,“current_page_id不是零和current_page_title不是零”)
约束valid_current_page预计(current_page_id是不零和current_page_title是不零)在违反下降行
无效记录失败
当无效的记录是不可接受的,使用预计或失败
操作员记录验证失败时立即停止执行。如果操作是一个表的更新,系统自动回滚事务:
@dlt。expect_or_fail(“valid_count”,“数> 0 ")
约束valid_count预计(数>0)在违反失败更新
当管道失败因为违反一个期望,你必须解决管道代码来处理无效数据正确地重新运行前管道。
失败的期望修改的火花查询计划所需转换跟踪信息检测和报告违规行为。对于许多查询,您可以使用这些信息来确定哪些输入记录导致侵犯。下面是一个例子例外:
预期违反:{“flowName”:“a - b”," verboseInfo ": {“expectationsViolated”:(“x1是负的)," inputData ": {“一”:{“x1”: 1、“日元”:"}," b ": {“x2”: 1、“日元”:“aa”}}," outputRecord ": {“x1”: 1、“日元”:“一”,“x2”: 1、“日元”:“aa”},“missingInputData”:假的}}
多个预期
您可以定义期望与一个或多个数据质量约束在Python中管道。这些修饰符接受Python字典作为参数,其中的关键是期望名称和值是期望约束。
使用expect_all
指定多个数据质量约束当记录失败验证应包含在目标数据集:
@dlt。expect_all({“valid_count”:“数> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
使用expect_all_or_drop
指定多个数据质量约束当记录失败验证应该从目标数据集:
@dlt。expect_all_or_drop({“valid_count”:“数> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
使用expect_all_or_fail
指定多个数据质量约束当记录失败验证应该停止管道执行:
@dlt。expect_all_or_fail({“valid_count”:“数> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”})
您还可以定义期望的集合作为一个变量并将其传递到一个或多个查询你的管道:
valid_pages={“valid_count”:“数> 0 ",“valid_current_page”:“current_page_id不是零和current_page_title不是零”}@dlt。表@dlt。expect_all(valid_pages)defraw_data():#创建原始数据集@dlt。表@dlt。expect_all_or_drop(valid_pages)defprepared_data():#创建清洁和准备数据集
检疫无效数据
下面的示例使用期望结合临时表和视图。这种模式为您提供指标通过期望调查的记录,并在管道更新,并提供了一种方法来处理有效和无效记录通过不同的下游路径。
请注意
这个例子中读取包含在示例数据砖的数据集。因为砖与管道发布数据集不支持统一目录,这个例子只能与一个蜂巢metastore管道配置发布。然而,这种模式也适用于统一目录启用管道,但你必须读取数据外部位置。了解更BOB低频彩多关于使用统一目录与达美住表,看看使用统一的目录与三角洲住表管道。
进口dlt从pyspark.sql.functions进口expr规则={}规则(“valid_website”]=“(网站不是NULL)”规则(“valid_location”]=”(位置不是零)”quarantine_rules=“不是({0})”。格式(”和“。加入(规则。值()))@dlt。表(的名字=“raw_farmers_market”)defget_farmers_market_data():返回(火花。读。格式(“csv”)。选项(“头”,“真正的”)。负载(' / databricks-datasets /网站/ farmers_markets_geographic_data /数据- 001 / '))@dlt。表(的名字=“farmers_market_quarantine”,临时=真正的,partition_cols=(“is_quarantined”])@dlt。expect_all(规则)deffarmers_market_quarantine():返回(dlt。读(“raw_farmers_market”)。选择(“MarketName”,“网站”,“位置”,“状态”,“Facebook”,“推特”,“Youtube”,“有机”,“updateTime”)。withColumn(“is_quarantined”,expr(quarantine_rules)))@dlt。视图(的名字=“valid_farmers_market”)defget_valid_farmers_market():返回(dlt。读(“farmers_market_quarantine”)。过滤器(“is_quarantined = false”))@dlt。视图(的名字=“invalid_farmers_market”)defget_invalid_farmers_market():返回(dlt。读(“farmers_market_quarantine”)。过滤器(“is_quarantined = true”))
验证跨表行数
您可以添加一个额外的表定义一个期望你的管道来比较两个生活表之间的行数。期望的结果出现在UI事件日志和三角洲住表。下面这个例子验证之间的平等的行数tbla
和tblb
表:
创建或刷新生活表count_verification(约束no_rows_dropped预计(a_count= =b_count))作为选择*从(选择数(*)作为a_count从生活。tbla),(选择数(*)作为b_count从生活。tblb)
执行先进的验证与达美住表期望
您可以使用聚合和定义生活表连接查询和使用这些查询的结果作为期望检查的一部分。这是有用的,如果你想执行复杂的数据质量检查,例如,确保一个派生表包含源表的所有记录或保证平等的数字列在表中。您可以使用临时
关键字来防止这些表发布到目标模式。
下面的例子验证所有预期的记录中报告
表:
创建临时生活表report_compare_tests(约束no_missing_records预计(r。关键是不零))作为选择*从生活。validation_copyv左外加入生活。报告r在v。关键=r。关键
下面的示例使用一个聚合,以确保一个主键的唯一性:
创建临时生活表report_pk_tests(约束unique_pk预计(num_entries=1))作为选择pk,数(*)作为num_entries从生活。报告集团通过pk
使预期便携和可重用
您可以维护数据质量规则分别从管道实现。
砖建议将规则存储在三角洲表每个规则分类的标签。你使用这个标签数据集定义来确定哪些规则适用。
下面的示例创建一个表命名规则
维护规则:
创建或取代表规则作为选择col1作为的名字,col2作为约束,col3作为标签从(值(“website_not_null”,”网站不是零”,“有效性”),(“location_not_null”,“位置是不空”,“有效性”),(“state_not_null”,“不空”,“有效性”),(“fresh_data”,“to_date (updateTime, M / d / yyyy h:男:“)> 2010-01-01”,“维护”),(“social_media_access”,“不是(Facebook是NULL和Twitter是零和Youtube是NULL)”,“维护”))
以下Python示例定义数据质量的期望基于存储在规则规则
表。的get_rules ()
从函数读取规则规则
表并返回一个Python字典包含匹配的规则标签
参数传递给函数。字典上的应用@dlt.expect_all_ * ()
decorator执行数据质量约束。例如,任何记录未标记的规则有效性
将从吗raw_farmers_market
表:
请注意
这个例子中读取包含在示例数据砖的数据集。因为砖与管道发布数据集不支持统一目录,这个例子只能与一个蜂巢metastore管道配置发布。然而,这种模式也适用于统一目录启用管道,但你必须读取数据外部位置。了解更BOB低频彩多关于使用统一目录与达美住表,看看使用统一的目录与三角洲住表管道。
进口dlt从pyspark.sql.functions进口expr,上校defget_rules(标签):”“”加载数据质量规则从一个表:param标签:标签返回:字典的规则匹配的标记”“”规则={}df=火花。读。表(“规则”)为行在df。过滤器(上校(“标签”)= =标签)。收集():规则(行(“名字”]]=行(“约束”]返回规则@dlt。表(的名字=“raw_farmers_market”)@dlt。expect_all_or_drop(get_rules(“有效性”))defget_farmers_market_data():返回(火花。读。格式(“csv”)。选项(“头”,“真正的”)。负载(' / databricks-datasets /网站/ farmers_markets_geographic_data /数据- 001 / '))@dlt。表(的名字=“organic_farmers_market”)@dlt。expect_all_or_drop(get_rules(“维护”))defget_organic_farmers_market():返回(dlt。读(“raw_farmers_market”)。过滤器(expr(“有机= Y”))。选择(“MarketName”,“网站”,“状态”,“Facebook”,“推特”,“Youtube”,“有机”,“updateTime”))