使用射线砖

预览

这个特性是在公共预览

雷tripwire以上支持创建雷集群和射线应用程序运行在Apache引发集群数据砖。对射线开始使用机器学习的信息,包括教程和示例,请参见射线的文档。关于雷和Apache火花集成的更多信息,见射线引发API文档

需求

  • 砖运行时12.0毫升以上。

  • 砖集群运行时访问模式必须“分配”模式或“没有隔离共享”模式。

安装光

使用以下命令安装线。的(默认)扩展所需的射线仪表板组件。

%皮普安装(默认的]> =tripwire

创建一个射线集群

创建一个射线集群,使用ray.util.spark.setup_ray_clusterAPI。

ray.util.spark进口setup_ray_cluster,shutdown_ray_clustersetup_ray_cluster(num_worker_nodes=2,num_cpus_per_node=4,collect_log_to_path=“/ dbfs /道路/ / ray_collected_logs”)

ray.util.spark.setup_ray_clusterAPI创建一个射线簇火花。在内部,它创建一个背景火花工作。工作中的每个火花任务创建一个射线工作者节点,和雷头节点上创建驱动程序。这个论点num_worker_nodes创建代表射线工作者节点的数量。指定数量的CPU或GPU核心分配给每个射线工作者节点,设置参数num_cpus_per_nodenum_gpus_per_node

雷集群创建之后,您就可以直接在你的笔记本运行任何射线应用程序代码。一个HTML链接在新选项卡中打开雷集群仪表板也显示,允许您查看集群的射线仪表板。

提示

如果您正在使用一个砖分配模式集群,您可以设置num_worker_nodesray.util.spark.MAX_NUM_WORKER_NODES为了使用所有可用的资源为你雷集群。

setup_ray_cluster (#……num_worker_nodes = ray.util.spark.MAX_NUM_WORKER_NODES,)

你可以设置参数collect_log_to_path指定目的地的路径,你想收集射线集群日志。日志收集运行射线集群后关闭。砖建议你设置一个路径开始/ dbfs /所以日志保存即使你终止集群的火花。

请注意

调用ray.util.spark.setup_ray_cluster将设置RAY_ADDRESS环境变量的地址创建集群射线,射线应用程序将自动使用这个雷集群。您可以指定一个替代集群地址使用地址论点的ray.initAPI。

运行一线的应用程序

雷集群创建之后,您可以运行任何射线砖笔记本的应用程序代码。例如,您可以运行一个简单的射线应用程序在一个数据砖笔记本如下:

进口进口随机进口时间分数进口分数初始化()@ray远程defpi4_sample(sample_count):”““pi4_sample sample_count实验运行,并返回部分时间内循环。”“”in_count=0范围(sample_count):x=随机随机()y=随机随机()如果x*x+y*y< =1:in_count+ =1返回分数(in_count,sample_count)SAMPLE_COUNT=1000年*1000年开始=时间时间()未来=pi4_sample远程(sample_count=SAMPLE_COUNT)pi4=得到(未来)结束=时间时间()大调的=结束- - - - - -开始打印(f“运行{SAMPLE_COUNT}测试了{大调的}秒的)π=pi4*4打印(浮动(π))

从火花DataFrame加载数据

加载一个火花DataFrame射线数据集,首先你需要保存火花DataFrame DBFS使用镶花或δ格式。为了控制DBFS安全地访问、砖建议你山云DBFS对象存储。然后,您可以创建一个ray.data.Dataset实例从保存的火花DataFrame路径使用以下辅助方法:

进口进口操作系统urllib.parse进口urlparsedefcreate_ray_dataset_from_spark_dataframe(spark_dataframe,dbfs_tmp_path):spark_df模式(“覆盖”)拼花(dbfs_tmp_path)fuse_path=“/ dbfs”+urlparse(dbfs_tmp_path)路径返回数据read_parquet(fuse_path)#例如,读表作为火花DataFrame三角洲spark_df=火花(“diviner_demo.diviner_pedestrians_data_500”)#提供dbfs位置写表data_location_2=(“dbfs: / home / example.user@www.neidfyre.com/data/ray_test/test_data_2”)#火花DataFrame转换为一线的数据集ray_dataset=create_ray_dataset_from_spark_dataframe(spark_dataframe=spark_df,dbfs_tmp_path=data_location_2)

关闭集群一线

关闭射线集群上运行数据砖,你可以调用ray.utils.spark.shutdown_ray_clusterAPI。

请注意

雷集群也将关闭时间:

  • 你从砖分离互动笔记本集群。

  • 你的砖工作完成。

  • 你的砖集群重新启动或终止。

调优射线集群配置

每个射线工作者节点的推荐配置是:

  • 每个雷工人最低4个CPU核心节点。

  • 最低10 gb堆内存为每个射线工作者节点。

所以,当调用ray.util.spark.setup_ray_cluster,砖建议设置num_cpus_per_node一个值> = 4。

看到射线工作者节点的内存分配细节调优堆内存为每个射线工作者节点。

射线工作者节点的内存分配

每个射线工作者节点使用两种类型的内存:堆内存和对象存储内存。每种类型的分配的内存大小是决定如下所述。

总内存分配给每个射线工作者节点是:

RAY_WORKER_NODE_TOTAL_MEMORY=(SPARK_WORKER_NODE_PHYSICAL_MEMORY/MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES*0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES是射线工作者节点的最大数量,可以引发工人节点上启动。这是由参数决定num_cpus_per_nodenum_gpus_per_node

如果你不设置参数object_store_memory_per_node,然后堆内存大小和对象存储内存大小分配给每个射线工作者节点是:

RAY_WORKER_NODE_HEAP_MEMORY=RAY_WORKER_NODE_TOTAL_MEMORY*0.7OBJECT_STORE_MEMORY_PER_NODE=RAY_WORKER_NODE_TOTAL_MEMORY*0.3

如果你设置参数object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY=RAY_WORKER_NODE_TOTAL_MEMORY- - - - - -argument_object_store_memory_per_node

此外,对象存储内存大小/射线工作者节点有限的共享内存操作系统。最大的价值是:

OBJECT_STORE_MEMORY_PER_NODE_CAP=(SPARK_WORKER_NODE_OS_SHARED_MEMORY/MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES*0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY/dev/shm磁盘容量配置的火花工作节点。

雷仪表板上启用堆栈跟踪和火焰图演员页面

在雷仪表板的演员页面,您可以查看堆栈跟踪和火焰图对于活跃的射线的演员。查看这些信息,使用以下命令安装“py-spy”在你开始之前雷集群:

%皮普安装py- - - - - -间谍

例如笔记本电脑

以下笔记本演示如何创建一个雷集群和砖上运行一线的应用程序。

射线在火花启动笔记本

在新标签页打开笔记本

限制

  • 雷不支持集群自动定量。APIray.util.spark.setup_ray_cluster只能开始射线与固定数量的射线工作者节点集群。

  • 多用户共享数据砖集群不支持(隔离模式启用)。

  • 当使用% pip安装包时,射线集群将关闭。确保你完成后开始射线与% pip安装你的所有库。

  • 使用集成覆盖的配置ray.util.spark.setup_ray_cluster可以导致射线集群变得不稳定和崩溃的射线背景。例如,使用xgboost_ray包和设置RayParams一个演员或cpus_per_actor配置超过雷集群配置集群可以默默地崩溃射线。