Big Data Learning Path 87 - Writing SparkSQL Results in Different Formats, and Loading Them
Posted by 阿新 on 2018-11-09
We can write out the results of the word count we wrote earlier in a variety of formats.
CSV format:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id: Long, name: String, age: Int, fv: Int)

object OldSparkSQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("OldSparkSQL").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("hdfs://marshal:9000/person.txt")
    //val personRDD = lines.map(_.split(",")).map(arr => Person(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toInt))
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val fv = fields(3).toInt
      // Person(id, name, age, fv)
      Row(id, name, age, fv)
    })
    val sqlContext = new SQLContext(sc)
    // import the implicit conversions needed to convert to a DataFrame
    import sqlContext.implicits._
    val schema = StructType(
      List(
        StructField("id", LongType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("fv", IntegerType, true)
      )
    )
    val pdf: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    pdf.registerTempTable("t_person")
    val result: DataFrame = sqlContext.sql("select name,age,fv from t_person order by fv desc, age asc")
    result.write.csv("C:/Users/11489/Desktop/result1")
    // result.show()
    sc.stop()
  }
}
The result is written out as a directory of part files under result1.
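Because CSV files carry only the data and not the schema, reading the result back means declaring the columns again. A minimal sketch of reading it back (not part of the original post; the ReadCsvResult object name is made up, and it assumes the same result1 path and a Spark 2.x SparkSession):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object ReadCsvResult {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadCsvResult").master("local[*]").getOrCreate()
    // CSV stores no schema, so the columns written above must be declared explicitly
    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", IntegerType, true)
    ))
    val df = spark.read.schema(schema).csv("C:/Users/11489/Desktop/result1")
    df.show()
    spark.stop()
  }
}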
JSON format:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.sql._

object DataSetWordCount {
  def main(args: Array[String]): Unit = {
    // create a SparkSession
    val session: SparkSession = SparkSession.builder()
      .appName("DataSetWordCount")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._
    val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // import org.apache.spark.sql.functions._
    // val result: Dataset[Row] = words.groupBy($"value" as "word")
    //   .agg(count("*") as "counts")
    //   .sort($"counts" desc)
    val grouped: RelationalGroupedDataset = words.groupBy($"value" as "word")
    val counted: DataFrame = grouped.count()
    val result: Dataset[Row] = counted.sort($"count" desc)
    result.write.json("C:/Users/11489/Desktop/result2")
    result.show()
  }
}
The result is written out under result2, one JSON object per line.
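Since every line of the output is a self-contained JSON object, Spark can infer the schema when reading it back. A minimal sketch (not part of the original post; it assumes the result2 path above):

import org.apache.spark.sql.SparkSession

object ReadJsonResult {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadJsonResult").master("local[*]").getOrCreate()
    // the schema (word, count) is inferred from the JSON records themselves
    val df = spark.read.json("C:/Users/11489/Desktop/result2")
    df.printSchema()
    df.show()
    spark.stop()
  }
}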
Parquet format:
This is a columnar storage format. What do we gain by saving in this format?
Later we can read only the columns we need, and because compression is applied column by column, the files take up less space.
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object sqlWordCount {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .appName("sqlWordCount")
      .master("local[*]")
      .getOrCreate()
    // specify where to read the data from
    //val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val lines: DataFrame = session.read.format("text").load("D:/a/word.txt")
    // import the implicit conversions from the SparkSession
    import session.implicits._
    val words: Dataset[String] = lines.flatMap(_.getAs[String]("value").split(" "))
    // val df: DataFrame = words.withColumnRenamed("value", "word")
    // // create a view first, then run SQL against it
    // df.createTempView("v_wc")
    // val result: DataFrame = session.sql("select word,count(*) counts from v_wc group by word order by counts desc")
    // result.show()
    // // DSL style
    // import org.apache.spark.sql.functions._
    // val result: Dataset[Row] = words.groupBy($"value").agg(count("*") as "counts").sort($"counts" desc)
    // result.show()
    // convert the Dataset to a DataFrame so we can run SQL against it
    val df = words.toDF()
    df.createTempView("v_wc")
    val result: DataFrame = session.sql("select value word,count(*) counts from v_wc group by word order by counts desc")
    result.write.parquet("C:/Users/11489/Desktop/result3")
    result.show()
  }
}
Execution result:
Take a look at the file names in the output directory: the data files end in .snappy.parquet, because Spark compresses parquet output with snappy by default.
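This is where the columnar layout pays off: when the parquet result is read back, Spark only scans the columns that are actually selected. A minimal sketch (not part of the original post; the object name is made up, and it assumes the result3 path above):

import org.apache.spark.sql.SparkSession

object ReadOnlyWordColumn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadOnlyWordColumn").master("local[*]").getOrCreate()
    // only the "word" column is read from disk; the "counts" column is never scanned
    spark.read.parquet("C:/Users/11489/Desktop/result3")
      .select("word")
      .show()
    spark.stop()
  }
}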
Loading data from MySQL:
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcDataSource {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    // read the users table from MySQL through the JDBC data source
    val logs: DataFrame = spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://localhost:3306/lfr",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "users",
        "user" -> "root",
        "password" -> "928918zbgch521"))
      .load()
    logs.show()
  }
}
Execution result: the contents of the users table are printed by show().
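For larger tables the JDBC source can also read in parallel if it is told how to split the table. A sketch that could go inside the same main method (the id bounds and partition count below are made up for illustration, and it assumes users has a numeric id column):

    val logsPartitioned: DataFrame = spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://localhost:3306/lfr",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "users",
        "user" -> "root",
        "password" -> "928918zbgch521",
        // split the read into 4 parallel tasks by id range
        "partitionColumn" -> "id",
        "lowerBound" -> "1",
        "upperBound" -> "1000",
        "numPartitions" -> "4"))
      .load()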
Writing the result to MySQL:
The code is as follows:
// this continues from the JdbcDataSource example above (it reuses the logs DataFrame)
import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "928918zbgch521")
logs.where(logs.col("id") <= 3)
  .write.mode("append")
  .jdbc("jdbc:mysql://localhost:3306/lfr", "users", prop)
Execution result: the rows with id <= 3 are appended to the users table.
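Besides "append", the writer also accepts the other standard save modes ("overwrite", "ignore", and the default "errorifexists"). For example, to replace a table instead of adding to it (a sketch; users_backup is a made-up table name, and prop and logs are the same as above):

logs.where(logs.col("id") <= 3)
  .write.mode("overwrite")                 // drops and recreates the target table
  .jdbc("jdbc:mysql://localhost:3306/lfr", "users_backup", prop)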
Reading data from a parquet file:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    // load the word-count result that was written as parquet earlier
    val result: DataFrame = spark.read.format("parquet").load("C:/Users/11489/Desktop/result3")
    result.show()
  }
}
Execution result: the word-count result that was written as parquet earlier is printed.
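Unlike CSV, parquet stores the schema alongside the data, so nothing has to be declared when reading it back. A small sketch that could continue inside ReadParquet's main method (not part of the original post; the view name and filter are made up for illustration):

    // the schema written by sqlWordCount comes back automatically from the parquet metadata
    result.printSchema()
    // the loaded DataFrame can be queried again like any other
    result.createTempView("v_result")
    spark.sql("select * from v_result where counts > 1").show()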