Spark MLlib: Classification and Regression
阿新 • Published: 2019-02-12
Problem Type: Supported Methods
Binary classification: linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes
Multiclass classification: logistic regression, decision trees, random forests, naive Bayes
Regression: linear least squares, Lasso, ridge regression, decision trees, random forests, gradient-boosted trees, isotonic regression
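The goal of classification is to assign items to categories. The most common case is binary classification, where items are split into a positive and a negative class; with more than two classes the problem is called multiclass classification. spark.mllib provides two linear classification methods: support vector machines (SVMs) and logistic regression. Linear SVMs support only binary classification, whereas logistic regression supports both binary and multiclass problems. For both methods, spark.mllib supports L1- and L2-regularized variants. The training set is an RDD of LabeledPoint objects of the form (label, v1, v2, v3, v4, ..., vn), where every v is numeric; non-numeric features must first be converted to numbers, for example with one-hot encoding.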
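As a rough illustration of the one-hot step mentioned above, here is a minimal sketch in plain Scala with no Spark dependency. The names `OneHotSketch`, `buildIndex`, and `encode` are hypothetical helpers invented for this example and are not part of spark.mllib.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object OneHotSketch {

  // Assign each distinct category a position; sorting keeps the order deterministic.
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values.distinct.sorted.zipWithIndex.toMap

  // Encode one value as a vector with a single 1.0 at its category's position.
  // A value missing from the index yields an all-zero vector.
  def encode(value: String, index: Map[String, Int]): Array[Double] = {
    val vec = Array.fill(index.size)(0.0)
    index.get(value).foreach { i => vec(i) = 1.0 }
    vec
  }

  def main(args: Array[String]): Unit = {
    val colors = Seq("red", "green", "blue", "green")
    val index = buildIndex(colors)
    colors.foreach { c =>
      println(s"$c -> ${encode(c, index).mkString("[", ", ", "]")}")
    }
  }
}
```

The resulting numeric arrays could then be combined with the other numeric features and wrapped, for instance, as `LabeledPoint(label, Vectors.dense(features))` before training.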
BinaryClassification:
package com.demo.spark.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils
import scopt.OptionParser

object BinaryClassification {

  // Supported algorithms: linear SVM and logistic regression.
  object Algorithm extends Enumeration {
    type Algorithm = Value
    val SVM, LR = Value
  }

  // Supported regularization types.
  object RegType extends Enumeration {
    type RegType = Value
    val L1, L2 = Value
  }

  import Algorithm._
  import RegType._

  case class Params(
      input: String = null,
      numIterations: Int = 100,
      stepSize: Double = 1.0,
      algorithm: Algorithm = LR,
      regType: RegType = L2,
      regParam: Double = 0.01) extends AbstractParams[Params]

  def run(params: Params): Unit = {
    val conf = new SparkConf().setAppName(s"BinaryClassification with $params")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    // Load the LIBSVM-formatted data and split it 80/20 into training and test sets.
    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
    val splits = examples.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val testing = splits(1).cache()

    // Materialize both splits before releasing the full dataset.
    val numTraining = training.count()
    val numTesting = testing.count()
    println(s"Training: $numTraining, test: $numTesting.")
    examples.unpersist(blocking = true)

    val updater = params.regType match {
      case L1 => new L1Updater
      case L2 => new SquaredL2Updater
    }

    // clearThreshold() makes predict() return raw scores instead of 0/1 labels,
    // which is what BinaryClassificationMetrics needs to build the ROC curve.
    val model = params.algorithm match {
      case LR =>
        val algorithm = new LogisticRegressionWithLBFGS()
        algorithm.optimizer
          .setNumIterations(params.numIterations)
          .setUpdater(updater)
          .setRegParam(params.regParam)
        algorithm.run(training).clearThreshold()
      case SVM =>
        val algorithm = new SVMWithSGD()
        algorithm.optimizer
          .setNumIterations(params.numIterations)
          .setStepSize(params.stepSize)
          .setUpdater(updater)
          .setRegParam(params.regParam)
        algorithm.run(training).clearThreshold()
    }

    val prediction = model.predict(testing.map(_.features))
    val predictionAndLabel = prediction.zip(testing.map(_.label))
    val metrics = new BinaryClassificationMetrics(predictionAndLabel)
    println(s"Test areaUnderROC = ${metrics.areaUnderROC()}")

    sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val defaultParam = Params()
    val parser = new OptionParser[Params]("BinaryClassification") {
      head("BinaryClassification: an example app for binary classification.")
      opt[Int]("numIterations")
        .text("number of iterations")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("stepSize")
        .text("initial step size (ignored by logistic regression), " +
          s"default: ${defaultParam.stepSize}")
        .action((x, c) => c.copy(stepSize = x))
      opt[String]("algorithm")
        .text(s"algorithm (${Algorithm.values.mkString(",")}), " +
          s"default: ${defaultParam.algorithm}")
        .action((x, c) => c.copy(algorithm = Algorithm.withName(x)))
      opt[String]("regType")
        .text(s"regularization type (${RegType.values.mkString(",")}), " +
          s"default: ${defaultParam.regType}")
        .action((x, c) => c.copy(regType = RegType.withName(x)))
      opt[Double]("regParam")
        .text(s"regularization parameter, default: ${defaultParam.regParam}")
        .action((x, c) => c.copy(regParam = x))
      arg[String]("<input>")
        .required()
        .text("input path to labeled examples in LIBSVM format")
        .action((x, c) => c.copy(input = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class org.apache.spark.examples.mllib.BinaryClassification \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --algorithm LR --regType L2 --regParam 1.0 \
          |  data/mllib/sample_binary_classification_data.txt
        """.stripMargin)
    }

    parser.parse(args, defaultParam) match {
      case Some(params) => run(params)
      case _ => sys.exit(1)
    }
  }
}
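The run method above reports a single number, the area under the ROC curve, computed from (score, label) pairs. To give a feel for what that number means, the sketch below computes it from the pairwise definition: the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counted as one half. This is plain Scala for illustration only, quadratic in the input size, and is not meant to describe how BinaryClassificationMetrics computes the value internally; `AucSketch` and its methods are hypothetical names.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object AucSketch {

  // Score one (positive, negative) pair: 1.0 for a correct ordering,
  // 0.5 for a tie, 0.0 for a wrong ordering.
  private def pairScore(p: Double, n: Double): Double =
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0

  // Input is a sequence of (score, label) pairs; labels above 0.5 count as positive.
  def areaUnderRoc(scoreAndLabels: Seq[(Double, Double)]): Double = {
    val positives = scoreAndLabels.collect { case (s, l) if l > 0.5 => s }
    val negatives = scoreAndLabels.collect { case (s, l) if l <= 0.5 => s }
    require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
    val pairScores = for (p <- positives; n <- negatives) yield pairScore(p, n)
    pairScores.sum / (positives.size.toLong * negatives.size)
  }

  def main(args: Array[String]): Unit = {
    val scored = Seq((0.9, 1.0), (0.8, 1.0), (0.7, 0.0), (0.4, 1.0), (0.2, 0.0))
    println(areaUnderRoc(scored))
  }
}
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which is why the app clears the model threshold and evaluates raw scores rather than hard 0/1 predictions.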
AbstractParams:
import scala.reflect.runtime.universe._

/**
 * Abstract class for parameter case classes.
 * This overrides the [[toString]] method to print all case class fields by name and value.
 * @tparam T Concrete parameter class.
 */
abstract class AbstractParams[T: TypeTag] {

  private def tag: TypeTag[T] = typeTag[T]

  /**
   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
   * {
   *   [field name]:\t[field value]\n
   *   [field name]:\t[field value]\n
   *   ...
   * }
   */
  override def toString: String = {
    val tpe = tag.tpe
    val allAccessors = tpe.declarations.collect {
      case m: MethodSymbol if m.isCaseAccessor => m
    }
    val mirror = runtimeMirror(getClass.getClassLoader)
    val instanceMirror = mirror.reflect(this)
    allAccessors.map { f =>
      val paramName = f.name.toString
      val fieldMirror = instanceMirror.reflectField(f)
      val paramValue = fieldMirror.get
      s"  $paramName:\t$paramValue"
    }.mkString("{\n", ",\n", "\n}")
  }
}
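The `<input>` argument of the app expects labeled examples in LIBSVM format, where each line looks like `label index1:value1 index2:value2 ...` with one-based feature indices. MLUtils.loadLibSVMFile converts those indices to zero-based when it builds the LabeledPoint objects. The sketch below shows that parsing step for a single line in plain Scala; `LibSvmLineSketch`, `SparseExample`, and `parseLine` are hypothetical names for this example, and the code does no validation of malformed input.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object LibSvmLineSketch {

  // Parsed form of one line: the label plus zero-based indices and their values.
  final case class SparseExample(label: Double, indices: Array[Int], values: Array[Double])

  def parseLine(line: String): SparseExample = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val (indices, values) = tokens.tail.map { item =>
      val Array(index, value) = item.split(":")
      // LIBSVM indices start at 1; shift them so they start at 0.
      (index.toInt - 1, value.toDouble)
    }.unzip
    SparseExample(label, indices, values)
  }

  def main(args: Array[String]): Unit = {
    val example = parseLine("1 3:0.5 10:2.0")
    println(s"label=${example.label}")
    println(s"indices=${example.indices.mkString(",")}")
    println(s"values=${example.values.mkString(",")}")
  }
}
```

Only non-zero features appear in a line, so the format stays compact for sparse data such as one-hot encoded columns.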
For decision tree and random forest examples, see the earlier article.