RDD使用程式設計介面方式轉換為DataFrame的工具類(針對欄位特別多的)
阿新 • • 發佈:2018-11-29
在使用Spark-Sql 時,需要把RDD型別轉換為DataFrame,再使用一些SQL操作,在轉換為DataFrame時有兩種方式一種是通過反射方式,一種是通過程式設計介面方式
程式設計介面的方式比較常用,但是這種方式程式碼量可能比較大,特別是在你的欄位特別多的時候,你需要先把RDD中的型別轉換為Row,還有根據每個欄位的不同對其進行型別轉換,還要再建立元資料構建StructType,特別的麻煩,因此我就寫了兩個工具類,可以在欄位多的時候使用,對於欄位少就沒這個必要啦,下面就是程式碼,可以根據自己的需求修改使用
import org.apache.spark.sql.Row
object RowUtils {
//把一行資料進行切分後的到一個數組,陣列中的每一個值都對應一個欄位,根據對應欄位對應型別不同構建Row
def createRow(t :Array[String]): Row={
var row = Row()
for (i <- 0 to t.length - 1) {
//這裡只需要根據哪些欄位是Int|Double|String修改對應的下標即可
//陣列中 型別為整型的 下標
if(i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28 ||i==30||i==31||i==32
||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) {
row = Row.merge(row,Row(t(i).toInt))
}
// 型別中為Double型別的 下標
else if(i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78){
row = Row. merge(row,Row(t(i).toDouble))
}
// 其餘的String型別的
else row = Row.merge(row,Row(t(i)))
}
row
}
}
package utils
import org.apache.spark.sql.types.StructField
import scala.io.Source
object Schema1 {
def getSchema(): List[StructField] ={
var schema:List[StructField] = List[StructField]()
val lines: Iterator[String] = Source.fromFile("D:\\fields.txt","gb2312").getLines()
val list = lines.toList
for (i <- 0 until list.length){
/*
*essionid: String, 會話標識
*dvertisersid: Int, 廣告主 id
*dorderid: Int, 廣告 id
*檔案中的格式向上面一樣
*可以根據自己的欄位的格式進行切割,只需要修改.split()中的切分的字元和 取第幾個值就好啦
*/
val split = list(i).split(":")
//那個欄位是int型別,那幾個欄位是Double型別、String型別
if (i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28||i==30||i==31||i==32
||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) {
///schema = schema++ List(StructField(split(0),IntegerType,true))
//取每行的第1個是 想要的欄位
schema = schema ::: List(StructField(split(0),IntegerType,true))
} else if (i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78) {
//schema =schema++ List(StructField(split(0),DoubleType,true))
schema = schema ::: List(StructField(split(0),DoubleType,true))
} else {
//schema= schema++List(StructField(split(0),StringType,true))
schema= schema ::: List(StructField(split(0),StringType,true))
}
}
schema
}
}