將 Pandas DataFrame 轉換為 Spark DataFrame
-
使用
createDataFrame()
函式將 Pandas DataFrame 轉換為 Spark DataFrame -
使用
createDataFrame()
和schema
函式將 Pandas DataFrame 轉換為 Spark DataFrame -
使用啟用
apache arrow
的createDataFrame()
函式將 Pandas DataFrame 轉換為 Spark DataFrame
DataFrame 是二維可變資料結構。DataFrame 中的資料儲存在稱為行和列的標記軸中。
Pandas 和 Spark 都有 DataFrame。本教程將討論將 Pandas DataFrame 轉換為 Spark DataFrame 的不同方法。
使用 createDataFrame()
函式將 Pandas DataFrame 轉換為 Spark DataFrame
createDataFrame()
函式用於從 RDD 或 pandas.DataFrame
建立 Spark DataFrame。createDataFrame()
將資料和方案作為引數。
我們將很快討論方案。
createDataFrame()
的語法:
createDataFrame(data, schema=None)
引數:
data
= 要傳遞的 DataFrameschema
=str
或列表,可選
返回:DataFrame
方法:
- 匯入
pandas
庫並使用DataFrame()
方法建立一個 Pandas DataFrame。 - 通過從
pyspark
庫中匯入SparkSession
建立spark
會話。 - 將 Pandas DataFrame 傳遞給
SparkSession
物件的createDataFrame()
方法。 - 列印 DataFrame。
以下程式碼使用 createDataFrame()
函式將 Pandas DataFrame 轉換為 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data)
#printing the dataframe
spark_DataFrame.show()
輸出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用 createDataFrame()
和 schema
函式將 Pandas DataFrame 轉換為 Spark DataFrame
我們在前面的示例中討論了 createDataFrame()
方法。現在我們將看到如何在轉換 DataFrame 時更改 schema。
此示例將使用模式更改列名,將 Course
更改為 Technology
,將 Mentor
更改為 developer
,將 price
更改為 Salary
。
schema:
schema 定義欄位名稱及其資料型別。在 Spark 中,schema 是 DataFrame 的結構,DataFrame 的 schema 可以使用 StructType
類來定義,它是 StructField
的集合。
StructField
採用欄位或列的名稱、資料型別和可為空的。可空引數定義該欄位是否可以為空。
方法:
- 匯入
pandas
庫並使用DataFrame()
方法建立一個 Pandas DataFrame。 - 通過從
pyspark
庫中匯入SparkSession
建立Spark
會話。 - 通過將
StructField
的集合傳遞給StructType
類來建立模式;StructField
物件是通過傳遞欄位的名稱、資料型別和可為空來建立的。 - 將 Pandas DataFrame 和模式傳遞給
SparkSession
物件的createDataFrame()
方法。 - 列印 DataFrame。
以下程式碼使用 createDataFrame()
和 schema
將 Pandas DataFrame 轉換為 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
#Creating/Changing schema
dfSchema = StructType([ StructField("Technology", StringType(), True), StructField("developer", StringType(), True), StructField("Salary", IntegerType(), True)])
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data, schema=dfSchema)
#printing the dataframe
spark_DataFrame.show()
輸出:
+----------+---------+------+
|Technology|developer|Salary|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用啟用 apache arrow
的 createDataFrame()
函式將 Pandas DataFrame 轉換為 Spark DataFrame
Apache Arrow 是一種獨立於語言的列式記憶體格式,用於平面和分層資料或任何結構化資料格式。Apache Arrow 通過建立標準的列式記憶體格式提高了資料分析的效率。
預設情況下禁用 apache 箭頭;我們可以使用以下程式碼顯式啟用它。
SparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")
方法:
- 匯入
pandas
庫並使用DataFrame()
方法建立一個 Pandas DataFrame。 - 通過從
pyspark
庫中匯入SparkSession
建立spark
會話。 - 使用
conf
屬性啟用 apache 箭頭。 - 將 Pandas DataFrame 傳遞給
SparkSession
物件的createDataFrame()
方法。 - 列印 DataFrame。
以下程式碼通過啟用 apache 箭頭將 Pandas DataFrame 轉換為 Spark DataFrame 來使用 createDataFrame()
函式。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame({'Course': ['Python', 'Spark', 'Java', 'JavaScript', 'C#'],
'Mentor': ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
'price$': [199, 299, 99, 250, 399]})
spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
# Converting the pandas dataframe in to spark dataframe
sprak_arrow = spark_session.createDataFrame(data)
#printing the dataframe
sprak_arrow.show()
輸出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+