Using a custom accumulator (AccumulatorV2) in Spark 2.x

Deprecation

Starting with Spark 2.x, the old accumulator API is deprecated and replaced by AccumulatorV2. The old API was removed entirely in Spark 3.0.


New helper methods on SparkContext

Create and register a long accumulator, which starts at 0 and accumulates inputs with add:

  def longAccumulator(name: String): LongAccumulator = {
    val acc = new LongAccumulator
    register(acc, name)
    acc
  }
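The sketch below shows typical use of longAccumulator. It assumes spark-core 2.x is on the classpath and that Spark runs with a local master. The object name LongAccumulatorDemo and the even-number filter are illustrative only. Executors only call add. The driver reads the merged result after the action finishes. Besides value, a LongAccumulator also exposes sum, count and avg.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: sums the even numbers in 1..10 with a LongAccumulator.
object LongAccumulatorDemo {
  def run(): (Long, Long, Double) = {
    val conf = new SparkConf().setAppName("LongAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val evenSum = sc.longAccumulator("evenSum") // created and registered on the driver
      sc.parallelize(1 to 10, 2).foreach { n =>
        if (n % 2 == 0) evenSum.add(n) // runs inside tasks
      }
      // foreach is an action, so the task updates have been merged by now
      (evenSum.sum, evenSum.count, evenSum.avg)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints (30,5,6.0)
}
```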

Create and register a double accumulator, which starts at 0 and accumulates inputs with add:

  def doubleAccumulator(name: String): DoubleAccumulator = {
    val acc = new DoubleAccumulator
    register(acc, name)
    acc
  }
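The next sketch does the same for doubleAccumulator. It makes the same assumptions: spark-core 2.x and a local master. DoubleAccumulatorDemo and the sample prices are hypothetical. As with the long version, sum, count and avg can be read on the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: totals a few prices with a DoubleAccumulator.
object DoubleAccumulatorDemo {
  def run(): (Double, Long, Double) = {
    val conf = new SparkConf().setAppName("DoubleAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val total = sc.doubleAccumulator("total")
      sc.parallelize(Seq(1.5, 2.5, 3.0), 2).foreach(p => total.add(p))
      // Read on the driver after the action has completed
      (total.sum, total.count, total.avg)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```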

Create and register a CollectionAccumulator, which starts as an empty list and accumulates inputs by appending them to the list:

  def collectionAccumulator[T](name: String): CollectionAccumulator[T] = {
    val acc = new CollectionAccumulator[T]
    register(acc, name)
    acc
  }
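Here, a CollectionAccumulator gathers records that fail to parse. This sketch also assumes spark-core 2.x, a local master, and the Scala 2.11/2.12 JavaConverters. CollectionAccumulatorDemo and the sample data are made up for illustration. The value comes back as a java.util.List. Elements from different partitions arrive in no guaranteed order, so the result is sorted before it is returned.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical demo object: collects records that are not valid integers.
object CollectionAccumulatorDemo {
  def run(): Seq[String] = {
    val conf = new SparkConf().setAppName("CollectionAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val bad = sc.collectionAccumulator[String]("badRecords")
      sc.parallelize(Seq("1", "x", "3", "y"), 2).foreach { s =>
        if (Try(s.toInt).isFailure) bad.add(s)
      }
      // value is a java.util.List; sort because cross-partition order is not guaranteed
      bad.value.asScala.toList.sorted
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints List(x, y)
}
```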

Custom accumulators

1. Have the class extend AccumulatorV2[String, String]. The first type parameter is the input type and the second is the output type.
2. Override the abstract methods:

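isZero: returns whether the accumulator holds its zero (initial) value, for example 0 for a counter or an empty list for a list accumulator. Spark asserts that a reset copy is zero before shipping it to tasks.
copy: creates a new accumulator with the same state as this one.
reset: resets the accumulator to its zero value, so isZero returns true afterwards.
add: takes one input value and accumulates it.
merge: merges another accumulator of the same type into this one, updating this one's state.
value: the current result of the accumulator, read by the driver.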
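To show how the six methods work together, the sketch below uses a Spark-free model. MiniAcc, SumAcc and Lifecycle.simulate are hypothetical names and are not Spark APIs. The model follows roughly the same lifecycle as Spark. Each task gets a copy of the registered accumulator that has been reset to zero. The task calls add on its copy, and the driver merges each returned copy into the original.

```scala
// Hypothetical, Spark-free model of the AccumulatorV2 contract (not Spark's code).
trait MiniAcc[IN, OUT] {
  def isZero: Boolean
  def copy(): MiniAcc[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: MiniAcc[IN, OUT]): Unit
  def value: OUT
}

// A counter whose zero value is 0.
class SumAcc extends MiniAcc[Int, Long] {
  private var sum = 0L
  def isZero: Boolean = sum == 0L
  def copy(): SumAcc = { val c = new SumAcc; c.sum = sum; c }
  def reset(): Unit = sum = 0L
  def add(v: Int): Unit = sum += v
  def merge(other: MiniAcc[Int, Long]): Unit = other match {
    case o: SumAcc => sum += o.sum
    case _ => throw new UnsupportedOperationException("incompatible accumulator")
  }
  def value: Long = sum
}

object Lifecycle {
  // One "task" per partition: copy + reset, add each element, then merge back.
  def simulate[IN, OUT](driverAcc: MiniAcc[IN, OUT], partitions: Seq[Seq[IN]]): OUT = {
    partitions.foreach { part =>
      val taskAcc = driverAcc.copy()
      taskAcc.reset()
      require(taskAcc.isZero, "a reset copy must be zero")
      part.foreach(taskAcc.add)
      driverAcc.merge(taskAcc)
    }
    driverAcc.value
  }

  def main(args: Array[String]): Unit =
    println(simulate(new SumAcc, Seq(Seq(1, 2), Seq(3, 4)))) // prints 10
}
```

The example shows why reset must really return to the zero value. If it did not, every task copy would start from the driver's current total, and that total would be counted again on merge.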

For reference, see the source code of LongAccumulator (link, lines 291-361).

Below is a simple example that concatenates strings.

1. Definition: MyAccumulator

import org.apache.spark.util.AccumulatorV2

// Concatenates every input string, appending "-" after each one
class MyAccumulator extends AccumulatorV2[String, String] {

  private var res = ""

  // Zero value: the empty string
  override def isZero: Boolean = res == ""

  // Appends the other accumulator's partial result to this one
  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: MyAccumulator => res += o.res
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def copy(): MyAccumulator = {
    val newMyAcc = new MyAccumulator
    newMyAcc.res = this.res
    newMyAcc
  }

  override def value: String = res

  override def add(v: String): Unit = res += v + "-"

  override def reset(): Unit = res = ""
}
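MyAccumulator uses String for both type parameters. The sketch below, which assumes spark-core 2.x, uses different input and output types: it takes words (String) and outputs counts (Map[String, Long]). WordCountAccumulator is a hypothetical name. It is used the same way as MyAccumulator: construct it, call sc.register(acc, "wordCounts"), call add inside foreach, and read value on the driver.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical example: input type String, output type Map[String, Long].
class WordCountAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private val counts = mutable.HashMap.empty[String, Long]

  // Zero value: no words counted yet
  override def isZero: Boolean = counts.isEmpty

  override def copy(): WordCountAccumulator = {
    val acc = new WordCountAccumulator
    acc.counts ++= counts
    acc
  }

  override def reset(): Unit = counts.clear()

  override def add(word: String): Unit =
    counts(word) = counts.getOrElse(word, 0L) + 1L

  // Adds the other accumulator's per-word counts into this one
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = other match {
    case o: WordCountAccumulator =>
      o.counts.foreach { case (w, n) => counts(w) = counts.getOrElse(w, 0L) + n }
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  // Returns an immutable snapshot so callers cannot mutate the internal map
  override def value: Map[String, Long] = counts.toMap
}
```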

2. Calling it:

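import org.apache.spark.{SparkConf, SparkContext}

object Accumulator1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator1").setMaster("local")
    val sc = new SparkContext(conf)

    // Register the custom accumulator before using it in tasks
    val myAcc = new MyAccumulator
    sc.register(myAcc, "myAcc")

    val nums = Array("1", "2", "3", "4", "5", "6", "7", "8")
    val numsRdd = sc.parallelize(nums)

    // foreach is an action, so the task updates are merged back on the driver
    numsRdd.foreach(num => myAcc.add(num))
    // Read .value; println(myAcc) would print the accumulator's toString (id, name, value)
    println(myAcc.value)
    sc.stop()
  }
}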

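3. Result: 1-2-3-4-5-6-7-8-

With setMaster("local") the RDD has a single partition, so the pieces appear in order. With several partitions, each task's partial string is merged as that task finishes, so the order of the pieces is not guaranteed.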

Caveats

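Accumulator updates inside lazy transformations such as map() are not guaranteed to run: they only execute when an action computes the RDD. Spark applies each task's update exactly once only for updates made inside actions. Inside transformations, an update may be applied more than once if a task or stage is re-executed.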

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
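The following sketch makes this caveat concrete. It assumes spark-core 2.x and a local master. LazyUpdateDemo and the data 1 to 4 are hypothetical. The accumulator stays at 0 until an action runs. When the uncached RDD is used by a second action, map runs again and the same updates are added a second time.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: shows when updates made inside map() take effect.
object LazyUpdateDemo {
  def run(): (Long, Long, Long) = {
    val conf = new SparkConf().setAppName("LazyUpdateDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val accum = sc.longAccumulator("accum")
      val data = sc.parallelize(1 to 4, 2)
      val mapped = data.map { x => accum.add(x); x }

      val beforeAction = accum.sum     // 0: map has not run yet
      mapped.count()                   // first action computes the map
      val afterOneAction = accum.sum   // 10
      mapped.count()                   // not cached, so the map runs again
      val afterTwoActions = accum.sum  // 20: the same updates were applied twice
      (beforeAction, afterOneAction, afterTwoActions)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints (0,10,20)
}
```

Calling mapped.cache() before the first count would usually prevent the second round of updates. Even then, a cached partition that gets evicted is recomputed and counted again. For counts that must be exact, the safer choice is to update accumulators inside actions such as foreach.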