
Spark MLlib: Classification and Regression

Problem Type: Supported Methods
Binary classification: linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes
Multiclass classification: logistic regression, decision trees, random forests, naive Bayes
Regression: linear least squares, Lasso, ridge regression, decision trees, random forests, gradient-boosted trees, isotonic regression

Classification sorts items into categories. The most common case is binary classification, where each example is labeled positive or negative; with more than two categories the task is multiclass classification. spark.mllib provides two linear classification methods: linear SVMs and logistic regression. Linear SVMs support only binary classification, while logistic regression supports both binary and multiclass classification; both methods support L1 and L2 regularization. Training examples are LabeledPoint objects of the form (label, v1, v2, v3, v4, ..., vn), where every feature value v must be numeric, so non-numeric features have to be one-hot encoded first, as in the sketch below.
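To make the LabeledPoint format concrete, here is a minimal sketch (the age/income/color features and their values are invented for illustration): a categorical field such as color in {red, green, blue} becomes three 0/1 columns.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Features: (age, income, color=red, color=green, color=blue).
// The categorical "color" field is one-hot encoded into three numeric columns.
val positive = LabeledPoint(1.0, Vectors.dense(35.0, 72000.0, 1.0, 0.0, 0.0)) // label 1.0 = positive
val negative = LabeledPoint(0.0, Vectors.dense(22.0, 31000.0, 0.0, 0.0, 1.0)) // label 0.0 = negative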

BinaryClassification:

package com.demo.spark.mllib

import scopt.OptionParser

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

object BinaryClassification {

  object Algorithm extends Enumeration {
    type Algorithm = Value
    val SVM, LR = Value
  }

  object RegType extends Enumeration {
    type RegType = Value
    val L1, L2 = Value
  }

  import Algorithm._
  import RegType._

  case class Params(
      input: String = null,
      numIterations: Int = 100,
      stepSize: Double = 1.0,
      algorithm: Algorithm = LR,
      regType: RegType = L2,
      regParam: Double = 0.01) extends AbstractParams[Params]

  def run(params: Params): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    Logger.getRootLogger.setLevel(Level.WARN)

    // Load LIBSVM-format examples and split 80/20 into training and test sets.
    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
    val splits = examples.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val testing = splits(1).cache()
    examples.unpersist(blocking = true)

    val updater = params.regType match {
      case L1 => new L1Updater()
      case L2 => new SquaredL2Updater()
    }

    val model = params.algorithm match {
      case LR =>
        val algorithm = new LogisticRegressionWithLBFGS()
        algorithm.optimizer
          .setNumIterations(params.numIterations)
          .setUpdater(updater)
          .setRegParam(params.regParam)
        algorithm.run(training).clearThreshold()
      case SVM =>
        val algorithm = new SVMWithSGD()
        algorithm.optimizer
          .setNumIterations(params.numIterations)
          .setStepSize(params.stepSize)
          .setUpdater(updater)
          .setRegParam(params.regParam)
        algorithm.run(training).clearThreshold()
    }

    // clearThreshold() makes predict() return raw scores rather than 0/1
    // labels, which is what BinaryClassificationMetrics expects.
    val prediction = model.predict(testing.map(_.features))
    val predictionAndLabel = prediction.zip(testing.map(_.label))

    val metrics = new BinaryClassificationMetrics(predictionAndLabel)
    println(s"Test areaUnderROC = ${metrics.areaUnderROC()}")

    sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val defaultParams = Params()
    val parser = new OptionParser[Params]("BinaryClassification") {
      head("BinaryClassification: an example app for binary classification.")
      opt[Int]("numIterations")
        .text("number of iterations")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("stepSize")
        .text(s"initial step size (ignored by logistic regression), default: ${defaultParams.stepSize}")
        .action((x, c) => c.copy(stepSize = x))
      opt[String]("algorithm")
        .text(s"algorithm (${Algorithm.values.mkString(",")}), default: ${defaultParams.algorithm}")
        .action((x, c) => c.copy(algorithm = Algorithm.withName(x)))
      opt[String]("regType")
        .text(s"regularization type (${RegType.values.mkString(",")}), default: ${defaultParams.regType}")
        .action((x, c) => c.copy(regType = RegType.withName(x)))
      opt[Double]("regParam")
        .text(s"regularization parameter, default: ${defaultParams.regParam}")
        .action((x, c) => c.copy(regParam = x))
      arg[String]("<input>")
        .required()
        .text("input path to labeled examples in LIBSVM format")
        .action((x, c) => c.copy(input = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class org.apache.spark.examples.mllib.BinaryClassification \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --algorithm LR --regType L2 --regParam 1.0 \
          |  data/mllib/sample_binary_classification_data.txt
        """.stripMargin)
    }
    parser.parse(args, defaultParams) match {
      case Some(params) => run(params)
      case _ => sys.exit(1)
    }
  }
}
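MLUtils.loadLibSVMFile in the example above expects one labeled example per line in LIBSVM format: a label followed by space-separated index:value pairs, with 1-based feature indices in ascending order (unlisted indices are treated as zeros, so the data is stored sparsely). A hypothetical two-line input file:

1 1:0.5 3:1.2 10:0.9
0 2:0.3 5:2.4

The Params case class above extends AbstractParams, a small reflection-based helper that pretty-prints all parameter fields; its implementation follows.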


import scala.reflect.runtime.universe._

/**
 * Abstract class for parameter case classes.
 * This overrides the [[toString]] method to print all case class fields by name and value.
 * @tparam T  Concrete parameter class.
 */
abstract class AbstractParams[T: TypeTag] {

  private def tag: TypeTag[T] = typeTag[T]

  /**
   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
   * {
   *   [field name]:\t[field value]\n
   *   [field name]:\t[field value]\n
   *   ...
   * }
   */
  override def toString: String = {
    val tpe = tag.tpe
    val allAccessors = tpe.declarations.collect {
      case m: MethodSymbol if m.isCaseAccessor => m
    }
    val mirror = runtimeMirror(getClass.getClassLoader)
    val instanceMirror = mirror.reflect(this)
    allAccessors.map { f =>
      val paramName = f.name.toString
      val fieldMirror = instanceMirror.reflectField(f)
      val paramValue = fieldMirror.get
      s"  $paramName:\t$paramValue"
    }.mkString("{\n", ",\n", "\n}")
  }
}
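As a usage sketch, printing a Params instance from the example above logs the exact configuration of a run; with the default values, the output looks roughly like this (the exact field order depends on reflection):

val params = Params(input = "data/mllib/sample_binary_classification_data.txt")
println(params)
// {
//   input:	data/mllib/sample_binary_classification_data.txt,
//   numIterations:	100,
//   stepSize:	1.0,
//   algorithm:	LR,
//   regType:	L2,
//   regParam:	0.01
// }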

For decision tree and random forest examples, see the earlier article.