Spark 系列(十)—— Spark SQL 外部資料來源
一、簡介
1.1 多資料來源支援
Spark 支援以下六個核心資料來源,同時 Spark 社群還提供了多達上百種資料來源的讀取方式,能夠滿足絕大部分使用場景。
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
注:以下所有測試檔案均可從本倉庫的resources 目錄進行下載
1.2 讀資料格式
所有讀取 API 遵循以下呼叫格式:
// 格式
DataFrameReader.format(...).option("key","value").schema(...).load()
// 示例
spark.read.format("csv" )
.option("mode","FAILFAST") // 讀取模式
.option("inferSchema","true") // 是否自動推斷 schema
.option("path","path/to/file(s)") // 檔案路徑
.schema(someSchema) // 使用預定義的 schema
.load()
複製程式碼
讀取模式有以下三種可選項:
讀模式 | 描述 |
---|---|
permissive |
當遇到損壞的記錄時,將其所有欄位設定為 null,並將所有損壞的記錄放在名為 _corruption t_record 的字串列中 |
dropMalformed |
刪除格式不正確的行 |
failFast |
遇到格式不正確的資料時立即失敗 |
1.3 寫資料格式
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode","OVERWRITE") //寫模式
.option("dateFormat","yyyy-MM-dd") //日期格式
.option("path","path/to/file(s)" )
.save()
複製程式碼
寫資料模式有以下四種可選項:
Scala/Java | 描述 |
---|---|
SaveMode.ErrorIfExists |
如果給定的路徑已經存在檔案,則丟擲異常,這是寫資料預設的模式 |
SaveMode.Append |
資料以追加的方式寫入 |
SaveMode.Overwrite |
資料以覆蓋的方式寫入 |
SaveMode.Ignore |
如果給定的路徑已經存在檔案,則不做任何操作 |
二、CSV
CSV 是一種常見的文字檔案格式,其中每一行表示一條記錄,記錄中的每個欄位用逗號分隔。
2.1 讀取CSV檔案
自動推斷型別讀取讀取示例:
spark.read.format("csv")
.option("header","false") // 檔案中的第一行是否為列的名稱
.option("mode","FAILFAST") // 是否快速失敗
.option("inferSchema","true") // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()
複製程式碼
使用預定義型別:
import org.apache.spark.sql.types.{StructField,StructType,StringType,LongType}
//預定義資料格式
val myManualSchema = new StructType(Array(
StructField("deptno",LongType,nullable = false),StructField("dname",nullable = true),StructField("loc",nullable = true)
))
spark.read.format("csv")
.option("mode","FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
複製程式碼
2.2 寫入CSV檔案
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
複製程式碼
也可以指定具體的分隔符:
df.write.format("csv").mode("overwrite").option("sep","\t").save("/tmp/csv/dept2")
複製程式碼
2.3 可選配置
為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。
三、JSON
3.1 讀取JSON檔案
spark.read.format("json").option("mode","FAILFAST").load("/usr/file/json/dept.json").show(5)
複製程式碼
需要注意的是:預設不支援一條資料記錄跨越多行 (如下),可以通過配置 multiLine
為 true
來進行更改,其預設值為 false
。
// 預設支援單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//預設不支援多行
{
"DEPTNO": 10,"LOC": "NEW YORK"
}
複製程式碼
3.2 寫入JSON檔案
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
複製程式碼
3.3 可選配置
為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。
四、Parquet
Parquet 是一個開源的面向列的資料儲存,它提供了多種儲存優化,允許讀取單獨的列非整個檔案,這不僅節省了儲存空間而且提升了讀取效率,它是 Spark 是預設的檔案格式。
4.1 讀取Parquet檔案
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
複製程式碼
2.2 寫入Parquet檔案
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
複製程式碼
2.3 可選配置
Parquet 檔案有著自己的儲存規則,因此其可選配置項比較少,常用的有如下兩個:
讀寫操作 | 配置項 | 可選值 | 預設值 | 描述 |
---|---|---|---|---|
Write | compression or codec | None, uncompressed, bzip2, deflate,gzip, lz4,or snappy |
None | 壓縮檔案格式 |
Read | mergeSchema | true,false | 取決於配置項 spark.sql.parquet.mergeSchema
|
當為真時,Parquet 資料來源將所有資料檔案收集的 Schema 合併在一起,否則將從摘要檔案中選擇 Schema,如果沒有可用的摘要檔案,則從隨機資料檔案中選擇 Schema。 |
更多可選配置可以參閱官方檔案:spark.apache.org/docs/latest…
五、ORC
ORC 是一種自描述的、型別感知的列檔案格式,它針對大型資料的讀寫進行了優化,也是大資料中常用的檔案格式。
5.1 讀取ORC檔案
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
複製程式碼
4.2 寫入ORC檔案
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
複製程式碼
六、SQL Databases
Spark 同樣支援與傳統的關係型資料庫進行資料讀寫。但是 Spark 程式預設是沒有提供資料庫驅動的,所以在使用前需要將對應的資料庫驅動上傳到安裝目錄下的 jars
目錄中。下面示例使用的是 Mysql 資料庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar
上傳到 jars
目錄下。
6.1 讀取資料
讀取全表資料示例如下,這裡的 help_keyword
是 mysql 內建的字典表,只有 help_keyword_id
和 name
兩個欄位。
spark.read
.format("jdbc")
.option("driver","com.mysql.jdbc.Driver") //驅動
.option("url","jdbc:mysql://127.0.0.1:3306/mysql") //資料庫地址
.option("dbtable","help_keyword") //表名
.option("user","root").option("password","root").load().show(10)
複製程式碼
從查詢結果讀取資料:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("dbtable",pushDownQuery)
.load().show()
//輸出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
複製程式碼
也可以使用如下的寫法進行資料的過濾:
val props = new java.util.Properties
props.setProperty("driver","com.mysql.jdbc.Driver")
props.setProperty("user","root")
props.setProperty("password","root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定資料過濾條件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",predicates,props).show()
//輸出:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
複製程式碼
可以使用 numPartitions
指定讀取資料的並行度:
option("numPartitions",10)
複製程式碼
在這裡,除了可以指定分割槽外,還可以設定上界和下界,任何小於下界的值都會被分配在第一個分割槽中,任何大於上界的值都會被分配在最後一個分割槽中。
val colName = "help_keyword_id" //用於判斷上下界的列
val lowerBound = 300L //下界
val upperBound = 500L //上界
val numPartitions = 10 //分割槽綜述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql",colName,lowerBound,upperBound,numPartitions,props)
複製程式碼
想要驗證分割槽內容,可以使用 mapPartitionsWithIndex
這個運算元,程式碼如下:
jdbcDf.rdd.mapPartitionsWithIndex((index,iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分割槽:" + iterator.next())
}
buffer.toIterator
}).foreach(println)
複製程式碼
執行結果如下:help_keyword
這張表只有 600 條左右的資料,本來資料應該均勻分佈在 10 個分割槽,但是 0 分割槽裡面卻有 319 條資料,這是因為設定了下限,所有小於 300 的資料都會被限制在第一個分割槽,即 0 分割槽。同理所有大於 500 的資料被分配在 9 分割槽,即最後一個分割槽。
6.2 寫入資料
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")
.option("user","emp")
.save()
複製程式碼
七、Text
Text 檔案在讀寫效能方面並沒有任何優勢,且不能表達明確的資料結構,所以其使用的比較少,讀寫操作如下:
7.1 讀取Text資料
spark.read.textFile("/usr/file/txt/dept.txt").show()
複製程式碼
7.2 寫入Text資料
df.write.text("/tmp/spark/txt/dept")
複製程式碼
八、資料讀寫高階特性
8.1 並行讀
多個 Executors 不能同時讀取同一個檔案,但它們可以同時讀取不同的檔案。這意味著當您從一個包含多個檔案的資料夾中讀取資料時,這些檔案中的每一個都將成為 DataFrame 中的一個分割槽,並由可用的 Executors 並行讀取。
8.2 並行寫
寫入的檔案或資料的數量取決於寫入資料時 DataFrame 擁有的分割槽數量。預設情況下,每個資料分割槽寫一個檔案。
8.3 分割槽寫入
分割槽和分桶這兩個概念和 Hive 中分割槽表和分桶表是一致的。都是將資料按照一定規則進行拆分儲存。需要注意的是 partitionBy
指定的分割槽和 RDD 中分割槽不是一個概念:這裡的分割槽表現為輸出目錄的子目錄,資料分別儲存在對應的子目錄中。
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
複製程式碼
輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出檔案。
8.3 分桶寫入
分桶寫入就是將資料按照指定的列和桶數進行雜湊,目前分桶寫入只支援儲存為表,實際上這就是 Hive 的分桶表。
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets,columnToBucketBy).saveAsTable("bucketedFiles")
複製程式碼
8.5 檔案大小管理
如果寫入產生小檔案數量過多,這時會產生大量的元資料開銷。Spark 和 HDFS 一樣,都不能很好的處理這個問題,這被稱為“small file problem”。同時資料檔案也不能過大,否則在查詢時會有不必要的效能開銷,因此要把檔案大小控制在一個合理的範圍內。
在上文我們已經介紹過可以通過分割槽數量來控制生成檔案的數量,從而間接控制檔案大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制檔案大小,這就是 maxRecordsPerFile
引數,它允許你通過控制寫入檔案的記錄數來控制檔案大小。
// Spark 將確保檔案最多包含 5000 條記錄
df.write.option(“maxRecordsPerFile”,5000)
複製程式碼
九、可選配置附錄
9.1 CSV讀寫可選配置
讀\寫操作 | 配置項 | 可選值 | 預設值 | 描述 |
---|---|---|---|---|
Both | seq | 任意字元 |
, (逗號) |
分隔符 |
Both | header | true,false | false | 檔案中的第一行是否為列的名稱。 |
Read | escape | 任意字元 | \ | 轉義字元 |
Read | inferSchema | true,false | false | 是否自動推斷列型別 |
Read | ignoreLeadingWhiteSpace | true,false | false | 是否跳過值前面的空格 |
Both | ignoreTrailingWhiteSpace | true,false | false | 是否跳過值後面的空格 |
Both | nullValue | 任意字元 | “” | 宣告檔案中哪個字元表示空值 |
Both | nanValue | 任意字元 | NaN | 宣告哪個值表示 NaN 或者預設值 |
Both | positiveInf | 任意字元 | Inf | 正無窮 |
Both | negativeInf | 任意字元 | -Inf | 負無窮 |
Both | compression or codec | None,deflate, gzip,lz4,or snappy |
none | 檔案壓縮格式 |
Both | dateFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字串 |
yyyy-MM-dd | 日期格式 |
Both | timestampFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字串 |
yyyy-MMdd’T’HH:mm:ss.SSSZZ | 時間戳格式 |
Read | maxColumns | 任意整數 | 20480 | 宣告檔案中的最大列數 |
Read | maxCharsPerColumn | 任意整數 | 1000000 | 宣告一個列中的最大字元數。 |
Read | escapeQuotes | true,false | true | 是否應該轉義行中的引號。 |
Read | maxMalformedLogPerPartition | 任意整數 | 10 | 宣告每個分割槽中最多允許多少條格式錯誤的資料,超過這個值後格式錯誤的資料將不會被讀取 |
Write | quoteAll | true,false | false | 指定是否應該將所有值都括在引號中,而不只是轉義具有引號字元的值。 |
Read | multiLine | true,false | false | 是否允許每條完整記錄跨域多行 |
9.2 JSON讀寫可選配置
讀\寫操作 | 配置項 | 可選值 | 預設值 |
---|---|---|---|
Both | compression or codec | None,or snappy |
none |
Both | dateFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字串 | yyyy-MM-dd |
Both | timestampFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字串 | yyyy-MMdd’T’HH:mm:ss.SSSZZ |
Read | primitiveAsString | true,false | false |
Read | allowComments | true,false | false |
Read | allowUnquotedFieldNames | true,false | false |
Read | allowSingleQuotes | true,false | true |
Read | allowNumericLeadingZeros | true,false | false |
Read | allowBackslashEscapingAnyCharacter | true,false | false |
Read | columnNameOfCorruptRecord | true,false | Value of spark.sql.column&NameOf |
Read | multiLine | true,false | false |
9.3 資料庫讀寫可選配置
屬性名稱 | 含義 |
---|---|
url | 資料庫地址 |
dbtable | 表名稱 |
driver | 資料庫驅動 |
partitionColumn, lowerBound,upperBoun |
分割槽總數,上界,下界 |
numPartitions | 可用於表讀寫並行性的最大分割槽數。如果要寫的分割槽數量超過這個限制,那麼可以呼叫 coalesce(numpartition) 重置分割槽數。 |
fetchsize | 每次往返要獲取多少行資料。此選項僅適用於讀取資料。 |
batchsize | 每次往返插入多少行資料,這個選項只適用於寫入資料。預設值是 1000。 |
isolationLevel | 事務隔離級別:可以是 NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。 預設值是 READ_UNCOMMITTED。這個選項只適用於資料讀取。 |
createTableOptions | 寫入資料時自定義建立表的相關配置 |
createTableColumnTypes | 寫入資料時自定義建立列的列型別 |
資料庫讀寫更多配置可以參閱官方檔案:spark.apache.org/docs/latest…
參考資料
- Matei Zaharia,Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
- spark.apache.org/docs/latest…
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南