SparkSQL使用快速入門
一、SparkSQL的進化之路
1.0以前: Shark
1.1.x開始:SparkSQL(隻是測試性的) SQL
1.3.x: SparkSQL(正式版本)+Dataframe
1.5.x: SparkSQL 鎢絲計劃
1.6.x: SparkSQL+DataFrame+DataSet(測試版本)
2.x:
- SparkSQL+DataFrame+DataSet(正式版本)
- SparkSQL:還有其他的優化
- StructuredStreaming(DataSet)
Spark on Hive和Hive on Spark
- Spark on Hive:Hive隻作為儲存角色,Spark負責sql解析優化,執行。
- Hive on Spark:Hive即作為存儲又負責sql的解析優化,Spark負責執行。
二、認識SparkSQL
2.1 什麼是SparkSQL?
spark SQL是spark的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象就是DataFrame。
2.2 SparkSQL的作用
提供一個編程抽象(DataFrame) 並且作為分佈式 SQL查詢引擎
DataFrame:它可以根據很多源進行構建,包括:結構化的數據文件,hive中的表,外部的關系型數據庫,以及RDD
2.3 運行原理
將Spark SQL轉化為RDD,然後提交到集群執行
2.4 特點
(1)容易整合
(2)統一的數據訪問方式
(3)兼容 Hive
(4)標準的數據連接
2.5 SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供瞭統一的切入點,來讓用戶學習spark的各項功能。
在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來創建和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝瞭SparkConf、SparkContext和SQLContext。為瞭向後兼容,SQLContext和HiveContext也被保存下來。
SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝瞭sparkContext,所以計算實際上是由sparkContext完成的。
特點:
—- 為用戶提供一個統一的切入點使用Spark 各項功能
—- 允許用戶通過它調用 DataFrame 和 Dataset 相關 API 來編寫程序
—- 減少瞭用戶需要瞭解的一些概念,可以很容易的與 Spark 進行交互
—- 與 Spark 交互之時不需要顯示的創建 SparkConf, SparkContext 以及 SQlContext,這些對象已經封閉在 SparkSession 中
2.6 DataFrames
在Spark中,DataFrame是一種以RDD為基礎的分佈式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及作用於DataFrame之上的變換進行瞭針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core隻能在stage層面進行簡單、通用的流水線優化。
三、RDD轉換成為DataFrame
使用spark1.x版本的方式
測試數據目錄:spark/examples/src/main/resources(spark的安裝目錄裡面)
people.txt
3.1通過case class創建DataFrames(反射)
//定義case class,相當於表結構 case class People(var name:String,var age:Int) object TestDataFrame1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local") val sc = new SparkContext(conf) val context = new SQLContext(sc) // 將本地的數據讀入 RDD, 並將 RDD 與 case class 關聯 val peopleRDD = sc.textFile("E:\\666\\people.txt") .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt)) import context.implicits._ // 將RDD 轉換成 DataFrames val df = peopleRDD.toDF //將DataFrames創建成一個臨時的視圖 df.createOrReplaceTempView("people") //使用SQL語句進行查詢 context.sql("select * from people").show() } }
運行結果
3.2通過structType創建DataFrames(編程接口)
object TestDataFrame2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val fileRDD = sc.textFile("E:\\666\\people.txt") // 將 RDD 數據映射成 Row,需要 import org.apache.spark.sql.Row val rowRDD: RDD[Row] = fileRDD.map(line => { val fields = line.split(",") Row(fields(0), fields(1).trim.toInt) }) // 創建 StructType 來定義結構 val structType: StructType = StructType( //字段名,字段類型,是否可以為空 StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil ) /** * rows: java.util.List[Row], * schema: StructType * */ val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType) df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
運行結果
3.3通過 json 文件創建DataFrames
object TestDataFrame3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df: DataFrame = sqlContext.read.json("E:\\666\\people.json") df.createOrReplaceTempView("people") sqlContext.sql("select * from people").show() } }
四、DataFrame的read和save和savemode
4.1 數據的讀取
object TestRead { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:\\666\\people.json") val df2 = sqlContext.read.parquet("E:\\666\\users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:\\666\\people.json") val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet") //方式三,默認是parquet格式 val df5 = sqlContext.load("E:\\666\\users.parquet") } }
4.2 數據的保存
object TestSave { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:\\666\\people.json") //方式一 df1.write.json("E:\\111") df1.write.parquet("E:\\222") //方式二 df1.write.format("json").save("E:\\333") df1.write.format("parquet").save("E:\\444") //方式三 df1.write.save("E:\\555") } }
4.3 數據的保存模式
使用mode
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
五、數據源
5.1 數據源隻json
參考4.1
5.2 數據源之parquet
參考4.1
5.3 數據源之Mysql
object TestMysql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestMysql").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要傳入Mysql的URL、表明、properties(連接數據庫的用戶名密碼) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show() } }
運行結果
5.3 數據源之Hive
(1)準備工作
在pom.xml文件中添加依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
開發環境則把resource文件夾下添加hive-site.xml文件,集群環境把hive的配置文件要發到$SPARK_HOME/conf目錄下
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一個服務器節點,那麼請更改 hadoop02 為 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration>
(2)測試代碼
object TestHive { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show() } }
運行結果
六、SparkSQL 的元數據
1.1元數據的狀態
SparkSQL 的元數據的狀態有兩種:
1、in_memory,用完瞭元數據也就丟瞭
2、hive , 通過hive去保存的,也就是說,hive的元數據存在哪兒,它的元數據也就存在哪兒。
換句話說,SparkSQL的數據倉庫在建立在Hive之上實現的。我們要用SparkSQL去構建數據倉庫的時候,必須依賴於Hive。
2.2Spark-SQL腳本
如果用戶直接運行bin/spark-sql命令。會導致我們的元數據有兩種狀態:
1、in-memory狀態:如果SPARK-HOME/conf目錄下沒有放置hive-site.xml文件,元數據的狀態就是in-memory
2、hive狀態:如果我們在SPARK-HOME/conf目錄下放置瞭,hive-site.xml文件,那麼默認情況下,spark-sql的元數據的狀態就是hive.
到此這篇關於SparkSQL使用快速入門的文章就介紹到這瞭,更多相關SparkSQL使用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SparkSQl簡介及運行原理
- 創建SparkSession和sparkSQL的詳細過程
- SparkSQL使用IDEA快速入門DataFrame與DataSet的完美教程
- Spark SQL 2.4.8 操作 Dataframe的兩種方式
- pyspark對Mysql數據庫進行讀寫的實現