在Azure Synapse Analytics中查询数据
您可以使用Azure Synapse连接器从Databricks访问Azure Synapse,该连接器使用复制
语句,使用Azure data Lake Storage Gen2存储帐户临时暂存,在Databricks集群和Azure Synapse实例之间有效传输大量数据。
Azure Synapse Analytics是一个基于云计算的企业数据仓库,它利用大规模并行处理(MPP)跨pb级数据快速运行复杂查询。
重要的
此连接器仅用于Synapse专用池实例,与其他Synapse组件不兼容。
请注意
复制
仅在Azure数据湖存储Gen2实例上可用。如果您正在寻找使用Polybase的详细信息,请参见使用PolyBase连接Databricks和Azure Synapse(遗留).
Synapse的语法示例
您可以在Scala、Python、SQL和r中查询Synapse。下面的代码示例使用存储帐户密钥并将存储凭据从Databricks转发到Synapse。
请注意
使用Azure门户提供的连接字符串,该连接为Spark驱动程序和Azure Synapse实例之间通过JDBC连接发送的所有数据启用安全套接字层(SSL)加密。如果需要验证SSL加密是否已启用,可以搜索加密= true
在连接字符串中。
//在notebook会话conf中设置存储帐户访问密钥。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”,“< your-storage-account-access-key >”)//从Azure Synapse表中获取数据瓦尔df:DataFrame=火花.读.格式(“com.databricks.spark.sqldw”).选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”).选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”).选项(“forwardSparkAzureStorageCredentials”,“真正的”).选项(“数据表”,“< your-table-name >”).负载()//从Azure Synapse查询中加载数据。瓦尔df:DataFrame=火花.读.格式(“com.databricks.spark.sqldw”).选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”).选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”).选项(“forwardSparkAzureStorageCredentials”,“真正的”).选项(“查询”,select x, count(*) as CNT from table group by x).负载()//对数据应用一些转换,然后使用//将数据写回Azure Synapse中的另一个表的数据源API。df.写.格式(“com.databricks.spark.sqldw”).选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”).选项(“forwardSparkAzureStorageCredentials”,“真正的”).选项(“数据表”,“< your-table-name >”).选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”).保存()
在notebook会话conf中设置存储帐户访问密钥。火花.相依.集(“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”,“< your-storage-account-access-key >”)#从Azure Synapse表中获取一些数据。df=火花.读\.格式(“com.databricks.spark.sqldw”)\.选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\.选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)\.选项(“forwardSparkAzureStorageCredentials”,“真正的”)\.选项(“数据表”,“< your-table-name >”)\.负载()#从Azure Synapse查询中加载数据df=火花.读\.格式(“com.databricks.spark.sqldw”)\.选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\.选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)\.选项(“forwardSparkAzureStorageCredentials”,“真正的”)\.选项(“查询”,select x, count(*) as CNT from table group by x)\.负载()对数据应用一些转换,然后使用#数据源API将数据写回Azure Synapse中的另一个表。df.写\.格式(“com.databricks.spark.sqldw”)\.选项(“url”," jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”)\.选项(“forwardSparkAzureStorageCredentials”,“真正的”)\.选项(“数据表”,“< your-table-name >”)\.选项(“tempDir”,“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)\.保存()
——在笔记本会话配置文件中设置存储帐户访问密钥。集fs.azure.账户.关键.<你的-存储-账户-的名字>.dfs.核心.窗户.网= <你的-存储-账户-访问-关键>;——使用SQL读取数据。创建表格example_table_in_spark_read使用com.砖.火花.sqldw选项(url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”,forwardSparkAzureStorageCredentials“真正的”,数据表' < your-table-name >”,tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”);——使用SQL写数据。——创建一个新表,如果已经存在同名的表,则抛出一个错误:创建表格example_table_in_spark_write使用com.砖.火花.sqldw选项(url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”,forwardSparkAzureStorageCredentials“真正的”,数据表' < your-table-name >”,tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)作为选择*从table_to_save_in_spark;
#加载SparkR图书馆(SparkR)在notebook会话conf中设置存储帐户访问密钥。相依<-sparkR.callJMethod(sparkR.session(),“配置”)sparkR.callJMethod(相依,“设置”,“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”,“< your-storage-account-access-key >”)#从Azure Synapse表中获取一些数据。df<-read.df(源=“com.databricks.spark.sqldw”,url=" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,数据表=“< your-table-name >”,tempDir=“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)#从Azure Synapse查询中加载数据df<-read.df(源=“com.databricks.spark.sqldw”,url=" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,查询=select x, count(*) as CNT from table group by x,tempDir=“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)对数据应用一些转换,然后使用#数据源API将数据写回Azure Synapse中的另一个表。write.df(df,源=“com.databricks.spark.sqldw”,url=" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”,forward_spark_azure_storage_credentials=“真正的”,数据表=“< your-table-name >”,tempDir=“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”)
如何在Databricks和Synapse之间进行身份验证?
Azure Synapse连接器使用三种类型的网络连接:
Spark驱动程序到Azure Synapse
Spark集群到Azure存储帐户
Azure Synapse到Azure存储帐户
配置对Azure存储的访问
Databricks和Synapse都需要对Azure存储帐户的特权访问,以便用于临时数据存储。
Azure Synapse不支持使用SAS进行存储帐户访问。您可以通过执行以下操作之一来配置两个服务的访问:
使用存储帐户和设置的帐户密钥和秘密
forwardSparkAzureStorageCredentials
来真正的
.看到使用帐户密钥访问Azure数据湖存储Gen2或Blob存储.使用Azure数据湖存储Gen2OAuth 2.0认证.
配置Azure Synapse实例,使其具有托管服务标识。
需要Azure Synapse权限
因为它使用了复制
在后台,Azure Synapse连接器要求JDBC连接用户有权限在连接的Azure Synapse实例中运行以下命令:
如果目标表在Azure Synapse中不存在,除了上面的命令外,还需要运行以下命令的权限:
下表总结了写入所需的权限复制
:
权限(插入到现有表中) |
权限(插入到新表中) |
---|---|
管理数据库批量操作 插入 |
管理数据库批量操作 插入 创建表 ALTER ON SCHEMA:: dbo |
网络配置
如果在Azure Synapse上配置防火墙,则必须配置网络设置以允许Databricks访问Azure Synapse。首先,确保Databricks工作空间部署在您自己的虚拟网络中_.然后,您可以在Azure Synpase上配置IP防火墙规则,以允许从子网连接到Synpase帐户。看到Azure Synapse Analytics IP防火墙规则.
使用服务主体配置从Databricks到Synapse的OAuth 2.0连接
您可以使用具有访问底层存储帐户权限的服务主体对Azure Synapse Analytics进行身份验证。有关使用服务主体凭据访问Azure存储帐户的详细信息,请参见访问Azure数据湖存储Gen2和Blob存储.您必须设置enableServicePrincipalAuth
选项真正的
在连接配置中Databricks突触连接器选项参考使连接器能够使用服务主体进行身份验证。
您可以选择为Azure Synapse Analytics连接使用不同的服务主体。以下示例为存储帐户配置服务主体凭据,为Synapse配置可选服务主体凭据:
;为Azure存储帐户定义服务主体凭据fs.azure.account.auth.type OAuthfs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProviderfs.azure.account.oauth2.client。id <应用程序id >fs.azure.account.oauth2.client。秘密< service-credential >fs.azure.account.oauth2.client。端点https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌;为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)spark.databricks.sqldw.jdbc.service.principal.client.id <应用程序id >spark.databricks.sqldw.jdbc.service.principal.client.secret < service-credential >
//为Azure存储帐户定义Service Principal凭据火花.相依.集(“fs.azure.account.auth.type”,“OAuth”)火花.相依.集(“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)火花.相依.集(“fs.azure.account.oauth2.client.id”,“<应用程序id >”)火花.相依.集(“fs.azure.account.oauth2.client.secret”,“< service-credential >”)火花.相依.集(“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)//为Azure Synapse Analytics定义一个单独的服务主体凭证集(如果没有定义,连接器将使用Azure存储帐户凭证)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<应用程序id >”)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
#为Azure存储帐户定义服务主体凭证火花.相依.集(“fs.azure.account.auth.type”,“OAuth”)火花.相依.集(“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)火花.相依.集(“fs.azure.account.oauth2.client.id”,“<应用程序id >”)火花.相依.集(“fs.azure.account.oauth2.client.secret”,“< service-credential >”)火花.相依.集(“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)#为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<应用程序id >”)火花.相依.集(“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
#加载SparkR图书馆(SparkR)相依<-sparkR.callJMethod(sparkR.session(),“配置”)#为Azure存储帐户定义服务主体凭证sparkR.callJMethod(相依,“设置”,“fs.azure.account.auth.type”,“OAuth”)sparkR.callJMethod(相依,“设置”,“fs.azure.account.oauth.provider.type”,“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”)sparkR.callJMethod(相依,“设置”,“fs.azure.account.oauth2.client.id”,“<应用程序id >”)sparkR.callJMethod(相依,“设置”,“fs.azure.account.oauth2.client.secret”,“< service-credential >”)sparkR.callJMethod(相依,“设置”,“fs.azure.account.oauth2.client.endpoint”,“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”)#为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)sparkR.callJMethod(相依,“设置”,“spark.databricks.sqldw.jdbc.service.principal.client.id”,“<应用程序id >”)sparkR.callJMethod(相依,“设置”,“spark.databricks.sqldw.jdbc.service.principal.client.secret”,“< service-credential >”)
支持批量写入的保存模式
Azure Synapse连接器支持ErrorIfExists
,忽略
,附加
,覆盖
保存模式,默认模式为ErrorIfExists
.有关Apache Spark中支持的保存模式的更多信息,请参见Spark SQL文档中的保存模式.
Databricks突触连接器选项参考
的选项
Spark SQL中提供的支持以下设置:
参数 |
要求 |
默认的 |
笔记 |
---|---|---|---|
|
是的,除非 |
没有默认的 |
在Azure Synapse中要创建或读取的表。当将数据保存回Azure Synapse时,此参数是必需的。 你也可以使用 以前支持的 |
|
是的,除非 |
没有默认的 |
在Azure Synapse中读取的查询。 对于查询中引用的表,也可以使用 |
|
没有 |
没有默认的 |
Azure Synapse用户名。必须配合使用吗 |
|
没有 |
没有默认的 |
Azure Synapse密码。必须配合使用吗 |
|
是的 |
没有默认的 |
JDBC URL |
|
没有 |
由JDBC URL的子协议决定 |
要使用的JDBC驱动程序的类名。这个类必须在类路径上。在大多数情况下,应该没有必要指定这个选项,因为适当的驱动程序类名应该由JDBC URL的子协议自动确定。 以前支持的 |
|
是的 |
没有默认的 |
一个 以前支持的 |
|
没有 |
|
Spark和Azure Synapse用于临时编码/解码的压缩算法。目前支持的值为: |
|
没有 |
假 |
如果 配置存储鉴权时,必须设置其中之一 以前支持的 |
|
没有 |
假 |
如果 配置存储鉴权时,必须设置其中之一 |
|
没有 |
假 |
如果 如果任何一 |
|
没有 |
|
用于指定的字符串表选项在创建Azure Synapse表时设置 以前支持的 |
|
没有 |
无默认值(空字符串) |
一个 如果这些命令中的任何一个失败,都将被视为错误,并且不会执行写操作。 |
|
没有 |
无默认值(空字符串) |
一个 如果这些命令中的任何一个失败,它将被视为一个错误,并且在成功地将数据写入Azure Synapse实例后,您将得到一个异常。 |
|
没有 |
256 |
以前支持的 |
|
没有 |
|
每个查询的连接标记。如果未指定或该值为空字符串,则将标记的默认值添加到JDBC URL中。默认值防止Azure DB监控工具对查询发出虚假的SQL注入警报。 |
|
没有 |
没有默认的 |
的列长度 |
|
没有 |
假 |
设置为 |
|
没有 |
0 |
在取消加载操作之前,在读写期间可以拒绝的最大行数。被拒绝的行将被忽略。例如,如果10个记录中有2个有错误,那么将只处理8个记录。 |
|
没有 |
假 |
如果 |
请注意
tableOptions
,预作用
,postActions
,maxStrLength
只有在将数据从Databricks写入Azure Synapse中的新表时才相关。尽管所有数据源选项名称都不区分大小写,但为了清晰起见,我们建议您使用“驼峰大小写”指定它们。
查询下推到Azure Synapse
Azure Synapse连接器实现了一组优化规则,将以下操作符推入Azure Synapse:
过滤器
项目
限制
的项目
而且过滤器
操作符支持以下表达式:
大多数布尔逻辑运算符
比较
基本算术运算
数字和字符串类型转换
为限制
操作符,只有在没有指定顺序时才支持下推。例如:
选择(10)*从表格
,但不是选择(10)*从表格订单通过上校
.
请注意
Azure Synapse连接器不会下推操作字符串、日期或时间戳的表达式。
默认情况下,使用Azure Synapse连接器构建的查询下推是启用的。您可以通过设置禁用它spark.databricks.sqldw.pushdown
来假
.
临时数据管理
Azure Synapse连接器不删除它在Azure存储容器中创建的临时文件。Databricks建议定期删除用户提供的临时文件tempDir
的位置。
为了方便数据清理,Azure Synapse连接器不直接在下面存储数据文件tempDir
,而是创建表单的子目录:< tempDir > / < yyyy-MM-dd > / < HH-mm-ss-SSS > / < randomUUID > /
.您可以设置周期作业(使用Databricks . properties)工作功能或其他)递归删除任何子目录的历史超过给定的阈值(例如,2天),假设没有Spark作业运行的时间超过该阈值。
一个更简单的替代方法是定期删除整个容器,并创建一个具有相同名称的新容器。这要求您为Azure Synapse连接器生成的临时数据使用专用容器,并且您可以找到一个时间窗口,在该时间窗口中可以保证没有涉及该连接器的查询正在运行。
临时对象管理
Azure Synapse连接器自动在Databricks集群和Azure Synapse实例之间传输数据。要从Azure Synapse表读取数据或查询数据,或将数据写入Azure Synapse表,Azure Synapse连接器将创建临时对象,包括数据库作用域凭证
,外部数据源
,外部文件格式
,外部表格
幕后。这些对象只在相应的Spark作业期间存在,并且会自动删除。
当集群使用Azure Synapse连接器运行查询时,如果Spark驱动程序进程崩溃或强制重新启动,或者集群强制终止或重新启动,则可能不会删除临时对象。为了便于识别和手动删除这些对象,Azure Synapse连接器在Azure Synapse实例中创建的所有中间临时对象的名称前加上一个这样的标记:tmp_databricks_ < yyyy_MM_dd_HH_mm_ss_SSS > _ < randomUUID > _ < internalObject >
.
我们建议您定期使用以下查询查找泄漏对象:
选择*从sys.database_scoped_credentials在哪里的名字就像“tmp_databricks_ %”
选择*从sys.external_data_sources在哪里的名字就像“tmp_databricks_ %”
选择*从sys.external_file_formats在哪里的名字就像“tmp_databricks_ %”
选择*从sys.external_tables在哪里的名字就像“tmp_databricks_ %”