砖连接
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
Databricks Connect允许您连接您最喜欢的IDE (Eclipse, IntelliJ, PyCharm, RStudio, Visual Studio Code),笔记本服务器(Jupyter notebook, Zeppelin),和其他自定义应用程序到Databricks集群。
本文解释了Databricks Connect的工作原理,引导您完成开始使用Databricks Connect的步骤,解释了如何排除使用Databricks Connect时可能出现的问题,以及使用Databricks Connect运行与在Databricks笔记本上运行之间的区别。
概述
Databricks Connect是Databricks运行时的客户端库。它允许您使用Spark api编写作业,并在Databricks集群上远程运行它们,而不是在本地Spark会话中运行。
例如,执行DataFrame命令时spark.read.format(“铺”).load(…).groupBy(…).agg(…),告诉()
使用Databricks Connect,作业的解析和规划在本地机器上运行。然后,任务的逻辑表示被发送到运行在Databricks中的Spark服务器,以便在集群中执行。
使用Databricks Connect,您可以:
从任何Python、Java、Scala或R应用程序运行大规模Spark作业。任何地方
进口pyspark
,进口org.apache.spark
,或要求(SparkR)
,您现在可以直接从应用程序运行Spark作业,而不需要安装任何IDE插件或使用Spark提交脚本。即使在使用远程集群时,也可以在IDE中逐步检查和调试代码。
在开发库时快速迭代。在Databricks Connect中更改Python或Java库依赖关系后,不需要重新启动集群,因为每个客户端会话在集群中是相互隔离的。
在不丢失工作的情况下关闭空闲集群。由于客户机应用程序与集群分离,因此它不受集群重新启动或升级的影响,而集群重新启动或升级通常会导致丢失笔记本中定义的所有变量、rdd和DataFrame对象。
请注意
对于使用SQL查询的Python开发,Databricks建议您使用Databricks SQL连接器for Python而不是Databricks Connect。Databricks SQL Connector for Python比Databricks Connect更容易设置。此外,Databricks Connect解析和计划在本地机器上运行的作业,而作业在远程计算资源上运行。这使得调试运行时错误变得特别困难。Databricks SQL Connector for Python直接向远程计算资源提交SQL查询并获取结果。
需求
仅支持以下Databricks Runtime版本:
Databricks Runtime 11.3 LTS ML
Databricks Runtime 10.4 LTS ML
Databricks Runtime 9.1 LTS ML
Databricks Runtime 7.3 LTS ML
客户端Python安装的次要版本必须与Databricks集群的次要Python版本相同。下表显示了每个Databricks Runtime安装的Python版本。
Databricks运行时版本号
Python版本
11.3 LTS ml, 11.3 LTS
3.9
10.4 LTS ml, 10.4 LTS
3.8
9.1 LTS ml, 9.1 LTS
3.8
7.3 LTS ml
3.7
例如,如果你在本地开发环境中使用Conda,而你的集群运行的是Python 3.7,你必须创建一个该版本的环境,例如:
Conda create——name dbconnectpython=3.7 conda
Databricks Connect主要和次要包版本必须始终与您的Databricks运行时版本匹配。Databricks建议您始终使用与您的Databricks运行时版本相匹配的Databricks Connect最新包。例如,当使用Databricks Runtime 7.3 LTS集群时,使用
databricks-connect = = 7.3 . *
包中。请注意
看到Databricks Connect发布说明参阅可用的Databricks Connect版本和维护更新的列表。
Java Runtime Environment (JRE)客户端已使用OpenJDK 8 JRE进行测试。客户端不支持Java 11。
请注意
在Windows上,如果您看到Databricks Connect无法找到的错误winutils.exe
,请参阅在Windows上找不到winutils.exe.
设置客户端
请注意
在开始设置Databricks Connect客户端之前,您必须满足要求用于Databricks Connect。
步骤1:安装客户端
卸载PySpark。这是必需的,因为
databricks-connect
包与PySpark冲突。详细信息请参见冲突的PySpark安装.PIP卸载pyspark
安装Databricks Connect客户端。
pip install -U“databricks-connect = = 7.3 *”。#或x.y *来匹配您的集群版本。
请注意
总是指定
databricks-connect = = X.Y. *
而不是databricks-connect = X。Y
,以确保安装了最新的软件包。
步骤2:配置连接属性
收集以下配置属性:
配置连接。可以使用CLI、SQL或环境变量进行配置。配置方法的优先级从高到低依次为:SQL配置键、CLI和环境变量。
CLI
运行
databricks-connect
.databricks-connect配置
license显示如下:
版权(2018)砖,公司.这图书馆(的“软件”)五月不是使用除了在连接与的被许可方根据协议使用Databricks平台服务bob体育客户端下载...
接受许可证并提供配置值。为砖的主机而且砖的令牌,输入您在步骤1中注意到的工作区URL和个人访问令牌。
你是否接受上述协议?[y/N] y设置新的配置值(保留输入为空接受默认值):databrick Host[无当前值,必须以https://开头]:
databrick Token[无当前值]: 集群ID(例如,0921-001415-jelly628)[无当前值]:< Cluster - ID >组织ID (Azure-only,见?o=orgId in URL) [0]: < Org - ID >端口[15001]:<端口>
SQL配置或环境变量。下表显示了与步骤1中注意到的配置属性对应的SQL配置键和环境变量。要设置SQL配置键,请使用
sql(“集配置=值”)
.例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)
.参数
SQL配置键
环境变量名称
砖的主机
spark.databricks.service.address
DATABRICKS_ADDRESS
砖的令牌
spark.databricks.service.token
DATABRICKS_API_TOKEN
集群ID
spark.databricks.service.clusterId
DATABRICKS_CLUSTER_ID
Org ID
spark.databricks.service.orgId
DATABRICKS_ORG_ID
港口
spark.databricks.service.port
DATABRICKS_PORT
测试到Databricks的连通性。
databricks-connect测验
如果您配置的集群没有运行,测试将启动集群,该集群将一直运行到其配置的自动终止时间。输出应该是这样的:
* PySpark安装在/…/3.5.6/lib/python3.5/site-packages/ PySpark *检查java版本java版本"1.8.0_152" java (TM) SE运行时环境(build 1.8.0_152-b16) java HotSpot(TM) 64位服务器虚拟机(build 25.152-b16,混合模式)*测试scala命令18/12/10 16:38:44警告NativeCodeLoader:无法为您的平台加载本机hadoop库…bob体育客户端下载使用Spark的默认log4j配置文件:org/apache/spark/log4j-defaults。properties设置默认日志级别为WARN。使用sc.setLogLevel(newLevel)调整日志级别。对于SparkR,使用setLogLevel(newLevel)。18/12/10 16:38:50 WARN MetricsSystem:使用默认名称SparkStatusTracker作为源,因为没有设置spark.metrics.namespace和spark.app.id。18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidprev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms欢迎访问____ __/__ /__ ___ _____/ /___ \ \/ _\ /_/ /___/ .__/\_,_/_/ /_/\_版本2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit server VM, Java 1.8.0_152)输入表达式来计算它们。类型:帮助获取更多信息。scala > spark.range(100)。reduce(_ + _) Spark context Web UI可用在https://10.8.5.214:4040 Spark context可用为'sc' (master = local[*], app id = local-1544488730553)。Spark会话可用为“Spark”。 View job details at
/?o=0#/setting/clusters/ /sparkUi View job details at ?o=0#/setting/clusters/ /sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters/ /sparkUi
设置IDE或笔记本服务器
本节描述如何配置您的首选IDE或笔记本服务器以使用Databricks Connect客户端。
本节:
Jupyter笔记本
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
Databricks Connect配置脚本自动将包添加到项目配置中。要在Python内核中开始,运行:
从pyspark.sql进口SparkSession火花=SparkSession.构建器.getOrCreate()
要启用%的sql
运行和可视化SQL查询的简写,使用下面的代码片段:
从IPython.core.magic进口line_magic,line_cell_magic,魔法,magics_class@magics_class类DatabricksConnectMagics(魔法):@line_cell_magicdefsql(自我,行,细胞=没有一个):如果细胞而且行:提高ValueError(“单元格魔法行必须为空”,行)试一试:从autovizwidget.widget.utils进口display_dataframe除了ImportError:打印(“请运行' pip install autovizwidget '以启用可视化小部件。”)display_dataframe=λx:x返回display_dataframe(自我.get_spark().sql(细胞orgydF4y2Ba行).toPandas())defget_spark(自我):user_ns=get_ipython().user_ns如果“火花”在user_ns:返回user_ns[“火花”]其他的:从pyspark.sql进口SparkSessionuser_ns[“火花”]=SparkSession.构建器.getOrCreate()返回user_ns[“火花”]知识产权=get_ipython()知识产权.register_magics(DatabricksConnectMagics)
PyCharm
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
Databricks Connect配置脚本自动将包添加到项目配置中。
Python 3集群
创建PyCharm项目时,选择现有的翻译.从下拉菜单中,选择您创建的Conda环境(参见需求).
去执行>编辑配置.
添加
PYSPARK_PYTHON = python3
作为一个环境变量。
SparkR和RStudio Desktop
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
下载并解包bob下载地址开源Spark到您的本地机器上。选择与Databricks集群中相同的版本(Hadoop 2.7)。
运行
databricks-connectget-jar-dir
.该命令返回类似于/usr/local/lib/python3.5/dist-packages / pyspark / jar
.复制的文件路径上面一个目录JAR目录文件路径,例如:/usr/local/lib/python3.5/dist-packages / pyspark
,即SPARK_HOME
目录中。配置Spark lib路径和Spark home路径,将它们添加到R脚本的顶部。集
< spark-lib-path >
到步骤1中解压缩开源Spark包的目录。bob下载地址集< spark-home-path >
从步骤2到Databricks Connect目录。#指向OSS包的路径,例如/path/to/…/spark-2.4.0-bin-hadoop2.7图书馆(SparkR,lib.loc=.libPaths(c(file.path(' < spark-lib-path >”,“R”,“自由”),.libPaths())))#指向Databricks Connect PySpark安装,例如:/path/to/…/ PySparkSys.setenv(SPARK_HOME=“< spark-home-path >”)
启动Spark会话并开始执行SparkR命令。
sparkR.session()df<-作为。DataFrame(忠实的)头(df)df1<-有斑纹的(df,函数(x){x},模式(df))收集(df1)
sparklyr和RStudio Desktop
预览
此功能已在公共预览.
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
您可以复制使用Databricks Connect在本地开发的依赖于sparklyr的代码,并在Databricks笔记本电脑或Databricks工作区中的托管RStudio服务器中运行,只需进行很少或不进行代码更改。
需求
Sparklyr 1.2或以上版本。
Databricks Runtime 7.3或以上版本,并搭配Databricks Connect。
安装、配置和使用sparklyr
在RStudio Desktop中,从CRAN安装sparklyr 1.2或以上版本,或从GitHub安装最新的主版本。
#从CRAN安装install.packages(“sparklyr”)#或从GitHub安装最新的主版本install.packages(“devtools”)devtools::install_github(“sparklyr / sparklyr”)
激活安装了Databricks Connect的Python环境,并在终端中运行以下命令以获取
< spark-home-path >
:databricks-connect get-spark-home
启动Spark会话,执行sparklyr命令。
图书馆(sparklyr)sc<-spark_connect(方法=“砖”,spark_home=“< spark-home-path >”)iris_tbl<-copy_to(sc,虹膜,覆盖=真正的)图书馆(dplyr)src_tbls(sc)iris_tbl% > %数
关闭连接。
spark_disconnect(sc)
IntelliJ (Scala或Java)
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
运行
databricks-connectget-jar-dir
.将依赖项指向命令返回的目录。去>项目结构>模块>依赖>“+”符号> jar或目录.
为了避免冲突,我们强烈建议从您的类路径中删除任何其他Spark安装。如果这是不可能的,请确保您添加的jar位于类路径的前面。特别是,它们必须在任何其他已安装的Spark版本之前(否则您将使用其中一个其他Spark版本并在本地运行或抛出一个错误
ClassDefNotFoundError
).检查IntelliJ中中断选项的设置。默认为所有并且如果为调试设置断点,将导致网络超时。设置为线程避免停止后台网络线程。
Eclipse
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
运行
databricks-connectget-jar-dir
.将外部jar配置指向命令返回的目录。去项目菜单>属性> Java构建路径>库>添加外部jar.
为了避免冲突,我们强烈建议从您的类路径中删除任何其他Spark安装。如果这是不可能的,请确保您添加的jar位于类路径的前面。特别是,它们必须在任何其他已安装的Spark版本之前(否则您将使用其中一个其他Spark版本并在本地运行或抛出一个错误
ClassDefNotFoundError
).
Visual Studio代码
请注意
Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。
在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。
验证Python扩展安装。
打开命令面板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。
选择一个Python解释器。去Code > Preferences > Settings,并选择python的设置.
运行
databricks-connectget-jar-dir
.将命令返回的目录添加到User Settings JSON下
python.venvPath
.这应该被添加到Python配置中。关闭门栓。单击...在右边和编辑json设置.修改后的设置如下:
如果在虚拟环境中运行,这是在VS Code中为Python开发的推荐方式,在命令面板类型中运行
选择python翻译
并指出你的环境匹配你的集群Python版本。例如,如果您的集群是Python 3.5,那么您的本地环境应该是Python 3.5。
从IDE运行示例
进口java.util.ArrayList;进口并不知道;进口java.sql.Date;进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.types。*;进口org.apache.spark.sql.Row;进口org.apache.spark.sql.RowFactory;进口org.apache.spark.sql.Dataset;公共类应用程序{公共静态无效主要(字符串[]arg游戏)抛出异常{SparkSession火花=SparkSession.构建器().浏览器名称(“临时工演示”).配置(“spark.master”,“本地”).getOrCreate();//创建一个由高温和低温组成的Spark DataFrame//通过机场代码和日期。StructType模式=新StructType(新StructField[]{新StructField(“AirportCode”,数据类型.StringType,假,元数据.空()),新StructField(“日期”,数据类型.DateType,假,元数据.空()),新StructField(“TempHighF”,数据类型.IntegerType,假,元数据.空()),新StructField(“TempLowF”,数据类型.IntegerType,假,元数据.空()),});列表<行>dataList=新ArrayList<行>();dataList.添加(RowFactory.创建(“BLI”,日期.返回对象的值(“2021-04-03”),52,43));dataList.添加(RowFactory.创建(“BLI”,日期.返回对象的值(“2021-04-02”),50,38));dataList.添加(RowFactory.创建(“BLI”,日期.返回对象的值(“2021-04-01”),52,41));dataList.添加(RowFactory.创建(“PDX”,日期.返回对象的值(“2021-04-03”),64,45));dataList.添加(RowFactory.创建(“PDX”,日期.返回对象的值(“2021-04-02”),61,41));dataList.添加(RowFactory.创建(“PDX”,日期.返回对象的值(“2021-04-01”),66,39));dataList.添加(RowFactory.创建(“海”,日期.返回对象的值(“2021-04-03”),57,43));dataList.添加(RowFactory.创建(“海”,日期.返回对象的值(“2021-04-02”),54,39));dataList.添加(RowFactory.创建(“海”,日期.返回对象的值(“2021-04-01”),56,41));数据集<行>临时工=火花.createDataFrame(dataList,模式);//在Databricks集群上创建一个表,然后填充//包含DataFrame内容的表//如果表在之前运行时已经存在,//先删除它。火花.sql(“使用默认”);火花.sql(DROP TABLE IF EXISTS demo_temps_table);临时工.写().saveAsTable(“demo_temps_table”);//查询Databricks集群上的表,返回行//如果机场代码不是BLI,日期是晚些//比2021-04-01。将结果分组,按高排序//温度由高到低。数据集<行>df_temps=火花.sql(SELECT * FROM demo_temps_table+“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”+GROUP BY AirportCode, Date, TempHighF, TempLowF+“TempHighF DESC订单”);df_temps.显示();/ /结果://// +-----------+----------+---------+--------+// |AirportCode| Date|TempHighF|TempLowF| .使用实例// +-----------+----------+---------+--------+// | pdx |2021-04-03| 64| 45| .使用实例// | pdx |2021-04-02| 61| 41| .使用实例// | sea |2021-04-03| 57| 43|// | sea |2021-04-02| 54| 39|// +-----------+----------+---------+--------+//删除Databricks集群中的表火花.sql(DROP TABLE demo_temps_table);}}
从pyspark.sql进口SparkSession从pyspark.sql.types进口*从datetime进口日期火花=SparkSession.构建器.浏览器名称(“temps-demo”).getOrCreate()#创建一个由高温和低温组成的Spark数据帧#按机场代码和日期。模式=StructType([StructField(“AirportCode”,StringType(),假),StructField(“日期”,DateType(),假),StructField(“TempHighF”,IntegerType(),假),StructField(“TempLowF”,IntegerType(),假)])数据=[[“BLI”,日期(2021,4,3.),52,43),[“BLI”,日期(2021,4,2),50,38),[“BLI”,日期(2021,4,1),52,41),[“PDX”,日期(2021,4,3.),64,45),[“PDX”,日期(2021,4,2),61,41),[“PDX”,日期(2021,4,1),66,39),[“海”,日期(2021,4,3.),57,43),[“海”,日期(2021,4,2),54,39),[“海”,日期(2021,4,1),56,41]]临时工=火花.createDataFrame(数据,模式)在Databricks集群上创建一个表,然后填充# DataFrame的内容。#如果表在之前的运行中已经存在,#先删除它。火花.sql(使用默认的)火花.sql(删除表如果存在demo_temps_table)临时工.写.saveAsTable(“demo_temps_table”)#查询Databricks集群的表,返回行#,其中机场代码不是BLI,日期是稍后#比2021-04-01。将结果分组,按高排序#温度由高到低。df_temps=火花.sql(SELECT * FROM demo_temps_table\“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”\GROUP BY AirportCode, Date, TempHighF, TempLowF\“TempHighF DESC订单”)df_temps.显示()#结果:## +-----------+----------+---------+--------+# |AirportCode| Date|TempHighF|TempLowF|# +-----------+----------+---------+--------+# | pdx |2021-04-03| 64| 45|# | pdx |2021-04-02| 61| 41|# |海|2021-04-03| 57| 43|# |海|2021-04-02| 54| 39|# +-----------+----------+---------+--------+删除Databricks集群中的表火花.sql(DROP TABLE demo_temps_table)
进口org.apache.火花.sql.SparkSession进口org.apache.火花.sql.类型._进口org.apache.火花.sql.行进口java.sql.日期对象演示{def主要(arg游戏:数组[字符串]){瓦尔火花=SparkSession.构建器.主(“本地”).getOrCreate()//创建一个由高温和低温组成的Spark DataFrame//通过机场代码和日期。瓦尔模式=StructType(数组(StructField(“AirportCode”,StringType,假),StructField(“日期”,DateType,假),StructField(“TempHighF”,IntegerType,假),StructField(“TempLowF”,IntegerType,假)))瓦尔数据=列表(行(“BLI”,日期.返回对象的值(“2021-04-03”),52,43),行(“BLI”,日期.返回对象的值(“2021-04-02”),50,38),行(“BLI”,日期.返回对象的值(“2021-04-01”),52,41),行(“PDX”,日期.返回对象的值(“2021-04-03”),64,45),行(“PDX”,日期.返回对象的值(“2021-04-02”),61,41),行(“PDX”,日期.返回对象的值(“2021-04-01”),66,39),行(“海”,日期.返回对象的值(“2021-04-03”),57,43),行(“海”,日期.返回对象的值(“2021-04-02”),54,39),行(“海”,日期.返回对象的值(“2021-04-01”),56,41))瓦尔抽样=火花.sparkContext.makeRDD(数据)瓦尔临时工=火花.createDataFrame(抽样,模式)//在Databricks集群上创建一个表,然后填充//包含DataFrame内容的表//如果表在之前运行时已经存在,//先删除它。火花.sql(“使用默认”)火花.sql(DROP TABLE IF EXISTS demo_temps_table)临时工.写.saveAsTable(“demo_temps_table”)//查询Databricks集群上的表,返回行//如果机场代码不是BLI,日期是晚些//比2021-04-01。将结果分组,按高排序//温度由高到低。瓦尔df_temps=火花.sql(SELECT * FROM demo_temps_table+“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”+GROUP BY AirportCode, Date, TempHighF, TempLowF+“TempHighF DESC订单”)df_temps.显示()/ /结果://// +-----------+----------+---------+--------+// |AirportCode| Date|TempHighF|TempLowF| .使用实例// +-----------+----------+---------+--------+// | pdx |2021-04-03| 64| 45| .使用实例// | pdx |2021-04-02| 61| 41| .使用实例// | sea |2021-04-03| 57| 43|// | sea |2021-04-02| 54| 39|// +-----------+----------+---------+--------+//删除Databricks集群中的表火花.sql(DROP TABLE demo_temps_table)}}
使用依赖项
通常,主类或Python文件会有其他依赖jar和文件。您可以通过调用来添加此类依赖jar和文件sparkContext.addJar(“path-to-the-jar”)
orgydF4y2BasparkContext.addPyFile(文件路径)
.,还可以添加Egg文件和zip文件addPyFile ()
接口。每次在IDE中运行代码时,依赖jar和文件都会安装在集群上。
从自由进口喷火从pyspark.sql进口SparkSession火花=SparkSession.构建器.getOrCreate()sc=火花.sparkContext# sc.setLogLevel(“信息”)打印(“测试简单计数”)打印(火花.范围(One hundred.).数())打印("测试addPyFile隔离")sc.addPyFile(“lib.py”)打印(sc.并行化(范围(10)).地图(λ我:喷火(2)).收集())类喷火(对象):def__init__(自我,x):自我.x=x
Python + Java udf
从pyspark.sql进口SparkSession从pyspark.sql.column进口_to_java_column,_to_seq,列在这个例子中,udf.jar包含了编译好的Java / Scala udf:#包com.example##进口org.apache.spark.sql._#进口org.apache.spark.sql.expressions._#进口org.apache.spark.sql.functions.udf##对象测试{# val: UserDefinedFunction = udf((i: Long) => i + 1)#}火花=SparkSession.构建器\.配置(“spark.jars”,“/道路/ / udf.jar”)\.getOrCreate()sc=火花.sparkContextdefplus_one_udf(上校):f=sc._jvm.com.例子.测试.plusOne()返回列(f.应用(_to_seq(sc,[上校),_to_java_column)))sc._jsc.addJar(“/道路/ / udf.jar”)火花.范围(One hundred.).withColumn(“plusOne”,plus_one_udf(“id”)).显示()
包com.例子进口org.apache.火花.sql.SparkSession情况下类喷火(x:字符串)对象测试{def主要(arg游戏:数组[字符串):单位={瓦尔火花=SparkSession.构建器()....getOrCreate();火花.sparkContext.setLogLevel(“信息”)println(“正在运行简单的show查询…”)火花.读.格式(“铺”).负载(“/ tmp / x”).显示()println(“运行简单的UDF查询…”)火花.sparkContext.addJar(”。/目标/ scala - 2.11 / hello-world_2.11-1.0.jar”)火花.udf.注册(“f”,(x:Int)= >x+1)火花.范围(10).selectExpr(“f (id)”).显示()println("运行自定义对象查询…")瓦尔obj=火花.sparkContext.并行化(Seq(喷火(“再见”),喷火(“嗨”)))。收集()println(obj.toSeq)}}
访问DBUtils
你可以使用dbutils.fs
而且dbutils.secrets
公用事业砖公用事业模块。支持的命令包括dbutils.fs.cp
,dbutils.fs.head
,dbutils.fs.ls
,dbutils.fs.mkdirs
,dbutils.fs.mv
,dbutils.fs.put
,dbutils.fs.rm
,dbutils.secrets.get
,dbutils.secrets.getBytes
,dbutils.secrets.list
,dbutils.secrets.listScopes
.看到文件系统实用程序(dbutls .fs)或运行dbutils.fs.help ()
而且秘密实用程序(dbutils.secrets)或运行dbutils.secrets.help ()
.
从pyspark.sql进口SparkSession从pyspark.dbutils进口DBUtils火花=SparkSession.构建器.getOrCreate()dbutils=DBUtils(火花)打印(dbutils.fs.ls(“dbfs: /))打印(dbutils.秘密.listScopes())
当使用Databricks Runtime 7.3 LTS或以上版本时,要以本地和Databricks集群中都可以工作的方式访问DBUtils模块,请使用以下方法get_dbutils ()
:
defget_dbutils(火花):从pyspark.dbutils进口DBUtils返回DBUtils(火花)
否则,请使用以下方法get_dbutils ()
:
defget_dbutils(火花):如果火花.相依.得到(“spark.databricks.service.client.enabled”)= =“真正的”:从pyspark.dbutils进口DBUtils返回DBUtils(火花)其他的:进口IPython返回IPython.get_ipython().user_ns[“dbutils”]
瓦尔dbutils=com.砖.服务.DBUtilsprintln(dbutils.fs.ls(“dbfs: /))println(dbutils.秘密.listScopes())
访问Hadoop文件系统
您也可以直接使用标准Hadoop文件系统接口访问DBFS:
>进口org.apache.hadoop.fs._//获取新的DBFS连接>瓦尔dbfs=文件系统.得到(火花.sparkContext.hadoopConfiguration)dbfs:org.apache.hadoop.fs.文件系统=com.砖.后端.守护进程.数据.客户端.DBFS@二维036335//列出文件>dbfs.listStatus(新路径(“dbfs: /))res1:数组[org.apache.hadoop.fs.FileStatus]=数组(FileStatus{路径=dbfs:/$;isDirectory=真正的;…})//打开文件>瓦尔流=dbfs.开放(新路径(“dbfs: /道路/ / your_file”))流:org.apache.hadoop.fs.FSDataInputStream=org.apache.hadoop.fs.FSDataInputStream@7aa4ef24//获取文件内容为字符串>进口org.apache.下议院.io._>println(新字符串(IOUtils.toByteArray(流)))
设置Hadoop配置
方法在客户端上设置Hadoop配置spark.conf.set
API,应用于SQL和DataFrame操作。的Hadoop配置sparkContext
必须在集群配置或使用笔记本设置。这是因为设置了配置sparkContext
不绑定到用户会话,而是应用于整个集群。
故障排除
运行databricks-connect测验
检查连接问题。本节介绍可能遇到的一些常见问题以及解决方法。
Python版本不匹配
检查您在本地使用的Python版本至少与集群上的版本具有相同的次要版本(例如,3.5.1
与3.5.2
是好的,3.5
与3.6
不是)。
如果本地安装了多个Python版本,请通过设置Databricks Connect的PYSPARK_PYTHON
环境变量(例如,PYSPARK_PYTHON = python3
).
服务器未启用
确保集群中已启用Spark服务器spark.databricks.service.server.enabled真正的
.如果是,你应该在驱动日志中看到以下几行:
18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config: spark. databicks .service.server.enabled -> true…18/10/25 21:39:21 INFO SparkContext: Loading SparkServiceRPCServer 18/10/25 21:39:21 INFO SparkServiceRPCServer: Starting SparkServiceRPCServer 18/10/25 21:39:21 INFO Server: jetty-9.3.20。v20170531 18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42 {HTTP/1.1,[HTTP/1.1]}{0.0.0.0:15001} 18/10/25 21:39:21 INFO Server: Started @5879ms
冲突的PySpark安装
的databricks-connect
包与PySpark冲突。在Python中初始化Spark上下文时会导致错误。这可以通过几种方式表现出来,包括“流损坏”或“类未找到”错误。如果在Python环境中安装了PySpark,请确保在安装databicks -connect之前已将其卸载。卸载PySpark后,请确保完全重新安装Databricks Connect包:
pip uninstall pyspark pip uninstall databicks -connect pip install -U“databricks-connect = = 9.1 *”。#或x.y *来匹配您的集群版本。
相互冲突的SPARK_HOME
如果您以前在计算机上使用过Spark,则您的IDE可能已配置为使用其他版本的Spark之一,而不是Databricks Connect Spark。这可以通过几种方式表现出来,包括“流损坏”或“类未找到”错误。属性的值可以查看正在使用哪个版本的SparkSPARK_HOME
环境变量:
系统.出.println(系统.采用(“SPARK_HOME”));
进口操作系统打印(操作系统.环境[“SPARK_HOME”])
println(sys.env.得到(“SPARK_HOME”))
冲突或缺失路径
二进制条目
有可能您的PATH已配置,以便命令像spark-shell
将运行之前安装的其他二进制文件,而不是Databricks Connect提供的二进制文件。这会导致databricks-connect测验
失败。您应该确保Databricks Connect二进制文件优先,或者删除之前安装的二进制文件。
如果你不能运行命令spark-shell
,也有可能您的PATH没有被自动设置皮普安装
您需要添加安装箱子
手动到您的PATH。即使没有设置,也可以使用Databricks Connect与ide连接。然而,databricks-connect测验
命令是行不通的。
集群上的序列化设置冲突
如果您在运行时看到“流损坏”错误databricks-connect测验
,这可能是由于不兼容的集群序列化配置。例如,设置spark.io.compression.codec
配置可能会导致此问题。要解决此问题,请考虑从集群设置中删除这些配置,或在Databricks Connect客户端中设置配置。
找不到winutils.exe
在Windows上
如果您在Windows上使用Databricks Connect,请参阅:
错误壳牌:失败的来定位的winutils二进制在的hadoop二进制路径java.io.IOException:可以不定位可执行的零\箱子\winutils.exe在的Hadoop二进制文件.
按照说明操作Windows下配置Hadoop路径.
限制
Databricks Connect不支持以下Databricks特性和第三方平台:bob体育客户端下载
统一目录.
结构化的流。
在远程集群上运行不属于Spark作业的任意代码。
原生Scala, Python和R api用于Delta表操作(例如,
DeltaTable.forPath
)不受支持。然而,SQL API (spark.sql(…)
Delta Lake操作和Spark API(例如,spark.read.load
)都支持。进入副本。
Apache飞艇0.7.x而且是low.
连接到集群表访问控制.
连接到启用了进程隔离的集群(换句话说,其中
spark.databricks.pyspark.enableProcessIsolation
设置为真正的
).δ
克隆
SQL命令。全局临时视图。
考拉.
创建表格表格作为选择...
SQL命令并不总是有效。相反,使用spark.sql(“选择……”).write.saveAsTable(“表”)
.