开始
加载和管理数据
处理数据
政府
参考和资源
2023年2月17日更新
给我们反馈
本文向您展示如何使用Databricks中的Apache Spark Python (PySpark) DataFrame API加载和转换数据。
另请参阅Apache Spark PySpark API参考.
DataFrame是一种二维标记数据结构,其列类型可能不同。您可以将DataFrame视为电子表格、SQL表或系列对象的字典。Apache Spark DataFrames提供了一组丰富的函数(选择列、过滤、连接、聚合),允许您高效地解决常见的数据分析问题。
Apache Spark dataframe是建立在弹性分布式数据集(rdd)之上的抽象。Spark DataFrames和Spark SQL使用统一的规划和优化引擎,允许您在Databricks上的所有支持语言(Python, SQL, Scala和R)上获得几乎相同的性能。
大多数Apache Spark查询返回一个DataFrame。这包括从表中读取数据、从文件中加载数据以及转换数据的操作。
你也可以从一个列表或者pandas DataFrame中创建一个Spark DataFrame,如下例所示:
进口熊猫作为pd数据=[[1,“伊利亚”],[2,“Teo”],[3.,“方”]]pdf=pd.DataFrame(数据,列=[“id”,“名称”])df1=火花.createDataFrame(pdf)df2=火花.createDataFrame(数据,模式=“id LONG, name STRING”)
Databricks默认对所有表使用Delta Lake。你可以很容易地将表加载到dataframe中,如下例所示:
火花.读.表格(“< catalog_name >, < schema_name >。< table_name >”)
您可以从许多支持的加载数据文件格式.中可用的数据集/ databricks-datasets目录,可从大多数工作空间访问。看到样本数据集.
/ databricks-datasets
df=(火花.读.格式(“csv”).选项(“头”,“真正的”).选项(“inferSchema”,“真正的”).负载(“/ databricks-datasets /样本/ population-vs-price / data_geo.csv”))
大多数Spark转换的结果返回一个DataFrame。您可以将这些结果分配回DataFrame变量,类似于在其他系统中使用cte、临时视图或DataFrame的方式。
dataframe使用标准SQL语义进行连接操作。连接根据提供的匹配条件和连接类型返回两个dataframe的组合结果。下面的例子是一个内部连接,这是默认的:
joined_df=df1.加入(df2,如何=“内心”,在=“id”)
您可以使用union操作将一个DataFrame的行添加到另一个DataFrame中,示例如下:
unioned_df=df1.联盟(df2)
您可以使用.filter ()或其中().在性能或语法上没有区别,如下例所示:
.filter ()
其中()
filtered_df=df.过滤器("id > 1")filtered_df=df.在哪里("id > 1")
使用筛选来选择数据帧中要返回或修改的行子集。
您可以通过传递一个或多个列名来选择列.select (),如下例所示:
.select ()
select_df=df.选择(“id”,“名称”)
您可以结合选择和筛选查询来限制返回的行和列。
subset_df=df.过滤器("id > 1").选择(“名称”)
要以表格格式查看这些数据,可以使用Databricks显示()命令,示例如下:
显示()
显示(df)
Spark使用了这个术语模式来引用DataFrame中列的名称和数据类型。
请注意
Databricks还使用术语模式来描述注册到目录的表的集合。
方法打印模式.printSchema ()方法,示例如下:
.printSchema ()
df.printSchema()
Databricks默认对所有表使用Delta Lake。你可以使用以下语法将一个DataFrame的内容保存到一个表中:
df.写.saveAsTable(“< table_name >”)
大多数Spark应用程序都被设计为处理大型数据集,并以分布式方式工作,Spark会写出一个文件目录,而不是一个文件。许多数据系统被配置为读取这些文件目录。Databricks建议大多数应用程序使用表而不是文件路径。
下面的例子保存了一个JSON文件目录:
df.写.格式(“json”).保存(“/ tmp / json_data”)
Spark DataFrames提供了许多选项来将SQL与Python结合起来。
的selectExpr ()方法允许你将每一列指定为一个SQL查询,如下例所示:
selectExpr ()
显示(df.selectExpr(“id”,"upper(name) as big_name"))
您可以导入expr ()函数pyspark.sql.functions在指定列的任何地方使用SQL语法,如下例所示:
expr ()
pyspark.sql.functions
从pyspark.sql.functions进口expr显示(df.选择(“id”,expr("lower(name) as little_name")))
你也可以使用spark.sql ()在Python内核中运行任意SQL查询,如下例所示:
spark.sql ()
query_df=火花.sql(SELECT * FROM ")
因为逻辑是在Python内核中执行的,所有SQL查询都是作为字符串传递的,所以你可以使用Python格式化来参数化SQL查询,如下面的例子所示:
table_name=“my_table”query_df=火花.sql(fselect * from{table_name}")