1. 程式人生 > >使用spark訓練LR模型

使用spark訓練LR模型

最近在工作之餘參加了一個CTR預估的比賽,CTR預估是一個成熟又經典的問題,工業界目前使用的主流方案仍然是LR+海量特徵。趁著這一次比賽的機會,正好抱著學習的心態嘗試著學習用spark叢集來訓練一下LR。

在學校的時候大家訓練模型一般都是用python+pandas+numpy+sklearn,這一套工具在單機的環境下非常的簡單易學,但是面對海量資料或者高維稀疏矩陣的計算,就顯得無能為力。

相比之下,spark作為分散式計算框架,使用者操作起來的感覺更多是,雖然笨重,但是算得快啊。

spark提供了兩套機器學習的庫,mllib和ml。前者主要適用於RDD的處理,而後者主要適用於dataframe的處理。

目前spark的使用者中基於spark.dataframe已經成為了主流,mllib這個庫也不再維護,轉向更新ml這個庫。

spark上支援cpp、java、python和scala,其中scala是spark的原生語言,本文就以scala為例,訓練了一個非常簡單的LR模型。

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors

import scala.collection.mutable.ListBuffer

object newtrainLR {

  val spark = SparkSession.builder().appName("LR-Predict").getOrCreate()
//資料讀入
  val trainPath = "../train_with_hour.csv"

  val testPath = "../test_with_hour.csv"

  val trainDF = spark.read.format("csv").option("header", "true").load(trainPath)

  val testDF = spark.read.format("csv").option("header", "true").load(testPath)

  val newTrainDF = trainDF.drop("_c0", "Unnamed: 0", "time", "city", "app_paid").withColumn("flag", lit(1))

  val newTestDF = testDF.drop("_c0", "Unnamed: 0", "time", "city").
    withColumn("click", lit(3)).
    withColumn("flag", lit(2))

  //合併train、test,一起做one-hot編碼
  val allDF = newTrainDF.union(newTestDF)
  //獲取列名array
  val colNameDF = allDF.drop("flag", "click")

  // 要進行OneHotEncoder編碼的欄位
  val categoricalColumns = colNameDF.columns
  //採用Pileline方式處理機器學習流程
  val stagesArray = new ListBuffer[PipelineStage]()
  for (cate <- categoricalColumns) {
    //使用StringIndexer 建立類別索引
    val indexer = new StringIndexer().setInputCol(cate).setOutputCol(s"${cate}Index")
    // 使用OneHotEncoder將分類變數轉換為二進位制稀疏向量
    val encoder = new OneHotEncoder().setInputCol(indexer.getOutputCol).setOutputCol(s"${cate}classVec")
    stagesArray.append(indexer, encoder)
  }

  val assemblerInputs = categoricalColumns.map(_ + "classVec")
  // 使用VectorAssembler將所有特徵轉換為一個向量
  val assembler = new VectorAssembler().setInputCols(assemblerInputs).setOutputCol("features")

//使用pipeline批處理
  val pipeline = new Pipeline()
  pipeline.setStages(stagesArray.toArray)
  val pipelineModel = pipeline.fit(allDF)
  val dataset = pipelineModel.transform(allDF)


  val newDF = dataset.select("click", "features", "flag")

//拆分train、test
  val processedTrain = newDF.filter(col("flag") === 1).drop("flag")
  val processedTest = newDF.filter(col("flag") === 2).drop("click", "flag")


  //處理label列
  val indexer2Click = new StringIndexer().setInputCol("click").setOutputCol("ctr")
  val finalTrainDF = indexer2Click.fit(processedTrain).transform(processedTrain).drop("click")


  //隨機分割測試集和訓練集資料
  val Array(trainingDF, testDF) = finalTrainDF.randomSplit(Array(0.7, 0.3), seed = 1)
  println(s"trainingDF size=${trainingDF.count()},testDF size=${testDF.count()}")
  val lrModel = new LogisticRegression().
    setLabelCol("ctr").
    setFeaturesCol("features").
    setMaxIter(10000).
    setThreshold(0.5).
    setRegParam(0.15).
    fit(trainingDF)
  val predictions = lrModel.transform(testDF).select($"ctr".as("label"), "features", “rawPrediction", "probability", "prediction")

  //使用BinaryClassificationEvaluator來評價我們的模型
  val evaluator = new BinaryClassificationEvaluator()
  evaluator.setMetricName("areaUnderROC")
  val auc = evaluator.evaluate(predictions)


  val newprediction = lrModel.transform(processedTest).select("probability")

//取出預測為1的probability
  val reseult2 = newprediction.map(line => {
    val dense = line.get(line.fieldIndex("probability")).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
    val y = dense(1).toString
    (y)
  }).toDF("pro2ture")



  reseult2.repartition(1).write.text(“../firstLrResultStr")