砖连接

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

Databricks Connect允许您连接您最喜欢的IDE (Eclipse, IntelliJ, PyCharm, RStudio, Visual Studio Code),笔记本服务器(Jupyter notebook, Zeppelin),和其他自定义应用程序到Databricks集群。

本文解释了Databricks Connect的工作原理,引导您完成开始使用Databricks Connect的步骤,解释了如何排除使用Databricks Connect时可能出现的问题,以及使用Databricks Connect运行与在Databricks笔记本上运行之间的区别。

概述

Databricks Connect是Databricks运行时的客户端库。它允许您使用Spark api编写作业,并在Databricks集群上远程运行它们,而不是在本地Spark会话中运行。

例如,执行DataFrame命令时spark.read.format(“铺”).load(…).groupBy(…).agg(…),告诉()使用Databricks Connect,作业的解析和规划在本地机器上运行。然后,任务的逻辑表示被发送到运行在Databricks中的Spark服务器,以便在集群中执行。

使用Databricks Connect,您可以:

  • 从任何Python、Java、Scala或R应用程序运行大规模Spark作业。任何地方进口pyspark进口org.apache.spark,或要求(SparkR),您现在可以直接从应用程序运行Spark作业,而不需要安装任何IDE插件或使用Spark提交脚本。

  • 即使在使用远程集群时,也可以在IDE中逐步检查和调试代码。

  • 在开发库时快速迭代。在Databricks Connect中更改Python或Java库依赖关系后,不需要重新启动集群,因为每个客户端会话在集群中是相互隔离的。

  • 在不丢失工作的情况下关闭空闲集群。由于客户机应用程序与集群分离,因此它不受集群重新启动或升级的影响,而集群重新启动或升级通常会导致丢失笔记本中定义的所有变量、rdd和DataFrame对象。

请注意

对于使用SQL查询的Python开发,Databricks建议您使用Databricks SQL连接器for Python而不是Databricks Connect。Databricks SQL Connector for Python比Databricks Connect更容易设置。此外,Databricks Connect解析和计划在本地机器上运行的作业,而作业在远程计算资源上运行。这使得调试运行时错误变得特别困难。Databricks SQL Connector for Python直接向远程计算资源提交SQL查询并获取结果。

需求

  • 仅支持以下Databricks Runtime版本:

    • Databricks Runtime 11.3 LTS ML

    • Databricks Runtime 10.4 LTS ML

    • Databricks Runtime 9.1 LTS ML

    • Databricks Runtime 7.3 LTS ML

  • 客户端Python安装的次要版本必须与Databricks集群的次要Python版本相同。下表显示了每个Databricks Runtime安装的Python版本。

    Databricks运行时版本号

    Python版本

    11.3 LTS ml, 11.3 LTS

    3.9

    10.4 LTS ml, 10.4 LTS

    3.8

    9.1 LTS ml, 9.1 LTS

    3.8

    7.3 LTS ml

    3.7

    例如,如果你在本地开发环境中使用Conda,而你的集群运行的是Python 3.7,你必须创建一个该版本的环境,例如:

    Conda create——name dbconnectpython3.7 conda
  • Databricks Connect主要和次要包版本必须始终与您的Databricks运行时版本匹配。Databricks建议您始终使用与您的Databricks运行时版本相匹配的Databricks Connect最新包。例如,当使用Databricks Runtime 7.3 LTS集群时,使用databricks-connect = = 7.3 . *包中。

    请注意

    看到Databricks Connect发布说明参阅可用的Databricks Connect版本和维护更新的列表。

  • Java Runtime Environment (JRE)客户端已使用OpenJDK 8 JRE进行测试。客户端不支持Java 11。

请注意

在Windows上,如果您看到Databricks Connect无法找到的错误winutils.exe,请参阅在Windows上找不到winutils.exe

设置客户端

请注意

在开始设置Databricks Connect客户端之前,您必须满足要求用于Databricks Connect。

步骤1:安装客户端

  1. 卸载PySpark。这是必需的,因为databricks-connect包与PySpark冲突。详细信息请参见冲突的PySpark安装

    PIP卸载pyspark
  2. 安装Databricks Connect客户端。

    pip install -U“databricks-connect = = 7.3 *”。#或x.y *来匹配您的集群版本。

    请注意

    总是指定databricks-connect = = X.Y. *而不是databricks-connect = X。Y,以确保安装了最新的软件包。

步骤2:配置连接属性

  1. 收集以下配置属性:

    • 工作空间的URL

    • 个人访问令牌

    • 创建的集群ID。集群ID可从URL中获取。这里是集群ID0304 - 201045 xxxxxxxx

      集群ID
    • Databricks Connect所连接的端口。设置为15001

  2. 配置连接。可以使用CLI、SQL或环境变量进行配置。配置方法的优先级从高到低依次为:SQL配置键、CLI和环境变量。

    • CLI

      1. 运行databricks-connect

        databricks-connect配置

        license显示如下:

        版权2018公司图书馆“软件”五月使用除了连接被许可方根据协议使用Databricks平台服务bob体育客户端下载...
      2. 接受许可证并提供配置值。为砖的主机而且砖的令牌,输入您在步骤1中注意到的工作区URL和个人访问令牌。

        你是否接受上述协议?[y/N] y设置新的配置值(保留输入为空接受默认值):databrick Host[无当前值,必须以https://开头]: databrick Token[无当前值]:集群ID(例如,0921-001415-jelly628)[无当前值]:< Cluster - ID >组织ID (Azure-only,见?o=orgId in URL) [0]: < Org - ID >端口[15001]:<端口>
    • SQL配置或环境变量。下表显示了与步骤1中注意到的配置属性对应的SQL配置键和环境变量。要设置SQL配置键,请使用sql(“集配置=值”).例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)

      参数

      SQL配置键

      环境变量名称

      砖的主机

      spark.databricks.service.address

      DATABRICKS_ADDRESS

      砖的令牌

      spark.databricks.service.token

      DATABRICKS_API_TOKEN

      集群ID

      spark.databricks.service.clusterId

      DATABRICKS_CLUSTER_ID

      Org ID

      spark.databricks.service.orgId

      DATABRICKS_ORG_ID

      港口

      spark.databricks.service.port

      DATABRICKS_PORT

  3. 测试到Databricks的连通性。

    databricks-connect测验

    如果您配置的集群没有运行,测试将启动集群,该集群将一直运行到其配置的自动终止时间。输出应该是这样的:

    * PySpark安装在/…/3.5.6/lib/python3.5/site-packages/ PySpark *检查java版本java版本"1.8.0_152" java (TM) SE运行时环境(build 1.8.0_152-b16) java HotSpot(TM) 64位服务器虚拟机(build 25.152-b16,混合模式)*测试scala命令18/12/10 16:38:44警告NativeCodeLoader:无法为您的平台加载本机hadoop库…bob体育客户端下载使用Spark的默认log4j配置文件:org/apache/spark/log4j-defaults。properties设置默认日志级别为WARN。使用sc.setLogLevel(newLevel)调整日志级别。对于SparkR,使用setLogLevel(newLevel)。18/12/10 16:38:50 WARN MetricsSystem:使用默认名称SparkStatusTracker作为源,因为没有设置spark.metrics.namespace和spark.app.id。18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidprev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms欢迎访问____ __/__ /__ ___ _____/ /___ \ \/ _\ /_/ /___/ .__/\_,_/_/ /_/\_版本2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit server VM, Java 1.8.0_152)输入表达式来计算它们。类型:帮助获取更多信息。scala > spark.range(100)。reduce(_ + _) Spark context Web UI可用在https://10.8.5.214:4040 Spark context可用为'sc' (master = local[*], app id = local-1544488730553)。Spark会话可用为“Spark”。 View job details at /?o=0#/setting/clusters//sparkUi View job details at ?o=0#/setting/clusters//sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters//sparkUi

设置IDE或笔记本服务器

本节描述如何配置您的首选IDE或笔记本服务器以使用Databricks Connect客户端。

Jupyter笔记本

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

Databricks Connect配置脚本自动将包添加到项目配置中。要在Python内核中开始,运行:

pyspark.sql进口SparkSession火花SparkSession构建器getOrCreate()

要启用%的sql运行和可视化SQL查询的简写,使用下面的代码片段:

IPython.core.magic进口line_magicline_cell_magic魔法magics_class@magics_classDatabricksConnectMagics魔法):@line_cell_magicdefsql自我细胞没有一个):如果细胞而且提高ValueError“单元格魔法行必须为空”试一试autovizwidget.widget.utils进口display_dataframe除了ImportError打印“请运行' pip install autovizwidget '以启用可视化小部件。”display_dataframeλxx返回display_dataframe自我get_spark()sql细胞orgydF4y2BatoPandas())defget_spark自我):user_nsget_ipython()user_ns如果“火花”user_ns返回user_ns“火花”其他的pyspark.sql进口SparkSessionuser_ns“火花”SparkSession构建器getOrCreate()返回user_ns“火花”知识产权get_ipython()知识产权register_magicsDatabricksConnectMagics

PyCharm

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

Databricks Connect配置脚本自动将包添加到项目配置中。

Python 3集群

  1. 创建PyCharm项目时,选择现有的翻译.从下拉菜单中,选择您创建的Conda环境(参见需求).

    选择翻译
  2. 执行>编辑配置

  3. 添加PYSPARK_PYTHON = python3作为一个环境变量。

    Python 3集群配置

SparkR和RStudio Desktop

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

  1. 下载并解包bob下载地址开源Spark到您的本地机器上。选择与Databricks集群中相同的版本(Hadoop 2.7)。

  2. 运行databricks-connectget-jar-dir.该命令返回类似于/usr/local/lib/python3.5/dist-packages / pyspark / jar.复制的文件路径上面一个目录JAR目录文件路径,例如:/usr/local/lib/python3.5/dist-packages / pyspark,即SPARK_HOME目录中。

  3. 配置Spark lib路径和Spark home路径,将它们添加到R脚本的顶部。集< spark-lib-path >到步骤1中解压缩开源Spark包的目录。bob下载地址集< spark-home-path >从步骤2到Databricks Connect目录。

    #指向OSS包的路径,例如/path/to/…/spark-2.4.0-bin-hadoop2.7图书馆SparkRlib.loc.libPathscfile.path' < spark-lib-path >”“R”“自由”),.libPaths())))#指向Databricks Connect PySpark安装,例如:/path/to/…/ PySparkSys.setenvSPARK_HOME“< spark-home-path >”
  4. 启动Spark会话并开始执行SparkR命令。

    sparkR.session()df<-作为。DataFrame忠实的dfdf1<-有斑纹的df函数xx},模式df))收集df1

sparklyr和RStudio Desktop

预览

此功能已在公共预览

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

您可以复制使用Databricks Connect在本地开发的依赖于sparklyr的代码,并在Databricks笔记本电脑或Databricks工作区中的托管RStudio服务器中运行,只需进行很少或不进行代码更改。

需求

  • Sparklyr 1.2或以上版本。

  • Databricks Runtime 7.3或以上版本,并搭配Databricks Connect。

安装、配置和使用sparklyr

  1. 在RStudio Desktop中,从CRAN安装sparklyr 1.2或以上版本,或从GitHub安装最新的主版本。

    #从CRAN安装install.packages“sparklyr”#或从GitHub安装最新的主版本install.packages“devtools”devtools::install_github“sparklyr / sparklyr”
  2. 激活安装了Databricks Connect的Python环境,并在终端中运行以下命令以获取< spark-home-path >

    databricks-connect get-spark-home
  3. 启动Spark会话,执行sparklyr命令。

    图书馆sparklyrsc<-spark_connect方法“砖”spark_home“< spark-home-path >”iris_tbl<-copy_tosc虹膜覆盖真正的图书馆dplyrsrc_tblssciris_tbl% > %
  4. 关闭连接。

    spark_disconnectsc

资源

欲了解更多信息,请参阅sparklyr GitHub自述

有关代码示例,请参见sparklyr

sparklyr和RStudio桌面限制

以下特性不支持:

  • sparklyr流api

  • sparklyr ML api

  • 扫帚api

  • Csv_file序列化模式

  • 火花提交

IntelliJ (Scala或Java)

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

  1. 运行databricks-connectget-jar-dir

  2. 将依赖项指向命令返回的目录。去>项目结构>模块>依赖>“+”符号> jar或目录

    IntelliJ罐子

    为了避免冲突,我们强烈建议从您的类路径中删除任何其他Spark安装。如果这是不可能的,请确保您添加的jar位于类路径的前面。特别是,它们必须在任何其他已安装的Spark版本之前(否则您将使用其中一个其他Spark版本并在本地运行或抛出一个错误ClassDefNotFoundError).

  3. 检查IntelliJ中中断选项的设置。默认为所有并且如果为调试设置断点,将导致网络超时。设置为线程避免停止后台网络线程。

    IntelliJ线程

Eclipse

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

  1. 运行databricks-connectget-jar-dir

  2. 将外部jar配置指向命令返回的目录。去项目菜单>属性> Java构建路径>库>添加外部jar

    Eclipse外部JAR配置

    为了避免冲突,我们强烈建议从您的类路径中删除任何其他Spark安装。如果这是不可能的,请确保您添加的jar位于类路径的前面。特别是,它们必须在任何其他已安装的Spark版本之前(否则您将使用其中一个其他Spark版本并在本地运行或抛出一个错误ClassDefNotFoundError).

    Eclipse Spark配置

Visual Studio代码

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

  1. 验证Python扩展安装。

  2. 打开命令面板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。

  3. 选择一个Python解释器。去Code > Preferences > Settings,并选择python的设置

  4. 运行databricks-connectget-jar-dir

  5. 将命令返回的目录添加到User Settings JSON下python.venvPath.这应该被添加到Python配置中。

  6. 关闭门栓。单击...在右边和编辑json设置.修改后的设置如下:

    VS代码配置
  7. 如果在虚拟环境中运行,这是在VS Code中为Python开发的推荐方式,在命令面板类型中运行选择python翻译并指出你的环境匹配你的集群Python版本。

    选择Python解释器

    例如,如果您的集群是Python 3.5,那么您的本地环境应该是Python 3.5。

    Python版本

SBT

请注意

Databricks建议您使用dbx由Databricks Labs进行本地开发而不是Databricks Connect。

在开始使用Databricks Connect之前,您必须满足要求而且设置客户端用于Databricks Connect。

要使用SBT,必须配置您的build.sbt文件链接到Databricks Connect jar,而不是通常的Spark库依赖。你用unmanagedBase指令在下面的示例构建文件中,它假设Scala应用程序有一个com.example.Test主要对象:

build.sbt

name:= "hello-world" version:= "1.0" scalaVersion:= "2.11.6" //这应该设置为' databricks-connect get-jar-dir ' ' unmanagedBase:= new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")返回的路径mainClass:= Some("com.example.Test")

从IDE运行示例

进口java.util.ArrayList进口并不知道进口java.sql.Date进口org.apache.spark.sql.SparkSession进口org.apache.spark.sql.types。*进口org.apache.spark.sql.Row进口org.apache.spark.sql.RowFactory进口org.apache.spark.sql.Dataset公共应用程序公共静态无效主要字符串[]arg游戏抛出异常SparkSession火花SparkSession构建器()浏览器名称“临时工演示”配置“spark.master”“本地”getOrCreate();//创建一个由高温和低温组成的Spark DataFrame//通过机场代码和日期。StructType模式StructTypeStructField[]StructField“AirportCode”数据类型StringType元数据()),StructField“日期”数据类型DateType元数据()),StructField“TempHighF”数据类型IntegerType元数据()),StructField“TempLowF”数据类型IntegerType元数据()),});列表<>dataListArrayList<>();dataList添加RowFactory创建“BLI”日期返回对象的值“2021-04-03”),5243));dataList添加RowFactory创建“BLI”日期返回对象的值“2021-04-02”),5038));dataList添加RowFactory创建“BLI”日期返回对象的值“2021-04-01”),5241));dataList添加RowFactory创建“PDX”日期返回对象的值“2021-04-03”),6445));dataList添加RowFactory创建“PDX”日期返回对象的值“2021-04-02”),6141));dataList添加RowFactory创建“PDX”日期返回对象的值“2021-04-01”),6639));dataList添加RowFactory创建“海”日期返回对象的值“2021-04-03”),5743));dataList添加RowFactory创建“海”日期返回对象的值“2021-04-02”),5439));dataList添加RowFactory创建“海”日期返回对象的值“2021-04-01”),5641));数据集<>临时工火花createDataFramedataList模式);//在Databricks集群上创建一个表,然后填充//包含DataFrame内容的表//如果表在之前运行时已经存在,//先删除它。火花sql“使用默认”);火花sqlDROP TABLE IF EXISTS demo_temps_table);临时工().saveAsTable“demo_temps_table”);//查询Databricks集群上的表,返回行//如果机场代码不是BLI,日期是晚些//比2021-04-01。将结果分组,按高排序//温度由高到低。数据集<>df_temps火花sqlSELECT * FROM demo_temps_table+“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”+GROUP BY AirportCode, Date, TempHighF, TempLowF+“TempHighF DESC订单”);df_temps显示();/ /结果://// +-----------+----------+---------+--------+// |AirportCode| Date|TempHighF|TempLowF| .使用实例// +-----------+----------+---------+--------+// | pdx |2021-04-03| 64| 45| .使用实例// | pdx |2021-04-02| 61| 41| .使用实例// | sea |2021-04-03| 57| 43|// | sea |2021-04-02| 54| 39|// +-----------+----------+---------+--------+//删除Databricks集群中的表火花sqlDROP TABLE demo_temps_table);
pyspark.sql进口SparkSessionpyspark.sql.types进口datetime进口日期火花SparkSession构建器浏览器名称“temps-demo”getOrCreate()#创建一个由高温和低温组成的Spark数据帧#按机场代码和日期。模式StructType([StructField“AirportCode”StringType(),),StructField“日期”DateType(),),StructField“TempHighF”IntegerType(),),StructField“TempLowF”IntegerType(),])数据“BLI”日期202143.),5243),“BLI”日期202142),5038),“BLI”日期202141),5241),“PDX”日期202143.),6445),“PDX”日期202142),6141),“PDX”日期202141),6639),“海”日期202143.),5743),“海”日期202142),5439),“海”日期202141),5641临时工火花createDataFrame数据模式在Databricks集群上创建一个表,然后填充# DataFrame的内容。#如果表在之前的运行中已经存在,#先删除它。火花sql使用默认的火花sql删除表如果存在demo_temps_table临时工saveAsTable“demo_temps_table”#查询Databricks集群的表,返回行#,其中机场代码不是BLI,日期是稍后#比2021-04-01。将结果分组,按高排序#温度由高到低。df_temps火花sqlSELECT * FROM demo_temps_table“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”GROUP BY AirportCode, Date, TempHighF, TempLowF“TempHighF DESC订单”df_temps显示()#结果:# +-----------+----------+---------+--------+# |AirportCode| Date|TempHighF|TempLowF|# +-----------+----------+---------+--------+# | pdx |2021-04-03| 64| 45|# | pdx |2021-04-02| 61| 41|# |海|2021-04-03| 57| 43|# |海|2021-04-02| 54| 39|# +-----------+----------+---------+--------+删除Databricks集群中的表火花sqlDROP TABLE demo_temps_table
进口orgapache火花sqlSparkSession进口orgapache火花sql类型_进口orgapache火花sql进口javasql日期对象演示def主要arg游戏数组字符串])瓦尔火花SparkSession构建器“本地”).getOrCreate()//创建一个由高温和低温组成的Spark DataFrame//通过机场代码和日期。瓦尔模式StructType数组StructField“AirportCode”StringType),StructField“日期”DateType),StructField“TempHighF”IntegerType),StructField“TempLowF”IntegerType))瓦尔数据列表“BLI”日期返回对象的值“2021-04-03”),5243),“BLI”日期返回对象的值“2021-04-02”),5038),“BLI”日期返回对象的值“2021-04-01”),5241),“PDX”日期返回对象的值“2021-04-03”),6445),“PDX”日期返回对象的值“2021-04-02”),6141),“PDX”日期返回对象的值“2021-04-01”),6639),“海”日期返回对象的值“2021-04-03”),5743),“海”日期返回对象的值“2021-04-02”),5439),“海”日期返回对象的值“2021-04-01”),5641瓦尔抽样火花sparkContextmakeRDD数据瓦尔临时工火花createDataFrame抽样模式//在Databricks集群上创建一个表,然后填充//包含DataFrame内容的表//如果表在之前运行时已经存在,//先删除它。火花sql“使用默认”火花sqlDROP TABLE IF EXISTS demo_temps_table临时工saveAsTable“demo_temps_table”//查询Databricks集群上的表,返回行//如果机场代码不是BLI,日期是晚些//比2021-04-01。将结果分组,按高排序//温度由高到低。瓦尔df_temps火花sqlSELECT * FROM demo_temps_table+“WHERE AirportCode != 'BLI' AND Date > '2021-04-01'”+GROUP BY AirportCode, Date, TempHighF, TempLowF+“TempHighF DESC订单”df_temps显示()/ /结果://// +-----------+----------+---------+--------+// |AirportCode| Date|TempHighF|TempLowF| .使用实例// +-----------+----------+---------+--------+// | pdx |2021-04-03| 64| 45| .使用实例// | pdx |2021-04-02| 61| 41| .使用实例// | sea |2021-04-03| 57| 43|// | sea |2021-04-02| 54| 39|// +-----------+----------+---------+--------+//删除Databricks集群中的表火花sqlDROP TABLE demo_temps_table

使用依赖项

通常,主类或Python文件会有其他依赖jar和文件。您可以通过调用来添加此类依赖jar和文件sparkContext.addJar(“path-to-the-jar”)orgydF4y2BasparkContext.addPyFile(文件路径).,还可以添加Egg文件和zip文件addPyFile ()接口。每次在IDE中运行代码时,依赖jar和文件都会安装在集群上。

自由进口喷火pyspark.sql进口SparkSession火花SparkSession构建器getOrCreate()sc火花sparkContext# sc.setLogLevel(“信息”)打印“测试简单计数”打印火花范围One hundred.())打印"测试addPyFile隔离"scaddPyFile“lib.py”打印sc并行化范围10))地图λ喷火2))收集())喷火对象):def__init__自我x):自我xx

Python + Java udf

pyspark.sql进口SparkSessionpyspark.sql.column进口_to_java_column_to_seq在这个例子中,udf.jar包含了编译好的Java / Scala udf:#包com.example#进口org.apache.spark.sql._#进口org.apache.spark.sql.expressions._#进口org.apache.spark.sql.functions.udf#对象测试{# val: UserDefinedFunction = udf((i: Long) => i + 1)#}火花SparkSession构建器配置“spark.jars”“/道路/ / udf.jar”getOrCreate()sc火花sparkContextdefplus_one_udf上校):fsc_jvmcom例子测试plusOne()返回f应用_to_seqsc上校),_to_java_column)))sc_jscaddJar“/道路/ / udf.jar”火花范围One hundred.withColumn“plusOne”plus_one_udf“id”))显示()
com例子进口orgapache火花sqlSparkSession情况下喷火x字符串对象测试def主要arg游戏数组字符串):单位瓦尔火花SparkSession构建器()...getOrCreate();火花sparkContextsetLogLevel“信息”println“正在运行简单的show查询…”火花格式“铺”).负载“/ tmp / x”).显示()println“运行简单的UDF查询…”火花sparkContextaddJar”。/目标/ scala - 2.11 / hello-world_2.11-1.0.jar”火花udf注册“f”xInt= >x+1火花范围10).selectExpr“f (id)”).显示()println"运行自定义对象查询…"瓦尔obj火花sparkContext并行化Seq喷火“再见”),喷火“嗨”)))。收集()printlnobjtoSeq

访问DBUtils

你可以使用dbutils.fs而且dbutils.secrets公用事业砖公用事业模块。支持的命令包括dbutils.fs.cpdbutils.fs.headdbutils.fs.lsdbutils.fs.mkdirsdbutils.fs.mvdbutils.fs.putdbutils.fs.rmdbutils.secrets.getdbutils.secrets.getBytesdbutils.secrets.listdbutils.secrets.listScopes.看到文件系统实用程序(dbutls .fs)或运行dbutils.fs.help ()而且秘密实用程序(dbutils.secrets)或运行dbutils.secrets.help ()

pyspark.sql进口SparkSessionpyspark.dbutils进口DBUtils火花SparkSession构建器getOrCreate()dbutilsDBUtils火花打印dbutilsfsls“dbfs: /))打印dbutils秘密listScopes())

当使用Databricks Runtime 7.3 LTS或以上版本时,要以本地和Databricks集群中都可以工作的方式访问DBUtils模块,请使用以下方法get_dbutils ()

defget_dbutils火花):pyspark.dbutils进口DBUtils返回DBUtils火花

否则,请使用以下方法get_dbutils ()

defget_dbutils火花):如果火花相依得到“spark.databricks.service.client.enabled”= =“真正的”pyspark.dbutils进口DBUtils返回DBUtils火花其他的进口IPython返回IPythonget_ipython()user_ns“dbutils”
瓦尔dbutilscom服务DBUtilsprintlndbutilsfsls“dbfs: /))printlndbutils秘密listScopes())

在本地和远程文件系统之间复制文件

你可以使用dbutils.fs在客户端和远程文件系统之间复制文件。计划文件:/指客户机上的本地文件系统。

pyspark.dbutils进口DBUtilsdbutilsDBUtils火花dbutilsfscp“文件:/ home / user / data.csv”“dbfs: /上传”dbutilsfscp“dbfs: / / results.csv输出”下载的文件:/ home / user / / '

可以通过这种方式传输的最大文件大小为250mb。

启用dbutils.secrets.get

由于安全限制,无法打电话dbutils.secrets.get默认禁用。联系Databricks支持人员为您的工作空间启用此功能。

访问Hadoop文件系统

您也可以直接使用标准Hadoop文件系统接口访问DBFS:

>进口orgapachehadoopfs_//获取新的DBFS连接>瓦尔dbfs文件系统得到火花sparkContexthadoopConfigurationdbfsorgapachehadoopfs文件系统com后端守护进程数据客户端DBFS@二维036335//列出文件>dbfslistStatus路径“dbfs: /))res1数组orgapachehadoopfsFileStatus数组FileStatus路径dbfs/isDirectory真正的…})//打开文件>瓦尔dbfs开放路径“dbfs: /道路/ / your_file”))orgapachehadoopfsFSDataInputStreamorgapachehadoopfsFSDataInputStream@7aa4ef24//获取文件内容为字符串>进口orgapache下议院io_>println字符串IOUtilstoByteArray)))

设置Hadoop配置

方法在客户端上设置Hadoop配置spark.conf.setAPI,应用于SQL和DataFrame操作。的Hadoop配置sparkContext必须在集群配置或使用笔记本设置。这是因为设置了配置sparkContext不绑定到用户会话,而是应用于整个集群。

故障排除

运行databricks-connect测验检查连接问题。本节介绍可能遇到的一些常见问题以及解决方法。

Python版本不匹配

检查您在本地使用的Python版本至少与集群上的版本具有相同的次要版本(例如,3.5.13.5.2是好的,3.53.6不是)。

如果本地安装了多个Python版本,请通过设置Databricks Connect的PYSPARK_PYTHON环境变量(例如,PYSPARK_PYTHON = python3).

服务器未启用

确保集群中已启用Spark服务器spark.databricks.service.server.enabled真正的.如果是,你应该在驱动日志中看到以下几行:

18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config: spark. databicks .service.server.enabled -> true…18/10/25 21:39:21 INFO SparkContext: Loading SparkServiceRPCServer 18/10/25 21:39:21 INFO SparkServiceRPCServer: Starting SparkServiceRPCServer 18/10/25 21:39:21 INFO Server: jetty-9.3.20。v20170531 18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42 {HTTP/1.1,[HTTP/1.1]}{0.0.0.0:15001} 18/10/25 21:39:21 INFO Server: Started @5879ms

冲突的PySpark安装

databricks-connect包与PySpark冲突。在Python中初始化Spark上下文时会导致错误。这可以通过几种方式表现出来,包括“流损坏”或“类未找到”错误。如果在Python环境中安装了PySpark,请确保在安装databicks -connect之前已将其卸载。卸载PySpark后,请确保完全重新安装Databricks Connect包:

pip uninstall pyspark pip uninstall databicks -connect pip install -U“databricks-connect = = 9.1 *”。#或x.y *来匹配您的集群版本。

相互冲突的SPARK_HOME

如果您以前在计算机上使用过Spark,则您的IDE可能已配置为使用其他版本的Spark之一,而不是Databricks Connect Spark。这可以通过几种方式表现出来,包括“流损坏”或“类未找到”错误。属性的值可以查看正在使用哪个版本的SparkSPARK_HOME环境变量:

系统println系统采用“SPARK_HOME”));
进口操作系统打印操作系统环境“SPARK_HOME”])
printlnsysenv得到“SPARK_HOME”))

决议

如果SPARK_HOME被设置为与客户端版本不同的Spark版本时,您应该清除SPARK_HOME变量,再试一次。

检查您的IDE环境变量设置. bashrc. zshrc,或. bash_profile文件,以及其他任何可能设置环境变量的地方。您很可能不得不退出并重新启动IDE以清除旧状态,如果问题仍然存在,您甚至可能需要创建一个新项目。

你不需要设置SPARK_HOME到一个新的值;取消设置就足够了。

冲突或缺失路径二进制条目

有可能您的PATH已配置,以便命令像spark-shell将运行之前安装的其他二进制文件,而不是Databricks Connect提供的二进制文件。这会导致databricks-connect测验失败。您应该确保Databricks Connect二进制文件优先,或者删除之前安装的二进制文件。

如果你不能运行命令spark-shell,也有可能您的PATH没有被自动设置皮普安装您需要添加安装箱子手动到您的PATH。即使没有设置,也可以使用Databricks Connect与ide连接。然而,databricks-connect测验命令是行不通的。

集群上的序列化设置冲突

如果您在运行时看到“流损坏”错误databricks-connect测验,这可能是由于不兼容的集群序列化配置。例如,设置spark.io.compression.codec配置可能会导致此问题。要解决此问题,请考虑从集群设置中删除这些配置,或在Databricks Connect客户端中设置配置。

找不到winutils.exe在Windows上

如果您在Windows上使用Databricks Connect,请参阅:

错误壳牌失败的定位winutils二进制hadoop二进制路径javaioIOException可以定位可执行的箱子winutilsexeHadoop二进制文件

按照说明操作Windows下配置Hadoop路径

Windows上的文件名、目录名或卷标签语法错误

如果您在Windows上使用Databricks Connect,请参阅:

文件名目录的名字orgydF4y2Ba体积标签语法不正确的

Java或Databricks Connect安装到具有你道路上的空间.方法安装到不带空格的目录路径中,或使用简写形式

限制

Databricks Connect不支持以下Databricks特性和第三方平台:bob体育客户端下载

  • 结构化的流。

  • 在远程集群上运行不属于Spark作业的任意代码。

  • 原生Scala, Python和R api用于Delta表操作(例如,DeltaTable.forPath)不受支持。然而,SQL API (spark.sql(…)Delta Lake操作和Spark API(例如,spark.read.load)都支持。

  • 进入副本。

  • Apache飞艇0.7.x而且是low.

  • 连接到集群表访问控制

  • 连接到启用了进程隔离的集群(换句话说,其中spark.databricks.pyspark.enableProcessIsolation设置为真正的).

  • δ克隆SQL命令。

  • 全局临时视图。

  • 考拉

  • 创建表格表格作为选择...SQL命令并不总是有效。相反,使用spark.sql(“选择……”).write.saveAsTable(“表”)