1. 程式人生 > >SparkSQL入門案例之二(SparkSQL1.x)

SparkSQL入門案例之二(SparkSQL1.x)

SparkSQL入門案例一中的思路主要是:

1).建立SparkContext
2).建立SQLContext
3).建立RDD
4).建立一個類,並定義類的成員變數
5).整理資料並關聯class
6).將RDD轉換成DataFrame(匯入隱式轉換)
7).將DataFrame註冊成臨時表
8).書寫SQL(Transformation)
9).執行Action

還有另外一種思路和寫法:

1).建立SparkContext
2).建立SQLContext
3).建立RDD
4).建立StructType(schema)
5).整理資料將資料跟Row關聯
6).通過rowRDD和schema建立DataFrame
7).將DataFrame註冊成臨時表
8).書寫SQL(Transformation)
9).執行Action

這兩種方法主要是將RDD變成DataFrame的方式不同

具體程式碼實現流程:

package cn.ysjh0014.SparkSql


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSqlDemo2 {

  def main(args: Array[String]): Unit = {

    //這個程式可以提交到Spark叢集中
    val conf = new SparkConf().setAppName("SparkSql2").setMaster("local[4]") //這裡的setMaster是為了在本地執行,多執行緒執行

    //建立Spark Sql的連線
    val sc = new SparkContext(conf)
    //SparkContext不能建立特殊的RDD,將Spark Sql包裝進而增強
    val SqlContext = new SQLContext(sc)

    //建立DataFrame(特殊的RDD,就是有schema的RDD),先建立一個普通的RDD,然後再關聯上schema
    val lines = sc.textFile(args(0))

    //將資料進行處理
    val RowRdd: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val yz = fields(3).toDouble
      Row(id,name,age,yz)
    })

    //結果型別,其實就是表頭,用於描述DataFrame
    val sm: StructType = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("yz", DoubleType, true)
    ))


    //將RowRDD關聯schema
    val df: DataFrame = SqlContext.createDataFrame(RowRdd,sm)

    //變成DataFrame後就可以使用兩種API進行程式設計了

    //使用SQL的方式
    //把DataFrame註冊成臨時表
    df.registerTempTable("body") //過時的方法
    //書寫SQL(sql方法其實是Transformation)
    val result: DataFrame = SqlContext.sql("SELECT * FROM body ORDER BY yz desc, age asc")         //注意:  這裡的SQL語句該大寫的必須大寫
    //檢視結果(出發Action)
    result.show()


    //釋放資源
    sc.stop()

  }
}

執行結果和案例一中的一模一樣