1. 程式人生 > >RDD使用程式設計介面方式轉換為DataFrame的工具類(針對欄位特別多的)

RDD使用程式設計介面方式轉換為DataFrame的工具類(針對欄位特別多的)

在使用Spark-Sql 時,需要把RDD型別轉換為DataFrame,再使用一些SQL操作,在轉換為DataFrame時有兩種方式一種是通過反射方式,一種是通過程式設計介面方式
程式設計介面的方式比較常用,但是這種方式程式碼量可能比較大,特別是在你的欄位特別多的時候,你需要先把RDD中的型別轉換為Row,還有根據每個欄位的不同對其進行型別轉換,還要再建立元資料構建StructType,特別的麻煩,因此我就寫了兩個工具類,可以在欄位多的時候使用,對於欄位少就沒這個必要啦,下面就是程式碼,可以根據自己的需求修改使用

import org.apache.spark.sql.Row

object RowUtils {
//把一行資料進行切分後的到一個數組,陣列中的每一個值都對應一個欄位,根據對應欄位對應型別不同構建Row def createRow(t :Array[String]): Row={ var row = Row() for (i <- 0 to t.length - 1) { //這裡只需要根據哪些欄位是Int|Double|String修改對應的下標即可 //陣列中 型別為整型的 下標 if(i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28
||i==30||i==31||i==32 ||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) { row = Row.merge(row,Row(t(i).toInt)) } // 型別中為Double型別的 下標 else if(i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78){ row = Row.
merge(row,Row(t(i).toDouble)) } // 其餘的String型別的 else row = Row.merge(row,Row(t(i))) } row } }
package utils

import org.apache.spark.sql.types.StructField

import scala.io.Source

object Schema1 {
  def getSchema(): List[StructField] ={
    var schema:List[StructField] = List[StructField]()
    val lines: Iterator[String] = Source.fromFile("D:\\fields.txt","gb2312").getLines()
    val list = lines.toList
    for (i <- 0 until list.length){
    /*
     *essionid: String,	會話標識
	 *dvertisersid: Int,	廣告主 id
	 *dorderid: Int,	廣告 id
   	 *檔案中的格式向上面一樣
   	 *可以根據自己的欄位的格式進行切割,只需要修改.split()中的切分的字元和 取第幾個值就好啦
   	 */
      val split = list(i).split(":")
      //那個欄位是int型別,那幾個欄位是Double型別、String型別
      if (i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28||i==30||i==31||i==32
        ||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) {
        ///schema = schema++ List(StructField(split(0),IntegerType,true))
        //取每行的第1個是 想要的欄位
        schema = schema ::: List(StructField(split(0),IntegerType,true))
      } else if (i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78) {
       //schema =schema++  List(StructField(split(0),DoubleType,true))
        schema = schema ::: List(StructField(split(0),DoubleType,true))
      } else {
       
        //schema= schema++List(StructField(split(0),StringType,true))
        schema= schema ::: List(StructField(split(0),StringType,true))
      }
    }
    schema
  }

}