将 Pandas DataFrame 转换为 Spark DataFrame
-
使用
createDataFrame()
函数将 Pandas DataFrame 转换为 Spark DataFrame -
使用
createDataFrame()
和schema
函数将 Pandas DataFrame 转换为 Spark DataFrame -
使用启用
apache arrow
的createDataFrame()
函数将 Pandas DataFrame 转换为 Spark DataFrame
DataFrame 是二维可变数据结构。DataFrame 中的数据存储在称为行和列的标记轴中。
Pandas 和 Spark 都有 DataFrame。本教程将讨论将 Pandas DataFrame 转换为 Spark DataFrame 的不同方法。
使用 createDataFrame()
函数将 Pandas DataFrame 转换为 Spark DataFrame
createDataFrame()
函数用于从 RDD 或 pandas.DataFrame
创建 Spark DataFrame。createDataFrame()
将数据和方案作为参数。
我们将很快讨论方案。
createDataFrame()
的语法:
createDataFrame(data, schema=None)
参数:
data
= 要传递的 DataFrameschema
=str
或列表,可选
返回:DataFrame
方法:
- 导入
pandas
库并使用DataFrame()
方法创建一个 Pandas DataFrame。 - 通过从
pyspark
库中导入SparkSession
创建spark
会话。 - 将 Pandas DataFrame 传递给
SparkSession
对象的createDataFrame()
方法。 - 打印 DataFrame。
以下代码使用 createDataFrame()
函数将 Pandas DataFrame 转换为 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data)
#printing the dataframe
spark_DataFrame.show()
输出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用 createDataFrame()
和 schema
函数将 Pandas DataFrame 转换为 Spark DataFrame
我们在前面的示例中讨论了 createDataFrame()
方法。现在我们将看到如何在转换 DataFrame 时更改 schema。
此示例将使用模式更改列名,将 Course
更改为 Technology
,将 Mentor
更改为 developer
,将 price
更改为 Salary
。
schema:
schema 定义字段名称及其数据类型。在 Spark 中,schema 是 DataFrame 的结构,DataFrame 的 schema 可以使用 StructType
类来定义,它是 StructField
的集合。
StructField
采用字段或列的名称、数据类型和可为空的。可空参数定义该字段是否可以为空。
方法:
- 导入
pandas
库并使用DataFrame()
方法创建一个 Pandas DataFrame。 - 通过从
pyspark
库中导入SparkSession
创建Spark
会话。 - 通过将
StructField
的集合传递给StructType
类来创建模式;StructField
对象是通过传递字段的名称、数据类型和可为空来创建的。 - 将 Pandas DataFrame 和模式传递给
SparkSession
对象的createDataFrame()
方法。 - 打印 DataFrame。
以下代码使用 createDataFrame()
和 schema
将 Pandas DataFrame 转换为 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
#Creating/Changing schema
dfSchema = StructType([ StructField("Technology", StringType(), True), StructField("developer", StringType(), True), StructField("Salary", IntegerType(), True)])
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data, schema=dfSchema)
#printing the dataframe
spark_DataFrame.show()
输出:
+----------+---------+------+
|Technology|developer|Salary|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用启用 apache arrow
的 createDataFrame()
函数将 Pandas DataFrame 转换为 Spark DataFrame
Apache Arrow 是一种独立于语言的列式内存格式,用于平面和分层数据或任何结构化数据格式。Apache Arrow 通过创建标准的列式内存格式提高了数据分析的效率。
默认情况下禁用 apache 箭头;我们可以使用以下代码显式启用它。
SparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")
方法:
- 导入
pandas
库并使用DataFrame()
方法创建一个 Pandas DataFrame。 - 通过从
pyspark
库中导入SparkSession
创建spark
会话。 - 使用
conf
属性启用 apache 箭头。 - 将 Pandas DataFrame 传递给
SparkSession
对象的createDataFrame()
方法。 - 打印 DataFrame。
以下代码通过启用 apache 箭头将 Pandas DataFrame 转换为 Spark DataFrame 来使用 createDataFrame()
函数。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
# Converting the pandas dataframe in to spark dataframe
sprak_arrow = spark_session.createDataFrame(data)
#printing the dataframe
sprak_arrow.show()
输出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+