从另一个笔记本运行一个Databricks笔记本
本文描述如何使用Databricks笔记本对使用模块化代码、链接或嵌入式笔记本以及if-then-else逻辑的复杂工作流进行编码。
的比较运行%
笔记本工作流程
的运行%命令允许您在一个笔记本中包含另一个笔记本。你可以使用运行%
模块化你的代码,例如把支持函数放在一个单独的笔记本里。您还可以使用它来连接实现分析步骤的笔记本。当你使用运行%
时,被调用的notebook立即执行,其中定义的函数和变量在调用notebook中可用。
笔记本的工作流程是一个补充运行%
因为它们允许您向笔记本传递参数并从笔记本返回值。这允许您构建具有依赖关系的复杂工作流和管道。例如,您可以获得目录中的文件列表,并将名称传递给另一个笔记本,这是不可能的运行%
.您还可以基于返回值创建if-then-else工作流,或者使用相对路径调用其他笔记本。
要实现笔记本工作流,请使用dbutils.notebook。*
方法。不像运行%
,dbutils.notebook.run ()
方法启动一个新作业来运行笔记本。
这些方法,就像所有dbutils
api,仅在Python和Scala中可用。但是,您可以使用dbutils.notebook.run ()
调用R笔记本。
请注意
基于笔记本工作流的作业必须在30天或更短时间内完成。不支持基于模块化或链接笔记本任务的长期运行作业。
API
方法中提供的方法dbutils.notebook
构建笔记本工作流程的API是:运行
而且退出
.参数和返回值都必须是字符串。
运行(路径:字符串,timeout_seconds:int,参数:地图):字符串
运行一个笔记本并返回它的退出值。该方法启动一个立即运行的临时作业。
的timeout_seconds
参数控制运行的超时(0表示没有超时):调用运行
如果未在指定时间内完成,则抛出异常。如果Databricks宕机超过10分钟,则无论如何笔记本运行都将失败timeout_seconds
.
的参数
参数设置目标笔记本的小部件值。特别是,如果您正在运行的笔记本有一个名为一个
,然后传递一个键值对(“A”:“B”)
参数的参数的一部分run ()
调用,然后检索widget的值一个
将返回“B”
.控件中可以找到创建和使用小部件的说明砖小部件篇文章。
请注意
的参数
参数只接受拉丁字符(ASCII字符集)。使用非ascii字符将返回错误。
运行
使用
dbutils.笔记本.运行(“notebook-name”,60,{“参数”:“数据”,“argument2”:“data2”,...})
dbutils.笔记本.运行(“notebook-name”,60,地图(“参数”->“数据”,“argument2”->“data2”,…))
运行
例子
假设您有一个名为工作流
使用一个名为喷火
打印小部件的值:
dbutils.小部件.文本(“foo”,“fooDefault”,“fooEmptyLabel”)打印dbutils.小部件.得到(“foo”)
运行dbutils.notebook.run(“工作流”,60岁,{“foo”:“酒吧”})
产生以下结果:
小部件具有您通过工作流传递的值,“酒吧”
,而不是默认。
退出(价值:字符串):无效
退出一个有值的笔记本。如果你调用笔记本使用运行
方法,这是返回的值。
dbutils.笔记本.退出(“returnValue”)
调用dbutils.notebook.exit
在一个作业中使笔记本顺利完成。如果希望导致作业失败,请抛出异常。
例子
在下面的示例中,将参数传递给DataImportNotebook
并运行不同的笔记本电脑(DataCleaningNotebook
或ErrorHandlingNotebook
)的结果DataImportNotebook
.
当笔记本工作流运行时,您会看到一个到正在运行的笔记本的链接:
点击笔记本链接笔记本工作#xxxx查看运行的详细信息:
传递结构化数据
本节说明如何在笔记本之间传递结构化数据。
#示例1 -通过临时视图返回数据。使用dbutils.notebook.exit()只能返回一个字符串,但由于调用的notebook驻留在同一个JVM中,所以可以返回一个名称,引用存储在临时视图中的数据。##在调用的笔记本火花.范围(5).toDF(“价值”).createOrReplaceGlobalTempView(“my_data”)dbutils.笔记本.退出(“my_data”)##在来电者笔记本上returned_table=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)global_temp_db=火花.相依.得到(“spark.sql.globalTempDatabase”)显示(表格(global_temp_db+“。”+returned_table))#例2 -通过DBFS返回数据。#对于较大的数据集,您可以将结果写入DBFS,然后返回存储数据的DBFS路径。##在调用的笔记本dbutils.fs.rm(“/ tmp /结果/ my_data”,递归=真正的)火花.范围(5).toDF(“价值”).写.格式(“铺”).负载(“dbfs: / tmp /结果/ my_data”)dbutils.笔记本.退出(“dbfs: / tmp /结果/ my_data”)##在来电者笔记本上returned_table=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)显示(火花.读.格式(“铺”).负载(returned_table))#示例3 -返回JSON数据。要返回多个值,可以使用标准JSON库对结果进行序列化和反序列化。##在调用的笔记本进口jsondbutils.笔记本.退出(json.转储({“状态”:“OK”,“表”:“my_data”}))##在来电者笔记本上结果=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)打印(json.加载(结果))
//示例1 -通过临时视图返回数据。//使用dbutils.notebook.exit()只能返回一个字符串,但由于调用的notebook驻留在同一个JVM中,所以可以//返回一个名称,引用存储在临时视图中的数据。/**在被调用的笔记本*/sc.并行化(1来5).toDF().createOrReplaceGlobalTempView(“my_data”)dbutils.笔记本.退出(“my_data”)/**在调用者笔记本*/瓦尔returned_table=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)瓦尔global_temp_db=火花.相依.得到(“spark.sql.globalTempDatabase”)显示(表格(global_temp_db+“。”+returned_table))//示例2 -通过DBFS返回数据。//对于较大的数据集,可以将结果写入DBFS,然后返回存储数据的DBFS路径。/**在被调用的笔记本*/dbutils.fs.rm(“/ tmp /结果/ my_data”,递归=真正的)sc.并行化(1来5).toDF().写.格式(“铺”).保存(“dbfs: / tmp /结果/ my_data”)dbutils.笔记本.退出(“dbfs: / tmp /结果/ my_data”)/**在调用者笔记本*/瓦尔returned_table=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)显示(sqlContext.读.格式(“铺”).负载(returned_table))//示例3 -返回JSON数据。//要返回多个值,可以使用标准JSON库对结果进行序列化和反序列化。/**在被调用的笔记本*///导入jackson json库进口com.fasterxml.杰克逊.模块.scala.DefaultScalaModule进口com.fasterxml.杰克逊.模块.scala.实验.ScalaObjectMapper进口com.fasterxml.杰克逊.databind.objectmap//创建一个json序列化器瓦尔jsonMapper=新objectmap与ScalaObjectMapperjsonMapper.registerModule(DefaultScalaModule)//使用json退出dbutils.笔记本.退出(jsonMapper.writeValueAsString(地图(“状态”->“OK”,“表”->“my_data”)))/**在调用者笔记本*/瓦尔结果=dbutils.笔记本.运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)println(jsonMapper.readValue[地图[字符串,字符串]] (结果))
处理错误
本节说明如何处理笔记本工作流程中的错误。
#工作流中的错误抛出WorkflowException。defrun_with_retry(笔记本,超时,arg游戏={},max_retries=3.):num_retries=0而真正的:试一试:返回dbutils.笔记本.运行(笔记本,超时,arg游戏)除了异常作为e:如果num_retries>max_retries:提高e其他的:打印(“失败的错误”,e)num_retries+ =1run_with_retry(“LOCATION_OF_CALLEE_NOTEBOOK”,60,max_retries=5)
//工作流中的错误抛出WorkflowException。进口com.砖.WorkflowException//由于dbutils.notebook.run()只是一个函数调用,您可以使用标准的Scala try-catch重试失败//控制流。这里我们展示了一个多次尝试使用笔记本的例子。defrunRetry(笔记本:字符串,超时:Int,arg游戏:地图[字符串,字符串]=地图.空,maxTries:Int=3.):字符串={varnumTries=0而(真正的){试一试{返回dbutils.笔记本.运行(笔记本,超时,arg游戏)}抓{情况下e:WorkflowException如果numTries<maxTries= >println("错误,正在重试:"+e)}numTries+ =1}""//未到达}runRetry(“LOCATION_OF_CALLEE_NOTEBOOK”,超时=60,maxTries=5)