
Some Usage Examples of Spark Dataset and DataFrame


I have used Datasets and DataFrames before; recently, while running experiments with Spark ML, I needed them again, so here is a quick refresher.

// Sample data
1,2,3
4,5,6
7,8,9
10,11,12
13,14,15
1,2,3
4,5,6
7,8,9
10,11,12
13,14,15
1,2,3
4,5,6
7,8,9
10,11,12
13,14,15

1: What is the relationship between Dataset and DataFrame?

type DataFrame = Dataset[Row]
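Since a DataFrame is just a Dataset of untyped Row objects, the two convert back and forth cheaply. A minimal sketch, assuming a SparkSession named spark and the Person case class from section 4:

import spark.implicits._

// DataFrame -> Dataset[Person]: attach a type with as[T] (requires an Encoder)
val ds = spark.read.json("examples/src/main/resources/people.json").as[Person]

// Dataset[Person] -> DataFrame: drop the static type again
val df = ds.toDF()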

2: Loading text data

  import spark.implicits._ // needed for the toDF() conversion below

  val rdd = sc.textFile("data")

  val df = rdd.toDF()

Converting this way directly produces a DataFrame with the following structure (query: df.select("*").show(5)):

+--------+
|   value|
+--------+
|   1,2,3|
|   4,5,6|
|   7,8,9|
|10,11,12|
|13,14,15|
+--------+
only showing top 5 rows

There is only one column, named value.
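If you need real columns rather than one comma-separated string, one option is to split value with the built-in split function (the column names c1..c3 here are made up):

import org.apache.spark.sql.functions.{col, split}

val parts = df.select(
  split(col("value"), ",").getItem(0).as("c1"),
  split(col("value"), ",").getItem(1).as("c2"),
  split(col("value"), ",").getItem(2).as("c3")
)
parts.show(2)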

3: df.printSchema()

Output:

root
 |-- value: string (nullable = true)

4: A case class can be converted directly into a Dataset

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
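caseClassDS.show() should then print something like:

+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+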

5: Parsing mainstream file formats directly

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
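The same DataFrameReader covers the other mainstream formats too; a quick sketch (the paths are placeholders):

val csvDF     = spark.read.option("header", "true").csv("examples/people.csv")
val parquetDF = spark.read.parquet("examples/people.parquet")
val textDS    = spark.read.textFile("examples/people.txt") // Dataset[String], unlike sc.textFile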

6: Two ways to convert an RDD to a Dataset

Data format:

xiaoming,18,iPhone
mali,22,xiaomi
jack,26,smartisan
mary,16,meizu
kali,45,huawei

(a): Inferring the schema via reflection

  // Note: this snippet needs a three-field case class, unlike the two-field Person above, e.g.
  // case class Person(name: String, age: Int, phone: String)
  val persons = rdd.map { x =>
    val fs = x.split(",")
    Person(fs(0), fs(1).toInt, fs(2))
  }
  persons.toDS().show(2)
  persons.toDF("newName", "newAge", "newPhone").show(2)
  persons.toDF().show(2)
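Given the sample data above, persons.toDS().show(2) should print roughly:

+--------+---+------+
|    name|age| phone|
+--------+---+------+
|xiaoming| 18|iPhone|
|    mali| 22|xiaomi|
+--------+---+------+
only showing top 2 rows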


(b): Specifying the schema programmatically

Steps: create an RDD, define the schema as a StructType of StructFields, convert each record into a Row, then call createDataFrame with the rows and the schema:

import org.apache.spark.sql.types._
  //1: Create the RDD
  val rddString = sc.textFile("C:\\Users\\Daxin\\Documents\\GitHub\\OptimizedRF\\sql_data")
  //2: Create the schema
  val schemaString = "name age phone"
  val fields = schemaString.split(" ").map {
    fieldName => StructField(fieldName, StringType, nullable = true)
  }
  val schema = StructType(fields)
  //3: Convert the data to Rows
  val rowRdd = rddString.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2)))
  //4: Create the DataFrame
  val personDF = spark.createDataFrame(rowRdd, schema)
  personDF.show(5)
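Note that this schema makes every column a StringType. If a numeric column is needed later (for example by the aggregate function in section 8), one option is to cast after the fact; a small sketch:

import org.apache.spark.sql.functions.col

// IntegerType is already in scope from org.apache.spark.sql.types._
val typedDF = personDF.withColumn("age", col("age").cast(IntegerType))
typedDF.printSchema() // age: integer (nullable = true)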

7: Registering views

  // Global view: shared across multiple sessions; it does not expire even when
  // the SparkSession that created it is stopped
  personDF.createGlobalTempView("GlobalTempView_Person")
  // Temp view: overwritten if it already exists; its lifetime matches the SparkSession
  personDF.createOrReplaceTempView("TempView_Person")
  //personDF.createTempView("TempView_Person") // throws if the view already exists

  //  Global temporary view is tied to a system preserved database `global_temp`
  // Global views live in the global_temp database; omitting the database prefix
  // raises an error saying the view cannot be found
  spark.sql("select * from global_temp.GlobalTempView_Person").show(2)
  // Temp views need no database prefix
  spark.sql("select * from TempView_Person").show(2)
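To see the difference in lifetime, the global view is visible from a brand-new session while the temp view is not; a quick check:

  // Works: global temp views are shared across sessions of the same application
  spark.newSession().sql("select * from global_temp.GlobalTempView_Person").show(2)
  // Would fail with "Table or view not found": temp views are per-session
  // spark.newSession().sql("select * from TempView_Person").show(2)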


8: Defining a UDAF:

Untyped User-Defined Aggregate Functions

package com.daxin.sq.df

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

/**
  * Created by Daxin on 2017/11/18.
  * url:http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-user-defined-aggregate-functions
  */

//Untyped User-Defined Aggregate Functions
object MyAverage extends UserDefinedAggregateFunction {

  // Data types of input arguments of this aggregate function
  override def inputSchema: StructType = StructType(StructField("inputColumn", IntegerType) :: Nil)


  // Updates the given aggregation buffer `buffer` with new input data from `input`
  // Buffer slot 0 accumulates the sum; buffer slot 1 counts the elements
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getInt(0) + input.getInt(0) // input.getInt(0) is field 0 of inputSchema
      buffer(1) = buffer.getInt(1) + 1
    }
  }


  // Data types of values in the aggregation buffer
  // Defines the buffer's layout (i.e., its data structure): a running sum and a count
  override def bufferSchema: StructType = StructType(StructField("sum", IntegerType) :: StructField("count", IntegerType) :: Nil)


  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  // MutableAggregationBuffer is a subclass of Row
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // Merge partial aggregates from two partitions, writing the result back into buffer1
    buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }


  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0
    buffer(1) = 0
  }

  // Whether this function always returns the same output on the identical input
  override def deterministic: Boolean = true

  // Calculates the final result
  override def evaluate(buffer: Row): Int = buffer.getInt(0) / buffer.getInt(1)

  // The data type of the returned value
  override def dataType: DataType = IntegerType
}

Test code:

  spark.udf.register("myAverage", MyAverage)
  val result = spark.sql("SELECT myAverage(age)  FROM TempView_Person")
  result.show()
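One caveat: inputSchema declares an IntegerType argument, so myAverage(age) assumes age is an integer column. If the view was registered from the programmatic-schema DataFrame above, where age is a string, cast it in the query first, e.g.:

  spark.sql("SELECT myAverage(CAST(age AS INT)) AS average_age FROM TempView_Person").show()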

9: On the DataFrame schema used in Spark ML:

One column is named label, the other features. The conversion can usually be done with a case class:

case class UDLabelpOint(label: Double, features: org.apache.spark.ml.linalg.Vector)
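A minimal sketch of building such a DataFrame from in-memory data (the values here are made up):

import org.apache.spark.ml.linalg.Vectors
import spark.implicits._

val train = Seq(
  UDLabelpOint(1.0, Vectors.dense(0.1, 0.2, 0.3)),
  UDLabelpOint(0.0, Vectors.dense(0.4, 0.5, 0.6))
).toDF()
train.printSchema() // label: double, features: vector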
