砖连接

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

砖允许您连接到连接你最喜欢的IDE (IntelliJ, Eclipse, PyCharm Visual Studio代码),笔记本电脑服务器如飞艇,和其他自定义应用程序数据砖集群。

本文解释了砖连接是如何工作的,走你通过与砖连接的步骤开始,解释如何解决可能出现的问题在使用砖连接时,使用砖和不同运行连接和运行在一个砖笔记本。

概述

砖是砖的客户端库运行时连接。它允许您使用火花api编写工作和远程数据砖集群上运行它们,而不是在当地引发会话。

例如,当您运行DataFrame命令spark.read.format(“铺”).load (…) .groupBy (…) .agg(…),告诉()使用砖连接,解析和规划工作的运行在本地机器上。然后,工作的逻辑表示发送到火花集群中的服务器运行在砖执行。

砖连接,您可以:

  • 从任何Python运行大规模刺激就业,Java, Scala,或R应用程序。任何你可以进口pyspark,进口org.apache.spark,或要求(SparkR)现在,您可以运行火花工作直接从您的应用程序,而不需要安装任何IDE插件或使用火花提交脚本。

  • 单步调试和调试代码在IDE甚至在处理远程集群。

  • 快速迭代开发库。您不需要重新启动集群在砖连接改变Python和Java库依赖关系后,因为每个客户机会话集群中的相互隔离。

  • 关闭闲置集群没有失去工作。因为客户端应用程序是与集群脱钩,这是影响集群重启或升级,这通常会导致你失去所有的变量,抽样和DataFrame对象定义在一个笔记本上。

请注意

Python开发的SQL查询,砖建议您使用Python的砖SQL的连接器而不是砖连接。Python的砖SQL的连接器是比砖更容易建立连接。同时,砖连接解析和计划工作在本地机器上运行,运行在远程计算资源而工作。这可以让它尤其难以调试运行时错误。Python的砖SQL连接器直接提交SQL查询远程计算资源和获取结果。

需求

  • 只有以下砖运行时版本的支持:

    • 砖运行时10.4 LTS ML,砖LTS 10.4运行时

    • 砖运行时9.1 LTS ML,砖LTS 9.1运行时

    • 砖运行时7.3 LTS ML,砖LTS 7.3运行时

  • 小版本的Python安装客户端必须一样的小砖集群的Python版本。表显示了Python版本与每个砖安装运行时。

    砖的运行时版本的

    Python版本

    LTS 11.3 LTS ML, 11.3

    3.9

    LTS 10.4 LTS ML, 10.4

    3.8

    LTS 9.1 LTS ML, 9.1

    3.8

    LTS 7.3 LTS ML, 7.3

    3.7

    例如,如果您正在使用Conda本地开发环境和集群上运行Python 3.7中,您必须创建一个环境版本,例如:

    dbconnect conda创建——名称python=37 conda激活dbconnect
  • 砖连接主要和次要的包的版本必须匹配你的砖运行时版本。砖建议你总是使用最新的包砖连接相匹配你的砖的运行时版本。例如,当使用砖运行时7.3 LTS集群,使用databricks-connect = = 7.3 . *包中。

    请注意

    看到砖连接的发布说明可用数据砖连接的列表发布和维护更新。

  • Java运行时环境(JRE) 8。客户测试了OpenJDK 8 JRE。客户端不支持Java 11。

请注意

在Windows上,如果你看到一个错误,砖连接找不到winutils.exe,请参阅找不到winutils。exe在Windows上

设置客户端

请注意

在你开始之前建立砖连接的客户端,您必须符合要求的砖的连接。

步骤1:安装客户端

  1. 卸载PySpark。这是必需的,因为databricks-connect包与PySpark冲突。有关详细信息,请参见冲突PySpark安装

    皮普卸载pyspark
  2. 安装砖连接的客户端。

    pip安装- u“databricks-connect = = 7.3 *”。#或X.Y.*来match your cluster version.

    请注意

    总是指定databricks-connect = = X.Y. *而不是databricks-connect = X.Y,确保最新的安装包。

步骤2:配置连接属性

  1. 收集以下配置属性:

    • 工作空间的URL

    • 个人访问令牌

    • 集群创建的ID。你可以从URL获取集群ID。这里的集群ID0304 - 201045 xxxxxxxx

      集群ID
    • 砖连接的连接的端口。设置为15001年

  2. 配置连接。您可以使用CLI、SQL配置或环境变量。从最高到最低配置方法的优先级是:SQL配置钥匙,CLI和环境变量。

    • CLI

      1. 运行databricks-connect

        databricks-connect配置

        许可证将显示:

        版权(2018年),公司图书馆(“软件”)可能使用除了连接被许可方的砖的使用平台服务依照达成协议bob体育客户端下载
      2. 接受许可和供应配置值。为砖的主机砖的令牌,输入工作区URL和个人访问令牌您在步骤1中指出。

        你接受以上协议吗?[y / N] y设置新的配置值(离开输入空接受默认):砖主机(目前没有价值,必须从https://]: < databricks-url >砖令牌(没有当前值):< databricks-token >集群ID(例如,0921 - 001415 jelly628)(没有当前值):< cluster-id > Org ID (Azure-only,看到了吗? o = orgId URL) [0]: < org-id >端口[15001]:<口>
    • SQL配置或环境变量。下面的表显示了SQL配置键和对应的环境变量配置属性您在步骤1中指出。设置SQL配置键,使用sql(“集配置=值”)。例如:sql(“集spark.databricks.service.clusterId = 0304 - 201045 - abcdefgh”)

      参数

      SQL配置关键

      环境变量名称

      砖的主机

      spark.databricks.service.address

      DATABRICKS_ADDRESS

      砖的令牌

      spark.databricks.service.token

      DATABRICKS_API_TOKEN

      集群ID

      spark.databricks.service.clusterId

      DATABRICKS_CLUSTER_ID

      Org ID

      spark.databricks.service.orgId

      DATABRICKS_ORG_ID

      港口

      spark.databricks.service.port

      DATABRICKS_PORT

  3. 测试连接数据砖。

    databricks-connect测试

    如果您配置的集群没有运行,测试配置的集群将继续运行,直到它开始autotermination时间。输出应该类似:

    * PySpark是安装在/…/ 3.5.6 / lib / python3.5 /网站/ PySpark *检查java版本的java版本“1.8.0_152”java (TM)(构建1.8.0_152-b16) java SE运行时环境热点(TM) 64位服务器虚拟机(构建25.152 b16转椅,混合模式)*测试scala命令18/12/10 16:38:44 NativeCodeLoader警告:无法加载native-hadoop库为您的平台……bob体育客户端下载使用builtin-java类,适用的使用引发的违约log4j配置文件:org/apache/spark/log4j-defaults。属性默认日志级别设置为“警告”。调整日志级别使用sc.setLogLevel(中的)。对于SparkR,使用setLogLevel(中的)。18/12/10 16:38:50警告MetricsSystem:使用默认名称SparkStatusTracker因为无论是spark.metrics.namespace还是spark.app来源。我d is set. 18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state 18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.0-SNAPSHOT /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://10.8.5.214:4040 Spark context available as 'sc' (master = local[*], app id = local-1544488730553). Spark session available as 'spark'. View job details at /?o=0#/setting/clusters//sparkUi View job details at ?o=0#/setting/clusters//sparkUi res0: Long = 4950 scala> :quit * Testing python command 18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. 18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state View job details at /?o=0#/setting/clusters//sparkUi

设置您的IDE或笔记本电脑服务器

部分描述了如何配置您的首选IDE或笔记本电脑服务器使用砖连接的客户端。

Jupyter笔记本

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

砖连接配置脚本自动添加包到您的项目配置。开始一个Python内核中运行:

pyspark.sql进口SparkSession火花=SparkSession构建器getOrCreate()

要启用%的sql简称跑步和可视化的SQL查询,使用以下代码片段:

IPython.core.magic进口line_magic,line_cell_magic,魔法,magics_class@magics_classDatabricksConnectMagics(魔法):@line_cell_magicdefsql(自我,,细胞=没有一个):如果细胞:提高ValueError(细胞魔法”“线必须是空的,)试一试:autovizwidget.widget.utils进口display_dataframe除了ImportError:打印(“请运行“pip安装autovizwidget”启用可视化部件。”)display_dataframe=λx:x返回display_dataframe(自我get_spark()sql(细胞orgydF4y2Ba)toPandas())defget_spark(自我):user_ns=get_ipython()user_ns如果“火花”user_ns:返回user_ns(“火花”]其他的:pyspark.sql进口SparkSessionuser_ns(“火花”]=SparkSession构建器getOrCreate()返回user_ns(“火花”]知识产权=get_ipython()知识产权register_magics(DatabricksConnectMagics)

PyCharm

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

砖连接配置脚本自动添加包到您的项目配置。

Python 3集群

  1. 当你创建一个PyCharm项目,选择现有的翻译。从下拉菜单中,选择您创建(参见Conda环境需求)。

    选择翻译
  2. Run >编辑配置

  3. 添加PYSPARK_PYTHON = python3作为一个环境变量。

    Python 3集群配置

SparkR和RStudio桌面

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

  1. 下载并解压bob下载地址开源的火花到您的本地机器上。选择相同的版本在你的砖集群(Hadoop 2.7)。

  2. 运行databricks-connectget-jar-dir。这个命令返回一个路径/usr/local/lib/python3.5/dist-packages / pyspark / jar。复制的文件路径上面一个目录例如,JAR目录文件路径/usr/local/lib/python3.5/dist-packages / pyspark,这是SPARK_HOME目录中。

  3. 配置火花自由路径和火花家里通过将它们添加到你的R脚本。集< spark-lib-path >你打开的目录开源火花包在步骤1中。bob下载地址集< spark-home-path >从步骤2砖连接目录。

    #指向OSS包路径,例如,/ / /…/ spark-2.4.0-bin-hadoop2.7道路图书馆(SparkR,lib.loc=.libPaths(c(file.path(“< spark-lib-path >”,“R”,“自由”),.libPaths())))#点砖PySpark连接安装,例如,/ / /…/ PySpark路径Sys.setenv(SPARK_HOME=“< spark-home-path >”)
  4. 发起一个火花会话并开始运行SparkR命令。

    sparkR.session()df< -as.DataFrame(忠实的)(df)df1< -有斑纹的(df,函数(x){x},模式(df))收集(df1)

sparklyr和RStudio桌面

预览

这个特性是在公共预览

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

你可以复制sparklyr-dependent代码开发本地使用砖连接并运行它在你的砖砖笔记本或RStudio托管服务器的工作区以最小的不需要修改代码。

需求

  • sparklyr 1.2或以上。

  • 与匹配砖砖运行时的7.3或以上连接。

安装、配置和使用sparklyr

  1. 在RStudio桌面,从凹口安装sparklyr 1.2或以上或从GitHub安装最新的主版本。

    #安装从凹口install.packages(“sparklyr”)从GitHub或安装最新的主版本号install.packages(“devtools”)devtools::install_github(“sparklyr / sparklyr”)
  2. 激活Python环境砖连接安装和运行以下命令在终端得到< spark-home-path >:

    databricks-connect get-spark-home
  3. 发起一个火花会话并开始运行sparklyr命令。

    图书馆(sparklyr)sc< -spark_connect(方法=“砖”,spark_home=“< spark-home-path >”)iris_tbl< -copy_to(sc,虹膜,覆盖=真正的)图书馆(dplyr)src_tbls(sc)iris_tbl% > %
  4. 关闭连接。

    spark_disconnect(sc)

资源

有关更多信息,请参见sparklyr GitHub自述

代码示例,请参阅sparklyr

sparklyr和RStudio桌面的局限性

不支持以下特性:

  • sparklyr流api

  • sparklyr毫升api

  • 扫帚api

  • csv_file序列化方式

  • 火花提交

IntelliJ (Scala或Java)

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

  1. 运行databricks-connectget-jar-dir

  2. 的依赖关系指向的目录返回的命令。去文件> >项目结构>模块依赖项>“+”符号> jar或目录

    IntelliJ罐子

    为了避免冲突,我们强烈建议删除任何其他火花从类路径中安装。如果这是不可能的,确保jar添加在前面的类路径中。特别是,他们必须提前安装其他版本的火花(否则你会使用其中一个其他火花版本和本地运行或扔一个ClassDefNotFoundError)。

  3. 检查在IntelliJ突破的设置选项。默认值是所有并将导致网络超时如果你设置断点调试。将其设置为线程为了避免停止网络后台线程。

    IntelliJ线程

Eclipse

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

  1. 运行databricks-connectget-jar-dir

  2. 点的外部jar配置目录返回的命令。去图书馆项目菜单>属性> Java构建路径> >添加外部jar

    Eclipse外部JAR配置

    为了避免冲突,我们强烈建议删除任何其他火花从类路径中安装。如果这是不可能的,确保jar添加在前面的类路径中。特别是,他们必须提前安装其他版本的火花(否则你会使用其中一个其他火花版本和本地运行或扔一个ClassDefNotFoundError)。

    Eclipse火花配置

Visual Studio代码

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

  1. 验证Python扩展安装。

  2. 打开命令面板(命令+ Shift + P在macOS和Ctrl + Shift + P在Windows / Linux)。

  3. 选择一个Python解释器。去代码> Preferences >设置,并选择python的设置

  4. 运行databricks-connectget-jar-dir

  5. 从命令返回的目录添加到用户设置JSONpython.venvPath。这应该被添加到Python配置。

  6. 禁用短绒。单击在右边编辑json设置。修改后的设置如下:

    VS代码配置
  7. 如果与一个虚拟环境中运行,这是推荐的方式为Python开发在VS代码中,在命令面板类型选择python翻译并指向您的环境匹配集群的Python版本。

    选择Python解释器

    例如,如果您的集群是Python 3.5,你的当地环境应该是Python 3.5。

    Python版本

SBT

请注意

砖建议你使用dbx或者是砖扩展Visual Studio代码为当地的发展而不是砖连接。

在你开始使用砖连接之前,必须符合要求的设置客户端砖的连接。

使用SBT,您必须配置您的build.sbt文件链接对砖的连接罐而不是通常的火花库依赖关系。你这样做的unmanagedBase下面的示例构建文件指令,假设一个Scala应用的com.example.Test主要对象:

build.sbt

名称:= " hello world "版本:= " 1.0 " scalaVersion: = " 2.11.6 " / /这应该被设置为“返回的路径databricks-connect get-jar-dir“unmanagedBase: = new java.io.File (“/ usr /地方/ lib / python2.7 / dist-packages pyspark / jars”) mainClass: =一些(“com.example.Test”)

从IDE运行示例

进口java.util.ArrayList;进口并不知道;进口java.sql.Date;进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.types。*;进口org.apache.spark.sql.Row;进口org.apache.spark.sql.RowFactory;进口org.apache.spark.sql.Dataset;公共应用程序{公共静态无效主要(字符串[]arg游戏)抛出异常{SparkSession火花=SparkSession构建器()浏览器名称(“临时工演示”)配置(“spark.master”,“本地”)getOrCreate();/ /创建一个火花DataFrame组成的高和低的温度/ /通过机场代码和日期。StructType模式=StructType(StructField[]{StructField(“AirportCode”,数据类型StringType,,元数据()),StructField(“日期”,数据类型DateType,,元数据()),StructField(“TempHighF”,数据类型IntegerType,,元数据()),StructField(“TempLowF”,数据类型IntegerType,,元数据()),});列表<>dataList=ArrayList<>();dataList添加(RowFactory创建(“BLI”,日期返回对象的值(“2021-04-03”),52,43));dataList添加(RowFactory创建(“BLI”,日期返回对象的值(“2021-04-02”),50,38));dataList添加(RowFactory创建(“BLI”,日期返回对象的值(“2021-04-01”),52,41));dataList添加(RowFactory创建(“PDX”,日期返回对象的值(“2021-04-03”),64年,45));dataList添加(RowFactory创建(“PDX”,日期返回对象的值(“2021-04-02”),61年,41));dataList添加(RowFactory创建(“PDX”,日期返回对象的值(“2021-04-01”),66年,39));dataList添加(RowFactory创建(“海”,日期返回对象的值(“2021-04-03”),57,43));dataList添加(RowFactory创建(“海”,日期返回对象的值(“2021-04-02”),54,39));dataList添加(RowFactory创建(“海”,日期返回对象的值(“2021-04-01”),56,41));数据集<>临时工=火花createDataFrame(dataList,模式);/ /创建一个表的数据砖集群,然后填满/ /表DataFrame的内容。/ /从先前的运行,如果表已经存在/ /先删除它。火花sql(“使用默认”);火花sql(“如果存在demo_temps_table”删除表);临时工()。saveAsTable(“demo_temps_table”);/ /查询砖集群上的表,返回的行/ /在机场代码不是BLI和日期晚/ /比2021-04-01。组织和秩序的结果高/ /温度按照降序排列。数据集<>df_temps=火花sql(“从demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”);df_temps显示();/ /结果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被删除的表数据砖集群。火花sql(“DROP TABLE demo_temps_table”);}}
pyspark.sql进口SparkSessionpyspark.sql.types进口*datetime进口日期火花=SparkSession构建器浏览器名称(“temps-demo”)getOrCreate()#创建一个火花DataFrame组成的高和低的温度#机场代码和日期。模式=StructType([StructField(“AirportCode”,StringType(),),StructField(“日期”,DateType(),),StructField(“TempHighF”,IntegerType(),),StructField(“TempLowF”,IntegerType(),)])数据=((“BLI”,日期(2021年,4,3),52,43),(“BLI”,日期(2021年,4,2),50,38),(“BLI”,日期(2021年,4,1),52,41),(“PDX”,日期(2021年,4,3),64年,45),(“PDX”,日期(2021年,4,2),61年,41),(“PDX”,日期(2021年,4,1),66年,39),(“海”,日期(2021年,4,3),57,43),(“海”,日期(2021年,4,2),54,39),(“海”,日期(2021年,4,1),56,41]]临时工=火花createDataFrame(数据,模式)#砖集群的创建一个表,然后填满# DataFrame的表的内容。#从先前的运行,如果表已经存在#删除它。火花sql(使用默认的)火花sql(“删除表如果存在demo_temps_table”)临时工saveAsTable(“demo_temps_table”)#查询砖集群上的表,返回的行#在机场代码不是BLI和日期晚比2021-04-01 #。组织和秩序的结果高#温度按照降序排列。df_temps=火花sql(“从demo_temps_table SELECT *”\“AirportCode ! = BLI和日期>‘2021-04-01’”\“GROUP BY AirportCode,日期、TempHighF TempLowF”\“TempHighF DESC秩序”)df_temps显示()#结果:## + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +# | AirportCode | |日期TempHighF | TempLowF |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +45 # | PDX | 64 | 2021-04-03 | |# | PDX | 61 | 2021-04-02 | 41 |43 57 #海| | 2021-04-03 | | |54 #海| | 2021-04-02 | | |# + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +#清理被删除的表数据砖集群。火花sql(“DROP TABLE demo_temps_table”)
进口orgapache火花sqlSparkSession进口orgapache火花sql类型_进口orgapache火花sql进口javasql日期对象演示{def主要(arg游戏:数组(字符串]){瓦尔火花=SparkSession构建器(“本地”)。getOrCreate()/ /创建一个火花DataFrame组成的高和低的温度/ /通过机场代码和日期。瓦尔模式=StructType(数组(StructField(“AirportCode”,StringType,),StructField(“日期”,DateType,),StructField(“TempHighF”,IntegerType,),StructField(“TempLowF”,IntegerType,)))瓦尔数据=列表((“BLI”,日期返回对象的值(“2021-04-03”),52,43),(“BLI”,日期返回对象的值(“2021-04-02”),50,38),(“BLI”,日期返回对象的值(“2021-04-01”),52,41),(“PDX”,日期返回对象的值(“2021-04-03”),64年,45),(“PDX”,日期返回对象的值(“2021-04-02”),61年,41),(“PDX”,日期返回对象的值(“2021-04-01”),66年,39),(“海”,日期返回对象的值(“2021-04-03”),57,43),(“海”,日期返回对象的值(“2021-04-02”),54,39),(“海”,日期返回对象的值(“2021-04-01”),56,41))瓦尔抽样=火花sparkContextmakeRDD(数据)瓦尔临时工=火花createDataFrame(抽样,模式)/ /创建一个表的数据砖集群,然后填满/ /表DataFrame的内容。/ /从先前的运行,如果表已经存在/ /先删除它。火花sql(“使用默认”)火花sql(“如果存在demo_temps_table”删除表)临时工saveAsTable(“demo_temps_table”)/ /查询砖集群上的表,返回的行/ /在机场代码不是BLI和日期晚/ /比2021-04-01。组织和秩序的结果高/ /温度按照降序排列。瓦尔df_temps=火花sql(“从demo_temps_table SELECT *”+“AirportCode ! = BLI和日期>‘2021-04-01’”+“GROUP BY AirportCode,日期、TempHighF TempLowF”+“TempHighF DESC秩序”)df_temps显示()/ /结果:/ // / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | AirportCode | |日期TempHighF | TempLowF |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ / | PDX | 64 | 2021-04-03 | |/ / | PDX | 61 | 2021-04-02 | 41 |/ /海洋57 43 | | | 2021-04-03 | |54海/ / | | 2021-04-02 | | |/ / + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +/ /清理被删除的表数据砖集群。火花sql(“DROP TABLE demo_temps_table”)}}

使用依赖关系

通常你的主类或Python文件将有其他依赖jar文件和文件。您可以添加这样的依赖jar文件和文件通过调用sparkContext.addJar (“path-to-the-jar”)orgydF4y2BasparkContext.addPyFile(文件路径)。你也可以加入鸡蛋和zip文件的文件addPyFile ()接口。每次你在IDE运行代码,依赖jar文件和文件在集群上安装。

自由进口喷火pyspark.sql进口SparkSession火花=SparkSession构建器getOrCreate()sc=火花sparkContext# sc.setLogLevel(“信息”)打印(“测试简单的计数”)打印(火花范围(One hundred.)())打印(“测试addPyFile隔离”)scaddPyFile(“lib.py”)打印(sc并行化(范围(10))地图(λ:喷火(2))收集())喷火(对象):def__init__(自我,x):自我x=x

Python + Java udf

pyspark.sql进口SparkSessionpyspark.sql.column进口_to_java_column,_to_seq,# #在这个例子中,udf。jar包含编译后的Java / Scala udf的:#包com.example##进口org.apache.spark.sql._#进口org.apache.spark.sql.expressions._#进口org.apache.spark.sql.functions.udf##{对象测试# val plusOne: UserDefinedFunction = udf((我:长)= > i + 1)#}火花=SparkSession构建器\配置(“spark.jars”,“/道路/ / udf.jar”)\getOrCreate()sc=火花sparkContextdefplus_one_udf(上校):f=sc_jvmcom例子测试plusOne()返回(f应用(_to_seq(sc,(上校),_to_java_column)))sc_jscaddJar(“/道路/ / udf.jar”)火花范围(One hundred.)withColumn(“plusOne”,plus_one_udf(“id”))显示()
com例子进口orgapache火花sqlSparkSession情况下喷火(x:字符串)对象测试{def主要(arg游戏:数组(字符串):单位={瓦尔火花=SparkSession构建器()getOrCreate();火花sparkContextsetLogLevel(“信息”)println(“运行简单的显示查询……”)火花格式(“铺”)。负载(“/ tmp / x”)。显示()println(“运行简单的UDF查询……”)火花sparkContextaddJar(”。/目标/ scala - 2.11 / hello-world_2.11-1.0.jar”)火花udf注册(“f”,(x:Int)= >x+1)火花范围(10)。selectExpr(“f (id)”)。显示()println(“运行定制对象查询……”)瓦尔obj=火花sparkContext并行化(Seq(喷火(“再见”),喷火(“嗨”)))。收集()println(objtoSeq)}}

访问DBUtils

您可以使用dbutils.fsdbutils.secrets公用事业的砖公用事业模块。支持命令dbutils.fs.cp,dbutils.fs.head,dbutils.fs.ls,dbutils.fs.mkdirs,dbutils.fs.mv,dbutils.fs.put,dbutils.fs.rm,dbutils.secrets.get,dbutils.secrets.getBytes,dbutils.secrets.list,dbutils.secrets.listScopes。看到文件系统实用程序(dbutils.fs)或运行dbutils.fs.help ()秘密效用(dbutils.secrets)或运行dbutils.secrets.help ()

pyspark.sql进口SparkSessionpyspark.dbutils进口DBUtils火花=SparkSession构建器getOrCreate()dbutils=DBUtils(火花)打印(dbutilsfsls(“dbfs: /))打印(dbutils秘密listScopes())

使用砖运行时的7.3 LTS以上时,访问DBUtils模块的方式在当地或砖集群的工作,使用以下get_dbutils ():

defget_dbutils(火花):pyspark.dbutils进口DBUtils返回DBUtils(火花)

否则,使用以下get_dbutils ():

defget_dbutils(火花):如果火花相依得到(“spark.databricks.service.client.enabled”)= =“真正的”:pyspark.dbutils进口DBUtils返回DBUtils(火花)其他的:进口IPython返回IPythonget_ipython()user_ns(“dbutils”]
瓦尔dbutils=com服务DBUtilsprintln(dbutilsfsls(“dbfs: /))println(dbutils秘密listScopes())

本地和远程文件系统之间复制文件

您可以使用dbutils.fs您的客户端和远程文件系统之间复制文件。计划文件:/指的是在客户端本地文件系统。

pyspark.dbutils进口DBUtilsdbutils=DBUtils(火花)dbutilsfscp(“文件:/ home / user / data.csv”,“dbfs: /上传”)dbutilsfscp(“dbfs: / / results.csv输出”,下载的文件:/ home / user / / ')

最大文件大小,可以转移方式是250 MB。

启用dbutils.secrets.get

由于安全限制,调用的能力dbutils.secrets.get默认情况下是禁用的。接触砖支持您的工作区来启用这个特性。

访问Hadoop文件系统

您也可以直接访问DBFS使用标准的Hadoop文件系统接口:

>进口orgapachehadoopfs_/ /得到新的DBFS连接>瓦尔dbfs=文件系统得到(火花sparkContexthadoopConfiguration)dbfs:orgapachehadoopfs文件系统=com后端守护进程数据客户端DBFS@二维036335年/ /列表文件>dbfslistStatus(路径(“dbfs: /))res1:数组(orgapachehadoopfsFileStatus]=数组(FileStatus{路径=dbfs:/美元;isDirectory=真正的;…})/ /打开文件>瓦尔=dbfs开放(路径(“dbfs: /道路/ / your_file”)):orgapachehadoopfsFSDataInputStream=orgapachehadoopfsFSDataInputStream@7aa4ef24/ /获取文件内容为字符串>进口orgapache下议院io_>println(字符串(IOUtilstoByteArray()))

Hadoop设置配置

在客户端可以设置使用Hadoop的配置spark.conf.setAPI,它适用于SQL和DataFrame操作。Hadoop配置设置sparkContext必须在集群配置中设置或使用一个笔记本。这是因为配置设置sparkContext不与用户会话,但适用于整个集群。

故障排除

运行databricks-connect测试检查连接问题。本节介绍您可能遇到的一些常见问题,以及如何解决它们。

Python版本不匹配

检查您使用的Python版本当地至少有相同的小版本版本在集群上(例如,3.5.13.5.2是好的,3.53.6不是)。

如果你有多个Python版本安装在本地,确保砖使用正确的连接是通过设置PYSPARK_PYTHON环境变量(例如,PYSPARK_PYTHON = python3)。

服务器未启用

确保集群火花服务器启用了spark.databricks.service.server.enabled真正的。您应该看到下面的线在司机日志如果它是:

18/10/25 21:39:18信息SparkConfUtils美元:设置火花配置:spark.databricks.service.server。启用- >真实……18/10/25 21:39:21信息SparkContext:加载火花服务RPC服务器18/10/25 21:39:21信息SparkServiceRPCServer:火花服务RPC服务器开始18/10/25 21:39:21信息服务器:jetty-9.3.20。v20170531 18/10/25 21:39:21信息AbstractConnector:开始ServerConnector@6a6c7f42 {HTTP / 1.1, (HTTP / 1.1)}{0.0.0.0:15001} 18/10/25 21:39:21信息服务器:@5879ms开始

冲突PySpark安装

databricks-connect包与PySpark冲突。安装两个初始化时将导致错误引发上下文在Python中。这可以体现在几个方面,包括“流破坏”或“找不到”的错误。如果你有PySpark安装到您的Python环境,确保安装databricks-connect之前卸载。卸载PySpark之后,一定要完全重新安装砖连接的包:

pip卸载pyspark pip卸载databricks-connect pip安装- u“databricks-connect = = 9.1 *”。#或X.Y.*来match your cluster version.

相互冲突的SPARK_HOME

如果您以前使用过火花机,IDE可以配置为使用一个其他版本的火花而不是砖连接的火花。这可以体现在几个方面,包括“流破坏”或“找不到”的错误。你可以看到哪个版本的火花被检查的价值SPARK_HOME环境变量:

系统println(系统采用(“SPARK_HOME”));
进口操作系统打印(操作系统环境(“SPARK_HOME”])
println(sysenv得到(“SPARK_HOME”))

决议

如果SPARK_HOME将另一个版本的火花在客户端,您应该设置SPARK_HOME变量和再试一次。

检查您的IDE环境变量设置,你的. bashrc,. zshrc,或. bash_profile文件,和其他环境变量可能。你很可能会退出并重新启动IDE来清除旧的状态,甚至你可能需要创建一个新项目,如果问题依然存在。

你不应该需要设置SPARK_HOME一个新值;复位应该足够了。

冲突或失踪路径二进制文件的条目

可以配置路径,这样的命令spark-shell将运行其他之前安装的二进制代替砖提供的一个连接。这可能会导致databricks-connect测试失败。你应该确保优先考虑砖连接的二进制文件,或删除之前安装的。

如果你不能运行命令spark-shell,也有可能你的路径并不是自动建立的皮普安装,你将需要添加安装手动dir到您的路径。可以使用砖与ide,即使这不是设置。然而,databricks-connect测试命令将不能正常工作。

冲突的序列化设置在集群上

如果你看到“流损坏”运行时错误databricks-connect测试,这可能是由于不兼容的集群序列化配置。例如,设置spark.io.compression.codec配置会导致这个问题。为了解决这个问题,考虑从集群移除这些配置设置,或设置配置在砖连接的客户端。

找不到winutils.exe在Windows上

如果您使用的是砖连接在Windows上看:

错误壳牌:失败的定位winutils二进制hadoop二进制路径javaioIOException:可以定位可执行的\\winutilsexeHadoop二进制文件

按照指示在Windows上配置Hadoop路径

文件名、目录名或卷标在Windows上语法是不正确的

如果您使用的是砖连接在Windows上看:

文件名,目录的名字,orgydF4y2Ba体积标签语法不正确的

Java或砖连接被安装到一个目录空间在你的路径。您可以解决通过安装到一个目录路径没有空间,使用或配置路径短名称形式

限制

砖连接不支持下面的砖的特点和第三方平台:bob体育客户端下载

  • 结构化的流。

  • 运行任意代码不是一个远程集群上火花工作的一部分。

  • 本机Scala、Python和Rδ表操作的api(例如,DeltaTable.forPath不支持)。然而,SQL API (spark.sql (…))和三角洲湖操作和火花的API(例如,spark.read.load三角洲表上)都支持。

  • 进入副本。

  • 使用SQL函数、Python或Scala udf的一部分服务器的目录。然而,当地引入Scala和Python udf的工作。

  • Apache飞艇0.7。x和是low.

  • 连接到集群访问控制表

  • 连接到集群处理隔离(换句话说,启用spark.databricks.pyspark.enableProcessIsolation被设置为真正的)。

  • δ克隆SQL命令。

  • 全局临时视图。

  • 考拉

  • 创建作为选择SQL命令并不总是工作。相反,使用spark.sql(“选择……”).write.saveAsTable(“表”)