运行一个砖笔记本从另一个笔记本
请注意
对于大多数编排用例,砖推荐使用砖的工作或模块化代码文件。你应该只使用dbutils.notebook
本文中描述的API时不能使用多任务工作实现你的用例。例子是条件执行和循环笔记本在一组动态的参数。
本文描述了如何使用砖笔记本代码复杂的工作流,使用模块化的代码,链接或嵌入笔记本和if - then - else逻辑。
的比较运行%
和dbutils.notebook.run ()
的运行%命令允许您将另一个笔记本在一个笔记本。您可以使用运行%
模块化代码,例如通过将支持功能在一个单独的笔记本。你也可以用它来连接笔记本电脑,实现在一个分析步骤。当你使用运行%
,立即执行被调用的笔记本中定义的函数和变量,可用在调用笔记本。
的dbutils.notebook
API是一个补充运行%
因为它可以让您从一个笔记本传递参数和返回值。这允许您构建复杂的工作流和管道与依赖。例如,您可以获得一个目录中的文件的列表并将名称传递给另一个笔记本电脑,这是不可能的运行%
。您还可以创建if - then - else工作流基于返回值或调用其他笔记本使用相对路径。
不像运行%
,dbutils.notebook.run ()
方法开始一份新工作运行的笔记本。
这些方法,就像所有的dbutils
只在Python和Scala api,可用。不过,您可以使用dbutils.notebook.run ()
调用一个R笔记本。
使用运行%
导入一个笔记本
在这个例子中,第一个笔记本定义一个函数,反向
后,可在第二个笔记本使用运行%
神奇的执行shared-code-notebook
。
因为这两个笔记本在同一个目录在工作区中,使用前缀。/
在。/ shared-code-notebook
表明路径应相对于解决当前运行的笔记本。比如,你可以笔记本组织成目录运行%/ dir /笔记本
,或者使用绝对路径运行%/用户/ username@organization.com/directory/notebook
。
dbutils.notebook
API
中可用的方法dbutils.notebook
API是运行
和退出
。这两个参数和返回值必须是字符串。
运行(路径:字符串,timeout_seconds:int,参数:地图):字符串
运行一个笔记本和返回退出值。该方法立即开始运行的临时工作。
的timeout_seconds
运行参数控制超时(0意味着没有超时):调用运行
抛出一个异常,如果没有在指定的时间内完成。如果砖了超过10分钟,笔记本运行失败,不管timeout_seconds
。
的参数
参数设置部件价值目标的笔记本。具体来说,如果您运行的笔记本有一个小部件一个
,你通过一个键-值对(“A”:“B”)
参数的参数run ()
电话,然后检索工具的价值一个
将返回“B”
。你可以找到小部件的创建和使用的指令砖小部件篇文章。
请注意
的
参数
参数只接受拉丁字符(ASCII字符集)。使用非ascii字符返回一个错误。就业岗位使用
dbutils.notebook
API在30天内必须完成或更少。
运行
使用
dbutils。笔记本。运行(“notebook-name”,60,{“参数”:“数据”,“argument2”:“data2”,…})
dbutils。笔记本。运行(“notebook-name”,60,地图(“参数”- >“数据”,“argument2”- >“data2”,…))
运行
例子
假设你有一个笔记本工作流
用一个小名叫喷火
打印部件的价值:
dbutils。小部件。文本(“foo”,“fooDefault”,“fooEmptyLabel”)打印dbutils。小部件。得到(“foo”)
运行dbutils.notebook.run(“工作流”,60岁,{“foo”:“酒吧”})
产生以下结果:
您通过使用小部件有价值dbutils.notebook.run ()
,“酒吧”
,而不是默认的。
退出(价值:字符串):无效
退出一个笔记本和一个值。如果你叫一个笔记本使用运行
方法,这是返回的值。
dbutils。笔记本。退出(“returnValue”)
调用dbutils.notebook.exit
使笔记本成功完成的工作。如果你想使工作失败,抛出异常。
例子
在以下示例中,您传递参数DataImportNotebook
和运行不同的笔记本电脑(DataCleaningNotebook
或ErrorHandlingNotebook
基于结果DataImportNotebook
。
当代码运行时,你看到的链接笔记本:
查看运行的详细信息,单击笔记本链接# xxxx笔记本工作。
通过结构化数据
本节说明如何通过笔记本之间的结构化数据。
#示例1 -通过临时视图返回数据。#你只能返回一个字符串使用dbutils.notebook.exit(),但由于叫笔记本驻留在相同的JVM中,你可以#返回一个引用数据存储在一个临时视图名称。# #被笔记本火花。范围(5)。toDF(“价值”)。createOrReplaceGlobalTempView(“my_data”)dbutils。笔记本。退出(“my_data”)# #在调用者的笔记本returned_table=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)global_temp_db=火花。相依。得到(“spark.sql.globalTempDatabase”)显示(表(global_temp_db+“。”+returned_table))通过DBFS #示例2 -返回数据。#对于较大的数据集,您可以编写DBFS和返回结果的DBFS路径存储数据。# #被笔记本dbutils。fs。rm(“/ tmp /结果/ my_data”,递归=真正的)火花。范围(5)。toDF(“价值”)。写。格式(“铺”)。负载(“dbfs: / tmp /结果/ my_data”)dbutils。笔记本。退出(“dbfs: / tmp /结果/ my_data”)# #在调用者的笔记本returned_table=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)显示(火花。读。格式(“铺”)。负载(returned_table))#示例3 -返回JSON数据。#返回多个值,您可以使用标准JSON库进行序列化和反序列化的结果。# #被笔记本进口jsondbutils。笔记本。退出(json。转储({“状态”:“OK”,“表”:“my_data”}))# #在调用者的笔记本结果=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)打印(json。加载(结果))
/ /通过临时视图示例1 -返回数据。/ /可以使用dbutils.notebook.exit只返回一个字符串(),但由于叫笔记本驻留在相同的JVM中,你可以/ /返回一个引用数据存储在一个临时视图名称。/ * *被笔记本* /sc。并行化(1来5)。toDF()。createOrReplaceGlobalTempView(“my_data”)dbutils。笔记本。退出(“my_data”)/ * *在调用者的笔记本* /瓦尔returned_table=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)瓦尔global_temp_db=火花。相依。得到(“spark.sql.globalTempDatabase”)显示(表(global_temp_db+“。”+returned_table))/ /通过DBFS例子2 -返回数据。/ /对于较大的数据集,您可以编写DBFS和返回结果的DBFS路径存储数据。/ * *被笔记本* /dbutils。fs。rm(“/ tmp /结果/ my_data”,递归=真正的)sc。并行化(1来5)。toDF()。写。格式(“铺”)。保存(“dbfs: / tmp /结果/ my_data”)dbutils。笔记本。退出(“dbfs: / tmp /结果/ my_data”)/ * *在调用者的笔记本* /瓦尔returned_table=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)显示(sqlContext。读。格式(“铺”)。负载(returned_table))/ /例3 -返回JSON数据。/ /返回多个值,您可以使用标准JSON库进行序列化和反序列化的结果。/ * *被笔记本* // /导入杰克逊json库进口com。fasterxml。杰克逊。模块。scala。DefaultScalaModule进口com。fasterxml。杰克逊。模块。scala。实验。ScalaObjectMapper进口com。fasterxml。杰克逊。databind。objectmap/ /创建一个json序列化器瓦尔jsonMapper=新objectmap与ScalaObjectMapperjsonMapper。registerModule(DefaultScalaModule)/ /退出与jsondbutils。笔记本。退出(jsonMapper。writeValueAsString(地图(“状态”- >“OK”,“表”- >“my_data”)))/ * *在调用者的笔记本* /瓦尔结果=dbutils。笔记本。运行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)println(jsonMapper。readValue(地图(字符串,字符串]](结果))
处理错误
本节说明如何处理错误。
抛出一个WorkflowException #错误。defrun_with_retry(笔记本,超时,arg游戏={},max_retries=3):num_retries=0而真正的:试一试:返回dbutils。笔记本。运行(笔记本,超时,arg游戏)除了异常作为e:如果num_retries>max_retries:提高e其他的:打印(“失败的错误”,e)num_retries+ =1run_with_retry(“LOCATION_OF_CALLEE_NOTEBOOK”,60,max_retries=5)
/ /错误WorkflowException。进口com。砖。WorkflowException/ /自dbutils.notebook.run()只是一个函数调用,您可以使用标准Scala try - catch重试失败/ /控制流。这里我们展示一个例子再审笔记本的次数。defrunRetry(笔记本:字符串,超时:Int,arg游戏:地图(字符串,字符串]=地图。空,maxTries:Int=3):字符串={varnumTries=0而(真正的){试一试{返回dbutils。笔记本。运行(笔记本,超时,arg游戏)}抓{情况下e:WorkflowException如果numTries<maxTries= >println(”错误,重试:“+e)}numTries+ =1}”“/ /没有达到}runRetry(“LOCATION_OF_CALLEE_NOTEBOOK”,超时=60,maxTries=5)