在Auto Loader中配置模式推断和进化
您可以配置Auto Loader来自动检测加载数据的模式,允许您在不显式声明数据模式的情况下初始化表,并在引入新列时改进表模式。这消除了随着时间的推移手动跟踪和应用模式更改的需要。
Auto Loader还可以“挽救”JSON blob列中意外的数据(例如,不同的数据类型),您可以选择稍后使用半结构化数据访问api.
模式推断和进化支持以下格式:
文件格式 |
支持版本 |
---|---|
|
Databricks Runtime 8.2及以上版本 |
|
Databricks Runtime 8.3及以上版本 |
|
Databricks运行时10.2及以上 |
|
Databricks运行时11.1及以上版本 |
|
不支持的 |
|
不适用(固定模式) |
|
不适用(固定模式) |
模式推断和进化的语法
指定选项的目标目录cloudFiles.schemaLocation
启用模式推断和进化。属性指定的相同目录checkpointLocation
.如果你使用Delta活动表, Databricks自动管理模式位置和其他检查点信息。
请注意
如果将多个源数据位置加载到目标表中,则每个Auto Loader摄取工作负载都需要一个单独的流检查点。
下面的示例使用拼花
为cloudFiles.format
.使用csv
,avro
,或json
对于其他文件源。对于每种格式的默认行为,读写的所有其他设置保持不变。
(火花.readStream.格式(“cloudFiles”).选项(“cloudFiles.format”,“铺”)#模式位置目录跟踪数据模式随时间的变化.选项(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”).负载(“< path_to_source_data >”).writeStream.选项(“checkpointLocation”,“< path_to_checkpoint >”).开始(“< path_to_target”))
火花.readStream.格式(“cloudFiles”).选项(“cloudFiles.format”,“铺”)//模式位置目录跟踪你的数据模式.选项(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”).负载(“< path_to_source_data >”).writeStream.选项(“checkpointLocation”,“< path_to_checkpoint >”).开始(“< path_to_target”)
Auto Loader模式推断如何工作?
为了在第一次读取数据时推断模式,Auto Loader对它发现的前50 GB或1000个文件进行采样,以先超过限制为准。Auto Loader将模式信息存储在一个目录中_schemas
在配置cloudfFiles.schemaLocation
随着时间的推移跟踪输入数据的模式更改。
请注意
要更改使用的示例的大小,您可以设置SQL配置:
火花.砖.cloudFiles.schemaInference.sampleSize.numBytes
例如,字节字符串10 gb
)
而且
火花.砖.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
默认情况下,Auto Loader模式推断试图避免由于类型不匹配而导致的模式演变问题。对于不编码数据类型的格式(JSON和CSV),自动加载器将所有列推断为字符串(包括JSON文件中的嵌套字段)。对于具有类型化模式的格式(Parquet和Avro), Auto Loader对文件的子集进行采样,并合并单个文件的模式。这种行为总结在下表中:
文件格式 |
默认推断数据类型 |
---|---|
|
字符串 |
|
字符串 |
|
Avro模式中编码的类型 |
|
Parquet模式中编码的类型 |
Apache Spark dataframerader使用不同的行为进行模式推断,根据示例数据为JSON和CSV源中的列选择数据类型。要使用Auto Loader启用此行为,请设置该选项cloudFiles.inferColumnTypes
来真正的
.
请注意
当推断CSV数据的模式时,自动加载器假设文件包含头文件。如果您的CSV文件不包含标题,请提供该选项.option(“头”,“假”)
.此外,Auto Loader合并示例中所有文件的模式,以生成一个全局模式。Auto Loader可以根据文件头读取每个文件,并正确解析CSV。
请注意
当一个列在两个Parquet文件中具有不同的数据类型时,Auto Loader将尝试向上的
从一种到另一种。如果无法向上转换,则数据推断失败。示例见下表:
1型 |
2型 |
向上的类型 |
---|---|---|
|
|
|
|
|
|
|
|
推理失败 |
在推理时合并数据类型之后,包含未选择类型记录的文件将加载到获救数据列,因为数据类型不同于推断的模式。
Auto Loader模式演化如何工作?
Auto Loader在处理数据时检测添加的新列。当自动加载器检测到一个新列时,流以UnknownFieldException
.在流抛出此错误之前,Auto Loader会对最新的微批数据执行模式推断,并通过将新列合并到模式的末尾,使用最新的模式更新模式位置。现有列的数据类型保持不变。
Databricks建议配置自动加载流工作流在这种模式更改后自动重新启动。
Auto Loader支持在选项中设置的模式演变的以下模式cloudFiles.schemaEvolutionMode
:
模式 |
读取新列时的行为 |
---|---|
|
流失败。新列被添加到模式中。现有列不演进数据类型。 |
|
模式永远不会演进,流也不会因为模式更改而失败。中记录所有新列获救数据列. |
|
流失败。除非更新了提供的模式,或者删除了有问题的数据文件,否则流不会重新启动。 |
|
不进化模式,忽略新列,并且不保存数据,除非 |
分区如何与自动加载器工作?
如果数据以Hive风格分区布局,Auto Loader尝试从数据的底层目录结构推断分区列。例如,文件路径base_path /事件= = 2021-04-01 / f0.json点击/日期
的推论结果日期
而且事件
作为分区列。如果底层目录结构包含冲突的Hive分区或不包含Hive风格的分区,则忽略分区列。
二进制文件(binaryFile
),文本
文件格式具有固定的数据模式,但支持分区列推断。Databricks推荐设置cloudFiles.schemaLocation
对于这些文件格式。这避免了任何潜在的错误或信息丢失,并防止每次自动加载程序开始时分区列的推断。
分区列不考虑模式演变。如果你有一个初始目录结构base_path /事件= = 2021-04-01 / f0.json点击/日期
,然后开始接收新文件base_path /事件=点击/日期= 1 / f1.json = 2021-04-01 /小时
, Auto Loader忽略小时列。若要捕获新分区列的信息,请设置cloudFiles.partitionColumns
来事件、日期、小时
.
请注意
的选项cloudFiles.partitionColumns
获取以逗号分隔的列名列表。仅为存在的列键=值
目录结构中的对将被解析。
获救的数据列是什么?
当Auto Loader推断出模式时,一个已保存的数据列将自动添加到您的模式为_rescued_data
.您可以重命名列,或者在提供模式的情况下通过设置该选项将其包括在内rescuedDataColumn
.
获救的数据列确保与模式不匹配的列被获救,而不是被删除。获救的数据列包含由于以下原因未被解析的任何数据:
模式中缺少该列。
类型不匹配。
不匹配。
获救的数据列包含一个JSON,其中包含获救的列和记录的源文件路径。
请注意
JSON和CSV解析器在解析记录时支持三种模式:宽容的
,DROPMALFORMED
,FAILFAST
.当与rescuedDataColumn
,数据类型不匹配不会导致记录被删除DROPMALFORMED
模式或抛出错误FAILFAST
模式。只有损坏的记录才会被丢弃或抛出错误,例如不完整或格式错误的JSON或CSV。如果你使用badRecordsPath
,在解析JSON或CSV时,数据类型不匹配不被视为坏记录rescuedDataColumn
.仅存储不完整和格式不正确的JSON或CSV记录badRecordsPath
.
改变区分大小写的行为
除非启用区分大小写,否则列美国广播公司
,美国广播公司
,美国广播公司
出于模式推断的目的,将它们视为同一列。所选择的情况是任意的,取决于采样数据。你可以使用模式提示来执行应该使用哪种情况。一旦做出选择并推断出模式,Auto Loader就不会考虑与模式不一致的套管变体。
当获救数据列启用时,以模式大小写以外的大小写命名的字段将加载到_rescued_data
列。通过设置该选项来改变这种行为readerCaseSensitive
为false,在这种情况下,Auto Loader以不区分大小写的方式读取数据。
用模式提示覆盖模式推断
您可以使用模式提示在推断的模式上强制使用您所知道和期望的模式信息。当您知道一个列是特定的数据类型,或者如果您想选择更通用的数据类型(例如,a双
而不是整数
),您可以使用SQL模式规范语法为列数据类型提供任意数量的提示作为字符串,例如:
.选项(“cloudFiles.schemaHints”,"标签映射<字符串,字符串>,版本int")
请参阅有关数据类型获取所支持的数据类型列表。
如果某个列不在流的开始处,还可以使用模式提示将该列添加到推断的模式中。
下面是一个推断模式的示例,用于查看带有模式提示的行为。
推断模式:
|——date: string |——quantity: int |——user_info: struct | |——id: string | |——name: string | |——dob: string |——purchase_options: struct | |——delivery_address: string
通过指定以下模式提示:
.选项(“cloudFiles.schemaHints”,date日期,user_info。MAP, time TIMESTAMP" )
你会得到:
|——date: string -> date |——quantity: int |——user_info: struct | |——id: string | |——name: string | b|——dob: string -> date |——purchase_options: struct -> map |——time: timestamp
请注意
数组和映射模式提示支持Databricks Runtime 9.1 LTS及以上。
下面是一个具有复杂数据类型的推断模式示例,以查看使用模式提示的行为。
推断模式:
|——products: array |——locations: array |——users: array | |——users。元素:struct | | |——id: string | | |——name: string | | b|——dob: string |——ids: map |——names: map |——prices: map |——折扣:map | |——折扣。键:struct | | |——id: string | |——折扣。值:字符串|——描述:map<字符串,struct> | |——描述。关键字:字符串| |—描述。值:struct | | |——内容:int
通过指定以下模式提示:
.选项(“cloudFiles.schemaHints”,"products ARRAY, location . "元素字符串,users.element.id INT, id MAP,名称。关键INT,价格。key.id INT, description .value.content STRING" )
你会得到:
|——products: array -> array |——locations: array -> array |——users: array | |——users。元素:struct | b| |——id: string -> int | | |——name: string | | |——dob: string |——ids: map -> map |——names: map -> map |——prices: map -> map |——折扣:map | |——折扣。关键字:struct | | |——id: string -> int | |——折扣。值:字符串|——描述:map<字符串,struct> | |——描述。关键字:字符串| |—描述。取值:struct | | |——content: int ->字符串
请注意
模式提示仅在以下情况下使用不为自动加载器提供一个模式。您可以使用模式提示是否cloudFiles.inferColumnTypes
启用或禁用。