安装和编译Cython

学习如何安装和编译Cython砖。

写的亚当Pavlacka

去年发表在:2022年5月19日

本文解释了如何运行火花代码编译Cython代码。的步骤如下:

  1. 创建一个示例Cython DBFS模块(AWS|Azure)。
  2. 将文件添加到火花会话。
  3. 创建一个包装器方法来加载模块的执行人。
  4. 样本数据集的映射器运行。
  5. 产生更大的数据集,比较性能与本机Python示例。
删除

信息

默认情况下,使用路径dbfs: /如果没有协议引用。

% python #写一个例子cython / cython fib模块/例子。pyx DBFS。dbutils.fs.put (“/ / cython /无伤大雅的例子。pyx”、“”“def fib_mapper_cython (n):“返回第一个fibonnaci数量> n。“cdef int = 0 cdef int b = 1 cdef int j = int (n),而b < j: a、b = b, a + b返回b, 1”“”,真的)#写一个示例输入文件/ / cython /输入示例。在DBFS txt。#此文件的每一行是一个整数。dbutils.fs.put (“/ / cython_input /输入示例。txt”、“100”“10”,真的)#看看示例输入。dbutils.fs.head(“/例子/ cython_input / input.txt”)

Cython源文件添加到火花

使Cython跨集群可用的源文件,我们将使用sc.addPyFile将这些文件添加到火花。例如,

% python sc.addPyFile (“dbfs: / / cython / fib.pyx例子”)

测试Cython编译驱动节点上

这段代码首先将测试编译驱动节点。

% python导入pyximport导入操作系统pyximport.install进口fib ()

定义了wap功能编译和导入模块

打印语句将执行器节点上执行。您可以查看stdout日志消息的进步跟踪模块。

% python导入系统,操作系统、shutil cython def spark_cython(模块、方法):def (* args, * * kwargs):包装印刷的输入函数:% s % args全球cython_function_尝试:返回cython_function_ (* args, * * kwargs)除了:进口pyximport pyximport.install()打印“cython编译完成”cython_function_ = getattr (__import__(模块),方法)印刷的定义功能:% s % cython_function_返回cython_function_ (* args, * * kwargs)返回包裹

运行Cython例子

下面的代码片段在几个数据点运行fibonacci示例。

% python #使用CSV读者产生火花DataFrame。回滚到抽样从GenericRowObject DataFrames和抓住单一元素行= spark.read.csv .rdd(“/例子/ cython_input /”)。地图(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

性能比较

下面我们将测试2之间的速度差实现。我们将使用spark.range ()api生成数据点从10000年到100000000年,50个火花分区。我们将写这个输出DBFS CSV。

对于这个测试,禁用自动定量(AWS|Azure)为了确保集群有固定数量的火花执行人。

python dbutils.fs %。rm (“/ tmp / cython_input /”,真的)火花。范围(10000、100000000、1、50).write.csv (“/ tmp / cython_input /”)

正常PySpark代码

% python def fib_mapper_python (n): b = 0 = 1打印“尝试:% s % n,而b < int (n): a、b = b, a + b返回(b, 1)打印fib_mapper_python(2000)行= spark.read.csv .rdd (“/ tmp / cython_input /”)。地图(λy: y.__getitem__ (0)) fib_frequency =线。地图(λx: fib_mapper_python (x))。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

测试Cython代码

现在测试Cython编译代码。

% = spark.read.csv python行(“/ tmp / cython_input /”) .rdd。地图(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

我们生成的测试数据集有50个火花分区,创建50 csv文件所示。您可以查看的数据集dbutils.fs.ls (“/ tmp / cython_input /”)

这篇文章有用吗?