
Spark-Sql DataFrame in Practice


1. Introduction to DataFrame:

In Spark, a DataFrame is a distributed collection of data built on top of RDDs, similar to a two-dimensional table in a traditional database. A DataFrame carries Schema metadata, meaning every column of the two-dimensional dataset it represents has a name and a type.

Something like this:

root  
 |-- age: long (nullable = true)  
 |-- id: long (nullable = true)  
 |-- name: string (nullable = true)  
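
The schema can also be inspected programmatically rather than only printed. A minimal sketch, assuming Spark 1.x and a sqlContext created in the same way as in the examples below:

// read the JSON file and walk the schema fields
val df = sqlContext.read.json("people.json")
df.schema.fields.foreach { f =>
  // each StructField carries the column name, its data type and nullability
  println(f.name + ": " + f.dataType + " (nullable = " + f.nullable + ")")
}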

2. Preparing a structured test dataset

people.json

 
{"id":1, "name":"Ganymede", "age":32}  
{"id":2, "name":"Lilei", "age":19}  
{"id":3, "name":"Lily", "age":25}  
{"id":4, "name":"Hanmeimei", "age":25}  
{"id":5, "name":"Lucy", "age":37}  
{"id":6, "name":"Tom", "age":27}  

3. Understanding DataFrame through code

1) Working with the data through the DataFrame API

import org.apache.spark.sql.SQLContext  
import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
import org.apache.log4j.Level  
import org.apache.log4j.Logger  
  
object DataFrameTest {  
  def main(args: Array[String]): Unit = {  
    // log level
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    // initialization
    val conf = new SparkConf().setAppName("DataFrameTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")

    // show the data in df
    df.show()
    // show the Schema
    df.printSchema()
    // select a single column
    df.select("name").show()
    // select several columns; plus adds a value to the column
    df.select(df.col("name"), df.col("age").plus(1)).show()
    // filter on a column's value
    df.filter(df.col("age").gt(25)).show()
    // group by a column and count
    df.groupBy("age").count().show()

    // foreach: process each row
    df.select(df.col("id"), df.col("name"), df.col("age")).foreach { x =>
      // access fields by index
      println("id: " + x.get(0) + ", name: " + x.get(1) + ", age: " + x.get(2))
    }

    // foreachPartition: process rows partition by partition, the common approach in production
    df.select(df.col("id"), df.col("name"), df.col("age")).foreachPartition { iterator =>
      iterator.foreach(x => {
        // access fields by name
        println("id: " + x.getAs("id") + ", name: " + x.getAs("name") + ", age: " + x.getAs("age"))
      })
    }
  
  }  
}  


2) Working with the data through SQL by registering a temporary table

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger

/**
 * @author Administrator
 */
object DataFrameTest2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("DataFrameTest2")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")

    // register the DataFrame as a temporary table
    df.registerTempTable("people")

    df.show()
    df.printSchema()

    // select a single column
    sqlContext.sql("select name from people").show()
    // select several columns
    sqlContext.sql("select name, age+1 from people").show()
    // filter on a column's value
    sqlContext.sql("select age from people where age >= 25").show()
    // group by a column and count
    sqlContext.sql("select age, count(*) cnt from people group by age").show()

    // foreach: process each row
    sqlContext.sql("select id, name, age from people").foreach { x =>
      // access fields by index
      println("id: " + x.get(0) + ", name: " + x.get(1) + ", age: " + x.get(2))
    }

    // foreachPartition: process rows partition by partition, the common approach in production
    sqlContext.sql("select id, name, age from people").foreachPartition { iterator =>
      iterator.foreach(x => {
        // access fields by name
        println("id: " + x.getAs("id") + ", name: " + x.getAs("name") + ", age: " + x.getAs("age"))
      })
    }

  }
}

Both approaches produce the same result; the first suits developers, while the second suits people who are comfortable with SQL.

4. For unstructured data

people.txt

1,Ganymede,32
2, Lilei, 19
3, Lily, 25
4, Hanmeimei, 25
5, Lucy, 37
6, wcc, 4

1) Registering a temporary table by programmatically specifying the schema (StructType)


import org.apache.spark.sql.SQLContext  
import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
import org.apache.log4j.Level  
import org.apache.log4j.Logger  
import org.apache.spark.sql.types.IntegerType  
import org.apache.spark.sql.types.StructType  
import org.apache.spark.sql.types.StringType  
import org.apache.spark.sql.types.StructField  
import org.apache.spark.sql.Row  
  
/** 
 * @author Administrator 
 */  
object DataFrameTest3 {  
  def main(args: Array[String]): Unit = {  
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);  
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
  
    val conf = new SparkConf().setAppName("DataFrameTest3")  
    val sc = new SparkContext(conf)  
    val sqlContext = new SQLContext(sc)  
    val people = sc.textFile("people.txt")  
  
    // parse each text line into a Row of (id, name, age)
    val peopleRowRDD = people.map { x => x.split(",") }.map { data =>  
      {  
        val id = data(0).trim().toInt  
        val name = data(1).trim()  
        val age = data(2).trim().toInt  
        Row(id, name, age)  
      }  
    }  
  
    // programmatically define the schema: column names, types and nullability
    val structType = StructType(Array(  
      StructField("id", IntegerType, true),  
      StructField("name", StringType, true),  
      StructField("age", IntegerType, true)))  
  
    // combine the Row RDD with the schema to build a DataFrame
    val df = sqlContext.createDataFrame(peopleRowRDD, structType)  
  
    df.registerTempTable("people")  
  
    df.show()  
    df.printSchema()  
  
  }  
}  

2) Registering a temporary table by inferring the schema with a case class (reflection)

 

import org.apache.spark.sql.SQLContext  
import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
import org.apache.log4j.Level  
import org.apache.log4j.Logger  
  
/** 
 * @author Administrator 
 */  
object DataFrameTest4 {  
  case class People(id: Int, name: String, age: Int)  
  def main(args: Array[String]): Unit = {  
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);  
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
  
    val conf = new SparkConf().setAppName("DataFrameTest4")  
    val sc = new SparkContext(conf)  
    val sqlContext = new SQLContext(sc)  
    val people = sc.textFile("people.txt")  
  
    val peopleRDD = people.map { x => x.split(",") }.map { data =>  
      {  
        People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)  
      }  
    }  
  
    // an implicit conversion is needed here to call toDF()  
    import sqlContext.implicits._  
    val df = peopleRDD.toDF()  
    df.registerTempTable("people")  
  
    df.show()  
    df.printSchema()  
      
  
  }  
}  

5. Summary:

Spark SQL is a module of Spark used for processing structured data. The core programming abstraction it provides is the DataFrame. Spark SQL can also serve as a distributed SQL query engine, and one of its most important capabilities is querying data stored in Hive.
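
Querying Hive goes through a HiveContext rather than a plain SQLContext. A minimal sketch, assuming Spark 1.x, a Hive deployment whose hive-site.xml is on the classpath, and a hypothetical Hive table named src (not one of the tables used above):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("select key, value from src limit 10").show()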

A DataFrame can be understood as a distributed collection of data organized into columns. It is very similar to a table in a relational database, but with many optimizations under the hood. A DataFrame can be built from many sources, including structured data files, Hive tables, external relational databases, and RDDs.
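
A minimal sketch of some of these construction paths, reusing the sqlContext from the earlier examples; the Parquet path, JDBC URL and table name below are placeholders for illustration only:

// from a structured data file (Parquet)
val fromParquet = sqlContext.read.parquet("people.parquet")

// from an external relational database via JDBC (the driver jar must be on the classpath)
val fromJdbc = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://localhost:3306/test",
    "dbtable" -> "people"))
  .load()

// Hive tables go through a HiveContext as sketched above; building a DataFrame from an RDD is covered in section 4.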
