在Azure Synapse Analytics中查询数据

您可以使用Azure Synapse连接器从Databricks访问Azure Synapse,该连接器使用复制语句,使用Azure data Lake Storage Gen2存储帐户临时暂存,在Databricks集群和Azure Synapse实例之间有效传输大量数据。

Azure Synapse Analytics是一个基于云计算的企业数据仓库,它利用大规模并行处理(MPP)跨pb级数据快速运行复杂查询。

重要的

此连接器仅用于Synapse专用池实例,与其他Synapse组件不兼容。

请注意

复制仅在Azure数据湖存储Gen2实例上可用。如果您正在寻找使用Polybase的详细信息,请参见使用PolyBase连接Databricks和Azure Synapse(遗留)

Synapse的语法示例

您可以在Scala、Python、SQL和r中查询Synapse。下面的代码示例使用存储帐户密钥并将存储凭据从Databricks转发到Synapse。

请注意

使用Azure门户提供的连接字符串,该连接为Spark驱动程序和Azure Synapse实例之间通过JDBC连接发送的所有数据启用安全套接字层(SSL)加密。如果需要验证SSL加密是否已启用,可以搜索加密= true在连接字符串中。

//在notebook会话conf中设置存储帐户访问密钥。火花相依“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”“< your-storage-account-access-key >”//从Azure Synapse表中获取数据瓦尔dfDataFrame火花格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“数据表”“< your-table-name >”负载()//从Azure Synapse查询中加载数据。瓦尔dfDataFrame火花格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“查询”select x, count(*) as CNT from table group by x负载()//对数据应用一些转换,然后使用//将数据写回Azure Synapse中的另一个表的数据源API。df格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“数据表”“< your-table-name >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”保存()
在notebook会话conf中设置存储帐户访问密钥。火花相依“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”“< your-storage-account-access-key >”#从Azure Synapse表中获取一些数据。df火花格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“数据表”“< your-table-name >”负载()#从Azure Synapse查询中加载数据df火花格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“查询”select x, count(*) as CNT from table group by x负载()对数据应用一些转换,然后使用#数据源API将数据写回Azure Synapse中的另一个表。df格式“com.databricks.spark.sqldw”选项“url”" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”选项“forwardSparkAzureStorageCredentials”“真正的”选项“数据表”“< your-table-name >”选项“tempDir”“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”保存()
——在笔记本会话配置文件中设置存储帐户访问密钥。fsazure账户关键<你的-存储-账户-的名字>dfs核心窗户= <你的-存储-账户-访问-关键>——使用SQL读取数据。创建表格example_table_in_spark_read使用com火花sqldw选项url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”forwardSparkAzureStorageCredentials“真正的”数据表' < your-table-name >”tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”);——使用SQL写数据。——创建一个新表,如果已经存在同名的表,则抛出一个错误:创建表格example_table_in_spark_write使用com火花sqldw选项url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”forwardSparkAzureStorageCredentials“真正的”数据表' < your-table-name >”tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”作为选择table_to_save_in_spark
#加载SparkR图书馆SparkR在notebook会话conf中设置存储帐户访问密钥。相依<-sparkR.callJMethodsparkR.session(),“配置”sparkR.callJMethod相依“设置”“fs.azure.account.key。< your-storage-account-name > .dfs.core.windows.net”“< your-storage-account-access-key >”#从Azure Synapse表中获取一些数据。df<-read.df“com.databricks.spark.sqldw”url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”forward_spark_azure_storage_credentials“真正的”数据表“< your-table-name >”tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”#从Azure Synapse查询中加载数据df<-read.df“com.databricks.spark.sqldw”url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”forward_spark_azure_storage_credentials“真正的”查询select x, count(*) as CNT from table group by xtempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”对数据应用一些转换,然后使用#数据源API将数据写回Azure Synapse中的另一个表。write.dfdf“com.databricks.spark.sqldw”url" jdbc::状态"置疑" / / < the-rest-of-the-connection-string >”forward_spark_azure_storage_credentials“真正的”数据表“< your-table-name >”tempDir“abfss: / / < your-container-name > @ < your-storage-account-name > .dfs.core.windows.net/ < your-directory-name >”

如何在Databricks和Synapse之间进行身份验证?

Azure Synapse连接器使用三种类型的网络连接:

  • Spark驱动程序到Azure Synapse

  • Spark集群到Azure存储帐户

  • Azure Synapse到Azure存储帐户

配置对Azure存储的访问

Databricks和Synapse都需要对Azure存储帐户的特权访问,以便用于临时数据存储。

Azure Synapse不支持使用SAS进行存储帐户访问。您可以通过执行以下操作之一来配置两个服务的访问:

需要Azure Synapse权限

因为它使用了复制在后台,Azure Synapse连接器要求JDBC连接用户有权限在连接的Azure Synapse实例中运行以下命令:

如果目标表在Azure Synapse中不存在,除了上面的命令外,还需要运行以下命令的权限:

下表总结了写入所需的权限复制

权限(插入到现有表中)

权限(插入到新表中)

管理数据库批量操作

插入

管理数据库批量操作

插入

创建表

ALTER ON SCHEMA:: dbo

网络配置

如果在Azure Synapse上配置防火墙,则必须配置网络设置以允许Databricks访问Azure Synapse。首先,确保Databricks工作空间部署在您自己的虚拟网络中_.然后,您可以在Azure Synpase上配置IP防火墙规则,以允许从子网连接到Synpase帐户。看到Azure Synapse Analytics IP防火墙规则

使用服务主体配置从Databricks到Synapse的OAuth 2.0连接

您可以使用具有访问底层存储帐户权限的服务主体对Azure Synapse Analytics进行身份验证。有关使用服务主体凭据访问Azure存储帐户的详细信息,请参见访问Azure数据湖存储Gen2和Blob存储.您必须设置enableServicePrincipalAuth选项真正的在连接配置中Databricks突触连接器选项参考使连接器能够使用服务主体进行身份验证。

您可以选择为Azure Synapse Analytics连接使用不同的服务主体。以下示例为存储帐户配置服务主体凭据,为Synapse配置可选服务主体凭据:

;为Azure存储帐户定义服务主体凭据fs.azure.account.auth.type OAuthfs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProviderfs.azure.account.oauth2.client。id <应用程序id >fs.azure.account.oauth2.client。秘密< service-credential >fs.azure.account.oauth2.client。端点https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌;为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)spark.databricks.sqldw.jdbc.service.principal.client.id <应用程序id >spark.databricks.sqldw.jdbc.service.principal.client.secret < service-credential >
//为Azure存储帐户定义Service Principal凭据火花相依“fs.azure.account.auth.type”“OAuth”火花相依“fs.azure.account.oauth.provider.type”“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”火花相依“fs.azure.account.oauth2.client.id”“<应用程序id >”火花相依“fs.azure.account.oauth2.client.secret”“< service-credential >”火花相依“fs.azure.account.oauth2.client.endpoint”“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”//为Azure Synapse Analytics定义一个单独的服务主体凭证集(如果没有定义,连接器将使用Azure存储帐户凭证)火花相依“spark.databricks.sqldw.jdbc.service.principal.client.id”“<应用程序id >”火花相依“spark.databricks.sqldw.jdbc.service.principal.client.secret”“< service-credential >”
#为Azure存储帐户定义服务主体凭证火花相依“fs.azure.account.auth.type”“OAuth”火花相依“fs.azure.account.oauth.provider.type”“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”火花相依“fs.azure.account.oauth2.client.id”“<应用程序id >”火花相依“fs.azure.account.oauth2.client.secret”“< service-credential >”火花相依“fs.azure.account.oauth2.client.endpoint”“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”#为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)火花相依“spark.databricks.sqldw.jdbc.service.principal.client.id”“<应用程序id >”火花相依“spark.databricks.sqldw.jdbc.service.principal.client.secret”“< service-credential >”
#加载SparkR图书馆SparkR相依<-sparkR.callJMethodsparkR.session(),“配置”#为Azure存储帐户定义服务主体凭证sparkR.callJMethod相依“设置”“fs.azure.account.auth.type”“OAuth”sparkR.callJMethod相依“设置”“fs.azure.account.oauth.provider.type”“org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider”sparkR.callJMethod相依“设置”“fs.azure.account.oauth2.client.id”“<应用程序id >”sparkR.callJMethod相依“设置”“fs.azure.account.oauth2.client.secret”“< service-credential >”sparkR.callJMethod相依“设置”“fs.azure.account.oauth2.client.endpoint”“https://login.microsoftonline.com/ < directory-id > / oauth2 /令牌”#为Azure Synapse Analytics定义一组单独的服务主体凭证(如果没有定义,连接器将使用Azure存储帐户凭证)sparkR.callJMethod相依“设置”“spark.databricks.sqldw.jdbc.service.principal.client.id”“<应用程序id >”sparkR.callJMethod相依“设置”“spark.databricks.sqldw.jdbc.service.principal.client.secret”“< service-credential >”

支持批量写入的保存模式

Azure Synapse连接器支持ErrorIfExists忽略附加,覆盖保存模式,默认模式为ErrorIfExists.有关Apache Spark中支持的保存模式的更多信息,请参见Spark SQL文档中的保存模式

Databricks突触连接器选项参考

选项Spark SQL中提供的支持以下设置:

参数

要求

默认的

笔记

数据表

是的,除非查询指定

没有默认的

在Azure Synapse中要创建或读取的表。当将数据保存回Azure Synapse时,此参数是必需的。

你也可以使用{模式名称},{表名称}访问给定模式中的表。如果没有提供模式名,则使用与JDBC用户关联的默认模式。

以前支持的数据表Variant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

查询

是的,除非数据表指定

没有默认的

在Azure Synapse中读取的查询。

对于查询中引用的表,也可以使用{模式名称},{表名称}访问给定模式中的表。如果没有提供模式名,则使用与JDBC用户关联的默认模式。

用户

没有

没有默认的

Azure Synapse用户名。必须配合使用吗密码选择。只有在URL中没有传递用户和密码时才能使用。同时传递两者将导致错误。

密码

没有

没有默认的

Azure Synapse密码。必须配合使用吗用户选择。只有在URL中没有传递用户和密码时才能使用。同时传递两者将导致错误。

url

是的

没有默认的

JDBC URLsqlserver设置为子协议。建议使用Azure门户提供的连接字符串。设置加密= true强烈推荐,因为它支持JDBC连接的SSL加密。如果用户而且密码是分开设置的,你不需要在URL中包含它们。

jdbcDriver

没有

由JDBC URL的子协议决定

要使用的JDBC驱动程序的类名。这个类必须在类路径上。在大多数情况下,应该没有必要指定这个选项,因为适当的驱动程序类名应该由JDBC URL的子协议自动确定。

以前支持的jdbc_driverVariant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

tempDir

是的

没有默认的

一个abfssURI。我们建议您为Azure Synapse使用专用的Blob存储容器。

以前支持的tempdirVariant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

tempCompression

没有

时髦的

Spark和Azure Synapse用于临时编码/解码的压缩算法。目前支持的值为:未压缩的时髦的而且GZIP

forwardSparkAzureStorageCredentials

没有

如果真正的,库会自动发现Spark用于连接到Blob存储容器的凭据,并通过JDBC将这些凭据转发给Azure Synapse。这些凭证作为JDBC查询的一部分发送。因此,强烈建议在使用此选项时启用JDBC连接的SSL加密。

配置存储鉴权时,必须设置其中之一useAzureMSI而且forwardSparkAzureStorageCredentials真正的.或者,您可以设置enableServicePrincipalAuth真正的并将服务原则用于JDBC和存储身份验证。

以前支持的forward_spark_azure_storage_credentialsVariant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

useAzureMSI

没有

如果真正的,库将指定身份的管理服务身份的也没有秘密对于它创建的数据库范围的凭据。

配置存储鉴权时,必须设置其中之一useAzureMSI而且forwardSparkAzureStorageCredentials真正的.或者,您可以设置enableServicePrincipalAuth真正的并将服务原则用于JDBC和存储身份验证。

enableServicePrincipalAuth

没有

如果真正的,库将使用提供的服务主体凭据通过JDBC连接到Azure存储帐户和Azure Synapse Analytics。

如果任何一forward_spark_azure_storage_credentialsuseAzureMSI设置为真正的在存储鉴权时,该选项优先于业务原则。

tableOptions

没有

集群COLUMNSTORE指数分布ROUND_ROBIN

用于指定的字符串表选项在创建Azure Synapse表时设置数据表.字符串按字面意义传递给的条款创建表格针对Azure Synapse发出的SQL语句。

以前支持的table_optionsVariant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

预作用

没有

无默认值(空字符串)

一个在将数据写入Azure Synapse实例之前,要在Azure Synapse中执行的SQL命令的分离列表。这些SQL命令必须是Azure Synapse接受的有效命令。

如果这些命令中的任何一个失败,都将被视为错误,并且不会执行写操作。

postActions

没有

无默认值(空字符串)

一个在连接器成功地将数据写入Azure Synapse实例后,将在Azure Synapse中执行的SQL命令的分离列表。这些SQL命令必须是Azure Synapse接受的有效命令。

如果这些命令中的任何一个失败,它将被视为一个错误,并且在成功地将数据写入Azure Synapse实例后,您将得到一个异常。

maxStrLength

没有

256

StringType中映射到NVARCHAR (maxStrLength)输入Azure Synapse。你可以使用maxStrLength为所有对象设置字符串长度NVARCHAR (maxStrLength)键入表中具有name的列数据表在Azure Synapse中。

以前支持的maxstrlengthVariant已弃用,在将来的版本中将被忽略。使用“骆驼案例”的名称。

applicationName

没有

Databricks-User-Query

每个查询的连接标记。如果未指定或该值为空字符串,则将标记的默认值添加到JDBC URL中。默认值防止Azure DB监控工具对查询发出虚假的SQL注入警报。

maxbinlength

没有

没有默认的

的列长度BinaryType列。此参数转换为VARBINARY (maxbinlength)

identityInsert

没有

设置为真正的使IDENTITY_INSERT模式,它在Azure Synapse表的标识列中插入一个DataFrame提供的值。

看到显式地将值插入IDENTITY列

maxErrors

没有

0

在取消加载操作之前,在读写期间可以拒绝的最大行数。被拒绝的行将被忽略。例如,如果10个记录中有2个有错误,那么将只处理8个记录。

看到创建外部表中的REJECT_VALUE文档而且COPY中的MAXERRORS文档

inferTimestampNTZType

没有

如果真正的, Azure Synapse类型的值时间戳被解释为TimestampNTZType(时间戳,不带时区)。否则,所有时间戳都被解释为TimestampType不管底层Azure Synapse表中的类型是什么。

请注意

  • tableOptions预作用postActions,maxStrLength只有在将数据从Databricks写入Azure Synapse中的新表时才相关。

  • 尽管所有数据源选项名称都不区分大小写,但为了清晰起见,我们建议您使用“驼峰大小写”指定它们。

查询下推到Azure Synapse

Azure Synapse连接器实现了一组优化规则,将以下操作符推入Azure Synapse:

  • 过滤器

  • 项目

  • 限制

项目而且过滤器操作符支持以下表达式:

  • 大多数布尔逻辑运算符

  • 比较

  • 基本算术运算

  • 数字和字符串类型转换

限制操作符,只有在没有指定顺序时才支持下推。例如:

选择(10)表格,但不是选择(10)表格订单通过上校

请注意

Azure Synapse连接器不会下推操作字符串、日期或时间戳的表达式。

默认情况下,使用Azure Synapse连接器构建的查询下推是启用的。您可以通过设置禁用它spark.databricks.sqldw.pushdown

临时数据管理

Azure Synapse连接器删除它在Azure存储容器中创建的临时文件。Databricks建议定期删除用户提供的临时文件tempDir的位置。

为了方便数据清理,Azure Synapse连接器不直接在下面存储数据文件tempDir,而是创建表单的子目录:< tempDir > / < yyyy-MM-dd > / < HH-mm-ss-SSS > / < randomUUID > /.您可以设置周期作业(使用Databricks . properties)工作功能或其他)递归删除任何子目录的历史超过给定的阈值(例如,2天),假设没有Spark作业运行的时间超过该阈值。

一个更简单的替代方法是定期删除整个容器,并创建一个具有相同名称的新容器。这要求您为Azure Synapse连接器生成的临时数据使用专用容器,并且您可以找到一个时间窗口,在该时间窗口中可以保证没有涉及该连接器的查询正在运行。

临时对象管理

Azure Synapse连接器自动在Databricks集群和Azure Synapse实例之间传输数据。要从Azure Synapse表读取数据或查询数据,或将数据写入Azure Synapse表,Azure Synapse连接器将创建临时对象,包括数据库作用域凭证外部数据外部文件格式,外部表格幕后。这些对象只在相应的Spark作业期间存在,并且会自动删除。

当集群使用Azure Synapse连接器运行查询时,如果Spark驱动程序进程崩溃或强制重新启动,或者集群强制终止或重新启动,则可能不会删除临时对象。为了便于识别和手动删除这些对象,Azure Synapse连接器在Azure Synapse实例中创建的所有中间临时对象的名称前加上一个这样的标记:tmp_databricks_ < yyyy_MM_dd_HH_mm_ss_SSS > _ < randomUUID > _ < internalObject >

我们建议您定期使用以下查询查找泄漏对象:

  • 选择sys.database_scoped_credentials在哪里的名字就像“tmp_databricks_ %”

  • 选择sys.external_data_sources在哪里的名字就像“tmp_databricks_ %”

  • 选择sys.external_file_formats在哪里的名字就像“tmp_databricks_ %”

  • 选择sys.external_tables在哪里的名字就像“tmp_databricks_ %”