GeoSpark未定义的函数与DBConnect错误

使用GeoSpark代码DBConnect会话。

写的arjun.kaimaparambilrajan

去年发表在:2022年6月1日

问题

你想使用GeoSpark函数st_geofromwktDBConnect (AWS|Azure|GCPApache火花),你得到一个错误消息。

错误:org.apache.spark.sql。AnalysisException:未定义的功能:“st_geomfromwkt”。这个函数既不是注册临时函数也不是一个永久的函数注册数据库中的“默认”。

这个示例代码使用DBConnect时失败的错误。

% scala val sc =火花。sparkContext sc.setLogLevel(“调试”)val sqlContext =火花。sqlContext spark.sparkContext.addJar(~ /罐/ geospark-sql_2.3-1.2.0.jar) spark.sparkContext.addJar(~ /罐/ geospark-1.2.0.jar) GeoSparkSQLRegistrator.registerAll (sqlContext) println (spark.sessionState.functionRegistry.listFunction)火花。sql(“选择ST_GeomFromWKT(区域)的几何多边形”),告诉()

导致

DBConnect不支持自动同步服务器的客户端udf。

解决方案

您可以使用一个定制的utility jar的代码注册UDF在集群上使用SparkSessionExtensions类。

  1. 创建一个utility jar,寄存器GeoSpark函数使用SparkSessionExtensions。这个实用程序类定义可以内置一个utility jar。
    % com.databricks.spark scala包。进口org.apache.spark.sql跑龙套。SparkSessionExtensionsimport org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator class GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) { def apply(e: SparkSessionExtensions): Unit = { e.injectCheckRule(spark => { println("INJECTING UDF") GeoSparkSQLRegistrator.registerAll(spark) _ => Unit }) } }
  2. 复制GeoSpark jar DBFS和utility jardbfs: /砖/ geospark-extension-jars /
  3. 创建一个init脚本(set_geospark_extension_jar.sh),复制jar从DBFS位置引发类路径并设置spark.sql.extensions实用程序类。
    % scala dbutils.fs。put (" dbfs: /砖/ < init-script-folder > / set_geospark_extension_jar。sh”、“”“# !/bin/sh |睡眠10 | #将扩展和GeoSpark依赖jar文件复制到/砖/ jar。| cp - v / dbfs /砖/ geospark-extension-jars / {spark_geospark_extension_2_11_0_1.jar、geospark_sql_2_3_1_2_0.jar geospark_1_2_0。jar} /砖/罐/ | #设置扩展。猫| < < EOF的> / conf / 00-custom-spark /砖/驱动程序。参看|(司机){spark.sql |”。扩展com.databricks.spark.utils“=”。GeoSparkUdfExtension”“|} | EOF |”。stripMargin覆盖= true)
  4. 安装初始化脚本作为一个集群级init脚本(AWS|Azure|GCP)。你需要的完整路径的位置脚本(dbfs: /砖/ < init-script-folder > / set_geospark_extension_jar.sh)。
  5. 重新启动集群。
  6. 您现在可以使用与DBConnect GeoSpark代码。
这篇文章有用吗?