【spark 讀寫資料】資料來源的讀寫操作
阿新 • • 發佈:2019-02-01
通用的 Load/Save 函式
在最簡單的方式下,預設的資料來源(parquet 除非另外配置通過spark.sql.sources.default)將會用於所有的操作。
Parquet 是一個列式儲存格式的檔案,被許多其他資料處理系統所支援。Spark SQL 支援對 Parquet 檔案的讀寫還可以自動的儲存源資料的模式
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet" )
手動指定選項
你也可以手動的指定資料來源,並且將與你想要傳遞給資料來源的任何額外選項一起使用。資料來源由其完全限定名指定(例如 : org.apache.spark.sql.parquet),不過對於內建資料來源你也可以使用它們的縮寫名(json, parquet, jdbc)。使用下面這個語法可以將從任意型別資料來源載入的DataFrames 轉換為其他型別。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age" ).write.format("parquet").save("namesAndAges.parquet")
直接在檔案上執行 SQL
你也可以直接在檔案上執行 SQL 查詢來替代使用 API 將檔案載入到 DataFrame 再進行查詢。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
儲存為持久化的表
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json" )
peopleDF.write.parquet("people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+