pyspark創建DataFrame的幾種方法
pyspark創建DataFrame
為瞭便於操作,使用pyspark時我們通常將數據轉為DataFrame的形式來完成清洗和分析動作。
RDD和DataFrame
在上一篇pyspark基本操作有提到RDD也是spark中的操作的分佈式數據對象。
這裡簡單看一下RDD和DataFrame的類型。
print(type(rdd)) # <class 'pyspark.rdd.RDD'> print(type(df)) # <class 'pyspark.sql.dataframe.DataFrame'>
翻閱瞭一下源碼的定義,可以看到他們之間並沒有繼承關系。
class RDD(object): """ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. """
class DataFrame(object): """A distributed collection of data grouped into named columns. A :class:`DataFrame` is equivalent to a relational table in Spark SQL, and can be created using various functions in :class:`SparkSession`:: ... """
RDD是一種彈性分佈式數據集,Spark中的基本抽象。表示一種不可變的、分區儲存的集合,可以進行並行操作。
DataFrame是一種以列對數據進行分組表達的分佈式集合, DataFrame等同於Spark SQL中的關系表。相同點是,他們都是為瞭支持分佈式計算而設計。
但是RDD隻是元素的集合,但是DataFrame以列進行分組,類似於MySQL的表或pandas中的DataFrame。
實際工作中,我們用的更多的還是DataFrame。
使用二元組創建DataFrame
嘗試第一種情形發現,僅僅傳入二元組,結果是沒有列名稱的。
於是我們嘗試第二種,同時傳入二元組和列名稱。
a = [('Alice', 1)] output = spark.createDataFrame(a).collect() print(output) # [Row(_1='Alice', _2=1)] output = spark.createDataFrame(a, ['name', 'age']).collect() print(output) # [Row(name='Alice', age=1)]
這裡collect()是按行展示數據表,也可以使用show()對數據表進行展示。
spark.createDataFrame(a).show() # +-----+---+ # | _1| _2| # +-----+---+ # |Alice| 1| # +-----+---+ spark.createDataFrame(a, ['name', 'age']).show() # +-----+---+ # | name|age| # +-----+---+ # |Alice| 1| # +-----+---+
使用鍵值對創建DataFrame
d = [{'name': 'Alice', 'age': 1}] output = spark.createDataFrame(d).collect() print(output) # [Row(age=1, name='Alice')]
使用rdd創建DataFrame
a = [('Alice', 1)] rdd = sc.parallelize(a) output = spark.createDataFrame(rdd).collect() print(output) output = spark.createDataFrame(rdd, ["name", "age"]).collect() print(output) # [Row(_1='Alice', _2=1)] # [Row(name='Alice', age=1)]
基於rdd和ROW創建DataFrame
from pyspark.sql import Row a = [('Alice', 1)] rdd = sc.parallelize(a) Person = Row("name", "age") person = rdd.map(lambda r: Person(*r)) output = spark.createDataFrame(person).collect() print(output) # [Row(name='Alice', age=1)]
基於rdd和StructType創建DataFrame
from pyspark.sql.types import * a = [('Alice', 1)] rdd = sc.parallelize(a) schema = StructType( [ StructField("name", StringType(), True), StructField("age", IntegerType(), True) ] ) output = spark.createDataFrame(rdd, schema).collect() print(output) # [Row(name='Alice', age=1)]
基於pandas DataFrame創建pyspark DataFrame
df.toPandas()可以把pyspark DataFrame轉換為pandas DataFrame。
df = spark.createDataFrame(rdd, ['name', 'age']) print(df) # DataFrame[name: string, age: bigint] print(type(df.toPandas())) # <class 'pandas.core.frame.DataFrame'> # 傳入pandas DataFrame output = spark.createDataFrame(df.toPandas()).collect() print(output) # [Row(name='Alice', age=1)]
創建有序的DataFrame
output = spark.range(1, 7, 2).collect() print(output) # [Row(id=1), Row(id=3), Row(id=5)] output = spark.range(3).collect() print(output) # [Row(id=0), Row(id=1), Row(id=2)]
通過臨時表得到DataFrame
spark.registerDataFrameAsTable(df, "table1") df2 = spark.table("table1") b = df.collect() == df2.collect() print(b) # True
配置DataFrame和臨時表
創建DataFrame時指定列類型
在createDataFrame中可以指定列類型,隻保留滿足數據類型的列,如果沒有滿足的列,會拋出錯誤。
a = [('Alice', 1)] rdd = sc.parallelize(a) # 指定類型於預期數據對應時,正常創建 output = spark.createDataFrame(rdd, "a: string, b: int").collect() print(output) # [Row(a='Alice', b=1)] rdd = rdd.map(lambda row: row[1]) print(rdd) # PythonRDD[7] at RDD at PythonRDD.scala:53 # 隻有int類型對應上,過濾掉其他列。 output = spark.createDataFrame(rdd, "int").collect() print(output) # [Row(value=1)] # 沒有列能對應上,會拋出錯誤。 output = spark.createDataFrame(rdd, "boolean").collect() # TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>
註冊DataFrame為臨時表
spark.registerDataFrameAsTable(df, "table1") spark.dropTempTable("table1")
獲取和修改配置
print(spark.getConf("spark.sql.shuffle.partitions")) # 200 print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 10 print(spark.setConf("spark.sql.shuffle.partitions", u"50")) # None print(spark.getConf("spark.sql.shuffle.partitions", u"10")) # 50
註冊自定義函數
spark.registerFunction("stringLengthString", lambda x: len(x)) output = spark.sql("SELECT stringLengthString('test')").collect() print(output) # [Row(stringLengthString(test)='4')] spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType()) output = spark.sql("SELECT stringLengthString('test')").collect() print(output) # [Row(stringLengthString(test)=4)] spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) output = spark.sql("SELECT stringLengthInt('test')").collect() print(output) # [Row(stringLengthInt(test)=4)]
查看臨時表列表
可以查看所有臨時表名稱和對象。
spark.registerDataFrameAsTable(df, "table1") print(spark.tableNames()) # ['table1'] print(spark.tables()) # DataFrame[database: string, tableName: string, isTemporary: boolean] print("table1" in spark.tableNames()) # True print("table1" in spark.tableNames("default")) # True spark.registerDataFrameAsTable(df, "table1") df2 = spark.tables() df2.filter("tableName = 'table1'").first() print(df2) # DataFrame[database: string, tableName: string, isTemporary: boolean]
從其他數據源創建DataFrame
MySQL
前提是需要下載jar包。
Mysql-connector-java.jar
from pyspark import SparkContext from pyspark.sql import SQLContext import pyspark.sql.functions as F sc = SparkContext("local", appName="mysqltest") sqlContext = SQLContext(sc) df = sqlContext.read.format("jdbc").options( url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&" "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&" "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load() df.show(n=5) sc.stop()
參考
RDD和DataFrame的區別
spark官方文檔 翻譯 之pyspark.sql.SQLContext
到此這篇關於pyspark創建DataFrame的幾種方法的文章就介紹到這瞭,更多相關pyspark創建DataFrame 內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- pyspark對Mysql數據庫進行讀寫的實現
- 創建SparkSession和sparkSQL的詳細過程
- spark dataframe全局排序id與分組後保留最大值行
- Python安裝spark的詳細過程
- SparkSQl簡介及運行原理