常见的数据加载模式
自动加载程序简化了大量的公共数据摄入的任务。这个快速参考提供了几个例子流行的模式。
过滤使用一滴目录或文件模式
水珠模式可用于过滤目录和文件时提供的路径。
模式 |
描述 |
---|---|
|
匹配任何单个的字符 |
|
匹配零个或多个字符 |
|
匹配一个字符的字符集{a, b, c}。 |
|
匹配一个字符的字符范围{…z}。 |
|
匹配一个字符不是字符集或距离{}。请注意, |
|
匹配字符串的字符串集{ab、cd}。 |
|
匹配字符串的字符串集{ab, cde, cfh}。 |
使用路径
提供前缀模式,例如:
df=火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,<格式>)\。模式(模式)\。负载(“<基本路径> / * /文件”)
瓦尔df=火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,<格式>)。模式(模式)。负载(“<基本路径> / * /文件”)
重要的
您需要使用选择pathGlobFilter
显式地提供后缀模式。的路径
只提供一个前缀过滤器。
例如,如果您想只解析png
一个目录中的文件,其中包含文件与不同的后缀,你能做什么:
df=火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“binaryFile”)\。选项(“pathGlobfilter”,“* . png”)\。负载(<基地- - - - - -路径>)
瓦尔df=火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“binaryFile”)。选项(“pathGlobfilter”,“* . png”)。负载(<基地- - - - - -路径>)
请注意
默认的globbing自动加载程序的行为是不同的比其他的默认行为引发文件来源。添加.option (“useStrictGlobber”,“真正的”)
你阅读使用通配符匹配违约引发行为对文件来源。更多关于globbing见下表:
模式 |
文件路径 |
默认的水珠 |
严格的水珠 |
---|---|---|---|
/ / b |
/ a / b / c / file.txt |
是的 |
是的 |
/ / b |
/ / b_dir / c / file.txt |
没有 |
没有 |
/ / b |
/ / b.txt |
没有 |
没有 |
/ a / b / |
/ / b.txt |
没有 |
没有 |
/ / * / c / |
/ a / b / c / file.txt |
是的 |
是的 |
/ / * / c / |
/ a / b / c / d / file.txt |
是的 |
是的 |
/ / * / c / |
/ a / b / x / y / c / file.txt |
是的 |
没有 |
/ / * / c |
/ a / b / c_file.txt |
是的 |
没有 |
/ / * / c / |
/ a / b / c_file.txt |
是的 |
没有 |
/ / * / c / |
/ / * /饼干/ file.txt |
是的 |
没有 |
/ / b * |
/ / b.txt |
是的 |
是的 |
/ / b * |
/ a / b / file.txt |
是的 |
是的 |
/ / {0. txt, 1. txt} |
/ / 0.三种 |
是的 |
是的 |
/ / * / {0. txt, 1. txt} |
/ / 0.三种 |
没有 |
没有 |
/ a / b / [cde-h] /我/ |
/ a / b / c / i / file.txt |
是的 |
是的 |
使简单的ETL
一个简单的方法来让你的数据到三角洲湖而不会丢失任何数据是使用以下模式和自动加载程序启用模式推理。砖建议运行下面的代码在一个砖工作时自动重启流源数据的模式变化。默认情况下,模式推断为字符串类型,任何解析错误(应该没有如果一切仍然作为一个字符串)将去_rescued_data
会失败,任何新列流和发展模式。
火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“json”)\。选项(“cloudFiles.schemaLocation”,“< path-to-schema-location >”)\。负载(“< path-to-source-data >”)\。writeStream\。选项(“mergeSchema”,“真正的”)\。选项(“checkpointLocation”,“< path-to-checkpoint >”)\。开始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)。选项(“cloudFiles.schemaLocation”,“< path-to-schema-location >”)。负载(“< path-to-source-data >”)。writeStream。选项(“mergeSchema”,“真正的”)。选项(“checkpointLocation”,“< path-to-checkpoint >”)。开始(“< path_to_target”)
防止数据丢失结构良好的数据
当你知道你的模式,但想知道每当你收到意想不到的数据,数据砖推荐使用rescuedDataColumn
。
火花。readStream。格式(“cloudFiles”)\。模式(expected_schema)\。选项(“cloudFiles.format”,“json”)\#将收集所有新领域以及在_rescued_data数据类型不匹配。选项(“cloudFiles.schemaEvolutionMode”,“营救”)\。负载(“< path-to-source-data >”)\。writeStream\。选项(“checkpointLocation”,“< path-to-checkpoint >”)\。开始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。模式(expected_schema)。选项(“cloudFiles.format”,“json”)/ /将收集所有新领域以及在_rescued_data数据类型不匹配。选项(“cloudFiles.schemaEvolutionMode”,“营救”)。负载(“< path-to-source-data >”)。writeStream。选项(“checkpointLocation”,“< path-to-checkpoint >”)。开始(“< path_to_target”)
如果你想让你的流停止处理的新领域介绍不匹配你的模式,您可以添加:
。选项(“cloudFiles.schemaEvolutionMode”,“failOnNewColumns”)
支持灵活的半结构化数据管道
当你接收数据从一个供应商介绍新列他们提供的信息,你可能不知道什么时候,或者你可能没有足够的带宽来更新你的数据管道。您现在可以利用模式演化重启流,让汽车自动加载程序更新的模式。您还可以利用schemaHints
一些“无模式”的字段,供应商可以提供。
火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“json”)\#将确保标题列得到加工作为一个地图。选项(“cloudFiles.schemaHints”,“头map < string, string >, statusCode短”)\。负载(“. . / api /请求”)\。writeStream\。选项(“mergeSchema”,“真正的”)\。选项(“checkpointLocation”,“< path-to-checkpoint >”)\。开始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)/ /将确保标题列得到加工作为一个地图。选项(“cloudFiles.schemaHints”,“头map < string, string >, statusCode短”)。负载(“. . / api /请求”)。writeStream。选项(“mergeSchema”,“真正的”)。选项(“checkpointLocation”,“< path-to-checkpoint >”)。开始(“< path_to_target”)
嵌套的JSON数据转换
因为汽车装载机推断顶级JSON作为字符串列,你可以留下嵌套的JSON对象,需要进一步转换。您可以使用半结构化数据访问api进一步变换复杂JSON内容。
火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“json”)\#模式位置目录跟踪你的数据模式。选项(“cloudFiles.schemaLocation”,“< path-to-checkpoint >”)\。负载(“< source-data-with-nested-json >”)\。selectExpr(“*”,“标签:page.name”,#提取{“标签”:{“页面”:{“名称”:…}}}“标签:page.id:: int”,#提取{“标签”:{"页面":{" id ":…}}},int转换“标签:eventType”#提取{“标签”:{“eventType”:…}})
火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)/ /数据的模式位置目录跟踪模式。选项(“cloudFiles.schemaLocation”,“< path-to-checkpoint >”)。负载(“< source-data-with-nested-json >”)。selectExpr(“*”,“标签:page.name”,/ /提取{“标签”:{“页面”:{“名称”:…}}}“标签:page.id:: int”,/ /提取{“标签”:{"页面":{" id ":…}}},int转换“标签:eventType”/ /提取{“标签”:{“eventType”:…}})
推断出嵌套的JSON数据
当你有嵌套的数据,您可以使用cloudFiles.inferColumnTypes
可以推断出你的数据和其他列的嵌套结构类型。
火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“json”)\#模式位置目录跟踪你的数据模式。选项(“cloudFiles.schemaLocation”,“< path-to-checkpoint >”)\。选项(“cloudFiles.inferColumnTypes”,“真正的”)\。负载(“< source-data-with-nested-json >”)
火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)/ /数据的模式位置目录跟踪模式。选项(“cloudFiles.schemaLocation”,“< path-to-checkpoint >”)。选项(“cloudFiles.inferColumnTypes”,“真正的”)。负载(“< source-data-with-nested-json >”)
没有头的负荷CSV文件
df=火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“csv”)\。选项(“rescuedDataColumn”,“_rescued_data”)\#确保你不会丢失数据。模式(<模式>)\#在这里提供一个模式文件。负载(<路径>)
瓦尔df=火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“csv”)。选项(“rescuedDataColumn”,“_rescued_data”)/ /确保你不会丢失数据。模式(<模式>)/ /在这里提供一个模式文件。负载(<路径>)
执行一个模式在CSV文件头
df=火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“csv”)\。选项(“头”,“真正的”)\。选项(“rescuedDataColumn”,“_rescued_data”)\#确保你不会丢失数据。模式(<模式>)\#在这里提供一个模式文件。负载(<路径>)
瓦尔df=火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“csv”)。选项(“头”,“真正的”)。选项(“rescuedDataColumn”,“_rescued_data”)/ /确保你不会丢失数据。模式(<模式>)/ /在这里提供一个模式文件。负载(<路径>)
摄取图像或二进制数据为毫升三角洲湖
一旦数据存储在三角洲湖,您可以运行分布式推理的数据。看到使用熊猫UDF执行分布式推理。
火花。readStream。格式(“cloudFiles”)\。选项(“cloudFiles.format”,“binaryFile”)\。负载(“< path-to-source-data >”)\。writeStream\。选项(“checkpointLocation”,“< path-to-checkpoint >”)\。开始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“binaryFile”)。负载(“< path-to-source-data >”)。writeStream。选项(“checkpointLocation”,“< path-to-checkpoint >”)。开始(“< path_to_target”)