教程:使用SparkR SparkDataFrames砖

本文向您展示了如何使用SparkDataFrame加载和转换数据API SparkR砖。

你可以练习跑步的本文的代码示例从一个细胞在一个R笔记本这是附加对正在运行的集群。砖集群提供的在火花SparkR (R)包预装,以便您可以开始使用SparkDataFrameAPI。

什么是SparkDataFrame ?

SparkDataFrame是一个分布式数据组织成命名列的集合。概念上相当于一个表在数据库或数据帧在R . SparkDataFrames可以由一系列广泛的来源如结构化数据文件,表在数据库中,或现有的当地R数据帧。SparkDataFrames提供一组丰富的功能(选择列、过滤、连接、聚集),允许您来有效地解决常见的数据分析问题。

创建一个SparkDataFrame

大多数Apache在R上下文返回SparkDataFrame引发查询。这包括阅读从表,从文件加载数据,操作转换数据。

创建一个SparkDataFrame的一个方法是通过构造一组数据并指定数据的模式,然后通过数据和模式createDataFrame功能,如以下示例。火花使用术语模式引用列的名称和数据类型SparkDataFrame。您可以通过调用打印模式printSchema通过调用函数和打印数据showDF函数。

请注意

砖也使用这个词模式描述表登记目录的集合。

#加载SparkR包已经预装在集群上。图书馆(SparkR)#获取现有SparkSession已经开始在集群上。sparkR.session()#构造样本数据的列表。你必须使用一种后缀#这里的整数或你可能会得到错误# " . lang。双不是一个有效的外部模式的int类型。”数据< -列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”))#指定数据的模式(列的名称和数据类型)。模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))#创建SparkDataFrame基于指定的数据和模式。df< -createDataFrame(数据=数据,模式=模式)#打印SparkDataFrame的模式。printSchema(df)#打印SparkDataFrame的内容。showDF(df)#输出:##根# | - id:整数(nullable = true)# |——名称:字符串(nullable = true)# + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 1 |雷蒙德|# | 2 | Loretta |# | 3 | |# + - - - + - - - - - - - +

提示

以一种更健壮的格式显示数据在一个砖笔记本,你可以叫砖显示命令而不是SparkRshowDF功能,例如:

显示(df)

读表SparkDataFrame

砖使用三角洲湖默认所有表。你可以通过调用三角洲表加载到SparkDataFramestableToDF功能,如以下示例。这个示例假设您已经访问表数据砖命名钻石在指定的位置。如果没有,根据需要改变表的名称和位置。

# df < - tableToDF (“< catalog_name >, < schema_name >。< table_name >”)df< -tableToDF(“main.default.diamonds”)显示(df)#输出:## + - - - - - - - - - - - - - - - - - - +……# |克拉削减| |…# + - - - - - - - - - - - - - - - - - - +……理想0.23 # | | |……溢价0.21 # | | |……#……

参见:

数据加载到SparkDataFrame从一个文件中

你可以从许多支持加载数据文件格式通过调用loadDF函数。下面的示例加载一个CSV文件的内容和假设文件存在于指定的路径。这个例子中推断列名和模式基于文件的内容。的loadDF函数支持不同参数的文件格式。有关更多信息,请参见:

所有Apache火花从SparkR数据源可以使用。如果你可以加载数据从数据源使用PySpark或Scala中,您还可以通过使用SparkR加载它。下面的例子使用了参数的值csv加载数据从一个CSV文件。从JSON文件加载数据相反,你可以指定json,等等。

df< -loadDF(路径=“/ FileStore /表/ diamonds.csv”,=“csv”,=“真正的”,inferSchema=“真正的”)显示(df)#输出:## + - - - - - - - - - - - - - - - - - - +……# |克拉削减| |…# + - - - - - - - - - - - - - - - - - - +……理想0.23 # | | |……溢价0.21 # | | |……#……

分配SparkDataFrame转换步骤

大多数的结果返回SparkDataFrame火花转换。你可以分配这些结果返回给一个SparkDataFrame变量,类似于如何使用公共表表达式(cte),临时视图,或者DataFrames其他系统。

结合SparkDataFrames和加入工会

SparkDataFrames使用标准的SQL连接操作语义。加入返回合并后的结果两个SparkDataFrames基于提供的匹配条件和连接类型。下面的示例调用加入功能和使用内连接,这是缺省设置。调用选择函数删除重复的id列加入的结果:

dataCustomers< -列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”))schemaCustomers< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))dataPurchases< -列表(列表(1 l,“绿色”,“苹果”),列表(2 l,“紫色”,“葡萄”),列表(3 l,“黄色”,“香蕉”))schemaPurchases< -structType(structField(“id”,“整数”),structField(“颜色”,“字符串”),structField(“水果”,“字符串”))dfCustomers< -createDataFrame(数据=dataCustomers,模式=schemaCustomers)dfPurchases< -createDataFrame(数据=dataPurchases,模式=schemaPurchases)dfPurchaseHistory< -选择(x=加入(x=dfCustomers,y=dfPurchases,joinExpr=dfCustomers美元id= =dfPurchases美元id),上校=列表(dfCustomers美元id,“名称”,“颜色”,“水果”))显示(dfPurchaseHistory)#输出:## + - - - + - - - - - - - - - - - - - - - - - - - + +# | | id名称| | |水果颜色# + - - - + - - - - - - - - - - - - - - - - - - - + +# | 1 |雷蒙德绿色苹果| | |# | 2 | Loretta |紫色葡萄| |韦恩# | 3 | | | |黄色香蕉# + - - - + - - - - - - - - - - - - - - - - - - - + +

您可以添加的行SparkDataFrame到另一个通过调用联盟功能,如以下示例:

模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))df1< -createDataFrame(数据=列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”)),模式=模式)df2< -createDataFrame(数据=列表(列表(4 l,“特里”),列表(5 l,“杰森”),列表(6 l,“雅”)),模式=模式)dfUnion< -联盟(df1,df2)显示(dfUnion)#输出:## + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 1 |雷蒙德|# | 2 | Loretta |# | 3 | |# | 4 |特里|# | 5 | |# | 6 |汤娅|# + - - - + - - - - - - - +

过滤器SparkDataFrame行

您可以使用过滤选择一个子集的行返回或修改在SparkDataFrame通过调用过滤器在哪里功能。在性能或语法没有区别,下面的例子:

数据< -列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)dfFilter< -过滤器(x=df,条件=df美元id>1)dfWhere< -在哪里(x=df,条件=df美元id>1)showDF(dfFilter)showDF(dfWhere)#输出:## + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 2 | Loretta |# | 3 | |# + - - - + - - - - - - - +# + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 2 | Loretta |# | 3 | |# + - - - + - - - - - - - +

从一个SparkDataFrame选择列

您可以选择列通过一个或多个列的名字选择功能,如以下示例:

数据< -列表(列表(1 l,“雷蒙德”,“绿色”,“苹果”),列表(2 l,“洛雷塔”,“紫色”,“葡萄”),列表(3 l,“韦恩。”,“黄色”,“香蕉”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”),structField(“颜色”,“字符串”),structField(“水果”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)dfSelect< -选择(x=df,上校=列表(“id”,“颜色”))显示(dfSelect)#输出:## + - - - + - - - +# | | | id颜色# + - - - + - - - +# | 1 | |# | 2 | |紫色#黄色| | 3 |# + - - - + - - - +

您可以组合选择和过滤查询来限制返回的行和列,就像下面的例子:

数据< -列表(列表(1 l,“雷蒙德”,“绿色”,“苹果”),列表(2 l,“洛雷塔”,“紫色”,“葡萄”),列表(3 l,“韦恩。”,“黄色”,“香蕉”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”),structField(“颜色”,“字符串”),structField(“水果”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)dfFilterSelect< -过滤器(x=选择(x=df,上校=列表(“id”,“水果”)),条件=df美元id>1)显示(dfFilterSelect)#输出:## + - - - + - - - +# | | |水果id# + - - - + - - - +# | 2 | |葡萄# | | 3 |香蕉# + - - - + - - - +

节省SparkDataFrame表

砖使用三角洲湖默认所有表。您可以保存的内容SparkDataFrame砖通过调用一个表saveAsTable功能,如以下示例:

#表名< - < catalog_name >。< schema_name >。< table_name > "的表< -“main.default.purchasehistory”数据< -列表(列表(1 l,“雷蒙德”,“绿色”,“苹果”),列表(2 l,“洛雷塔”,“紫色”,“葡萄”),列表(3 l,“韦恩。”,“黄色”,“香蕉”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”),structField(“颜色”,“字符串”),structField(“水果”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)saveAsTable(df=df,的表=的表)#确认表已成功保存#显示表的内容。显示(sql(paste0(“SELECT * FROM”,的表)))#输出:## + - - - + - - - - - - - - - - - - - - - - - - - + +# | | id名称| | |水果颜色# + - - - + - - - - - - - - - - - - - - - - - - - + +# | 1 |雷蒙德绿色苹果| | |# | 2 | Loretta |紫色葡萄| |韦恩# | 3 | | | |黄色香蕉# + - - - + - - - - - - - - - - - - - - - - - - - + +

写一个SparkDataFrame文件的集合

大多数火花应用程序被设计成工作在大型数据集和工作在一个分布式的方式,并引发写出一个目录的文件,而不是单个文件。许多数据系统配置为读取这些文件的目录。砖推荐使用的表,而不是文件路径对于大多数应用程序。

下面的示例调用write.json函数将一个表的内容保存到一个目录中。其他文件格式,请参阅:

参见:

#表名< - < catalog_name >。< schema_name >。< table_name > "的表< -“main.default.diamonds”writeToPath< -paste0(“/ tmp /”,的表)df< -tableToDF(的表)write.json(x=df,路径=writeToPath)#验证JSON成功写的#显示SparkDataFrame JSON文件的内容。dfJSON< -read.json(writeToPath)显示(dfJSON)#输出:## + - - - - - - - - - - - - - - - - - - +……# |克拉削减| |…# + - - - - - - - - - - - - - - - - - - +……理想0.23 # | | |……溢价0.21 # | | |……#……

运行SQL查询

SparkDataFrames提供许多选项将SQL与R。

selectExpr功能使您可以指定每一列作为一个SQL查询,比如下面的例子:

数据< -列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)dfSelectExpr< -selectExpr(x=df,expr=”(id + 100)为id”,“(上层(名字))名称”)显示(dfSelectExpr)#输出:## + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 101 |雷蒙德|# | 102 | LORETTA |# | 103 | |# + - - - + - - - - - - - +

expr功能使您能够使用SQL语法一列将被指定的任何地方,如以下示例:

数据< -列表(列表(1 l,“雷蒙德”),列表(2 l,“洛雷塔”),列表(3 l,“韦恩。”))模式< -structType(structField(“id”,“整数”),structField(“名称”,“字符串”))df< -createDataFrame(数据=数据,模式=模式)dfSelect< -选择(x=df,上校=列表(expr(“像id (id * 2)”),expr(”(低(名称))名称”)))显示(dfSelect)#输出:## + - - - + - - - - - - - +# | | | id名称# + - - - + - - - - - - - +# | 2 |雷蒙德|# | 4 | loretta |# | 6 | |# + - - - + - - - - - - - +

你也可以调用sql函数运行任意SQL查询,如以下示例:

df=sql(“选择克拉,切割、颜色清晰main.default.diamonds清晰= VVS2限制2”)显示(df)#输出:## + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +# |克拉| |清晰| |颜色# + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +0.24 # | |很好| J | VVS2 |0.23 # | |很好| G | VVS2 |# + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +

您可以使用字符串操作参数化SQL查询,如以下示例:

columns_list< -“克拉、切割、颜色清晰”table_name< -“main.default.diamonds”column_name< -“清晰”column_value< -“VVS2”row_limit< -2df< -sql(paste0(“选择”,columns_list,“从”,table_name,”,“,column_name,“=”,column_value,“限制”,row_limit))显示(df)#输出:## + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +# |克拉| |清晰| |颜色# + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +0.24 # | |很好| J | VVS2 |0.23 # | |很好| G | VVS2 |# + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +

下一个步骤