使用spark訓練LR模型
阿新 • • 發佈:2019-01-09
最近在工作之餘參加了一個CTR預估的比賽,CTR預估是一個成熟又經典的問題,工業界目前使用的主流方案仍然是LR+海量特徵。趁著這一次比賽的機會,正好抱著學習的心態嘗試著學習用spark叢集來訓練一下LR。
在學校的時候大家訓練模型一般都是用python+pandas+numpy+sklearn,這一套工具在單機的環境下非常的簡單易學,但是面對海量資料或者高維稀疏矩陣的計算,就顯得無能為力。
相比之下,spark作為分散式計算框架,使用者操作起來的感覺更多是,雖然笨重,但是算得快啊。
spark提供了兩套機器學習的庫,mllib和ml。前者主要適用於RDD的處理,而後者主要適用於dataframe的處理。
目前spark的使用者中基於spark.dataframe已經成為了主流,mllib這個庫也不再維護,轉向更新ml這個庫。
spark上支援cpp、java、python和scala,其中scala是spark的原生語言,本文就以scala為例,訓練了一個非常簡單的LR模型。
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.{Pipeline, PipelineStage} import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler} import org.apache.spark.sql.SparkSession import org.apache.spark.ml.linalg.Vectors import scala.collection.mutable.ListBuffer object newtrainLR { val spark = SparkSession.builder().appName("LR-Predict").getOrCreate() //資料讀入 val trainPath = "../train_with_hour.csv" val testPath = "../test_with_hour.csv" val trainDF = spark.read.format("csv").option("header", "true").load(trainPath) val testDF = spark.read.format("csv").option("header", "true").load(testPath) val newTrainDF = trainDF.drop("_c0", "Unnamed: 0", "time", "city", "app_paid").withColumn("flag", lit(1)) val newTestDF = testDF.drop("_c0", "Unnamed: 0", "time", "city"). withColumn("click", lit(3)). withColumn("flag", lit(2)) //合併train、test,一起做one-hot編碼 val allDF = newTrainDF.union(newTestDF) //獲取列名array val colNameDF = allDF.drop("flag", "click") // 要進行OneHotEncoder編碼的欄位 val categoricalColumns = colNameDF.columns //採用Pileline方式處理機器學習流程 val stagesArray = new ListBuffer[PipelineStage]() for (cate <- categoricalColumns) { //使用StringIndexer 建立類別索引 val indexer = new StringIndexer().setInputCol(cate).setOutputCol(s"${cate}Index") // 使用OneHotEncoder將分類變數轉換為二進位制稀疏向量 val encoder = new OneHotEncoder().setInputCol(indexer.getOutputCol).setOutputCol(s"${cate}classVec") stagesArray.append(indexer, encoder) } val assemblerInputs = categoricalColumns.map(_ + "classVec") // 使用VectorAssembler將所有特徵轉換為一個向量 val assembler = new VectorAssembler().setInputCols(assemblerInputs).setOutputCol("features") //使用pipeline批處理 val pipeline = new Pipeline() pipeline.setStages(stagesArray.toArray) val pipelineModel = pipeline.fit(allDF) val dataset = pipelineModel.transform(allDF) val newDF = dataset.select("click", "features", "flag") //拆分train、test val processedTrain = newDF.filter(col("flag") === 1).drop("flag") val processedTest = newDF.filter(col("flag") === 2).drop("click", "flag") //處理label列 val indexer2Click = new StringIndexer().setInputCol("click").setOutputCol("ctr") val finalTrainDF = indexer2Click.fit(processedTrain).transform(processedTrain).drop("click") //隨機分割測試集和訓練集資料 val Array(trainingDF, testDF) = finalTrainDF.randomSplit(Array(0.7, 0.3), seed = 1) println(s"trainingDF size=${trainingDF.count()},testDF size=${testDF.count()}") val lrModel = new LogisticRegression(). setLabelCol("ctr"). setFeaturesCol("features"). setMaxIter(10000). setThreshold(0.5). setRegParam(0.15). fit(trainingDF) val predictions = lrModel.transform(testDF).select($"ctr".as("label"), "features", “rawPrediction", "probability", "prediction") //使用BinaryClassificationEvaluator來評價我們的模型 val evaluator = new BinaryClassificationEvaluator() evaluator.setMetricName("areaUnderROC") val auc = evaluator.evaluate(predictions) val newprediction = lrModel.transform(processedTest).select("probability") //取出預測為1的probability val reseult2 = newprediction.map(line => { val dense = line.get(line.fieldIndex("probability")).asInstanceOf[org.apache.spark.ml.linalg.DenseVector] val y = dense(1).toString (y) }).toDF("pro2ture") reseult2.repartition(1).write.text(“../firstLrResultStr")