在Delta活动表中使用自动加载器
您可以在您的Delta活动表管道。Delta Live Tables扩展了Apache Spark结构化流的功能,允许你只写几行声明性的Python或SQL来部署生产质量的数据管道:
自动伸缩计算基础设施为了节省成本
数据质量检查预期
自动模式演化处理
中的度量进行监视事件日志
您不需要提供模式或检查点位置,因为Delta Live Tables会自动管理管道的这些设置。看到摄取数据到Delta活动表.
DLT的自动加载语法
Delta Live Tables为Auto Loader提供了稍微修改的Python语法,并为Auto Loader添加了SQL支持。
下面的例子使用Auto Loader从CSV和JSON文件创建数据集:
@dlt.表格def客户():返回(火花.readStream.格式(“cloudFiles”).选项(“cloudFiles.format”,“csv”).负载(“/ databricks-datasets / retail-org /客户/”))@dlt.表格defsales_orders_raw():返回(火花.readStream.格式(“cloudFiles”).选项(“cloudFiles.format”,“json”).负载(“/ databricks-datasets / retail-org sales_orders /”))
创建或刷新流媒体生活表格客户作为选择*从cloud_files(“/ databricks-datasets / retail-org /客户/”,“csv”)创建或刷新流媒体生活表格sales_orders_raw作为选择*从cloud_files(“/ databricks-datasets / retail-org sales_orders /”,“json”)
你可以使用supported格式选项自动装载机。使用map ()
函数时,可以将任意数量的选项传递给cloud_files ()
方法。选项是键-值对,其中键和值是字符串。下面描述了在SQL中使用Auto Loader的语法:
创建或刷新流媒体生活表格<table_name>作为选择*从cloud_files(“< file_path >”,“< file_format >”,地图(“< option_key >”,“< option_value”,“< option_key >”,“< option_value”,...))
下面的示例从带头的tab分隔的CSV文件中读取数据:
创建或刷新流媒体生活表格客户作为选择*从cloud_files(“/ databricks-datasets / retail-org /客户/”,“csv”,地图(“分隔符”,“t \”,“头”,“真正的”))
您可以使用模式
手动指定格式;您必须指定模式
对于不支持的格式模式推理:
@dlt.表格defwiki_raw():返回(火花.readStream.格式(“cloudFiles”).模式("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING").选项(“cloudFiles.format”,“铺”).负载(“/ databricks-datasets / wikipedia-datasets /数据- 001 / en_wikipedia / articles-only-parquet”))
创建或刷新流媒体生活表格wiki_raw作为选择*从cloud_files(“/ databricks-datasets / wikipedia-datasets /数据- 001 / en_wikipedia / articles-only-parquet”,“铺”,地图(“模式”,"title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"))
请注意
Delta Live Tables在使用Auto Loader读取文件时自动配置和管理模式和检查点目录。但是,如果手动配置这些目录中的任何一个,执行完全刷新不会影响所配置目录的内容。Databricks建议使用自动配置的目录,以避免处理过程中出现意想不到的副作用。