Spark: TopN with a size-bounded TreeSet (min-heap style), an optimization of the previous post

Step 3 of the optimization: when the data volume is very large, the toList approach from the previous post can throw an out-of-memory error. Switching to a TreeSet solves this: a TreeSet keeps its elements sorted, and by evicting the smallest element whenever the set grows past N, it also caps how much data is held per key.
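Before the full Spark job, here is a minimal, Spark-free sketch of the bounded-TreeSet pattern on an ordinary Scala collection (the sample data, object name, and topN value are hypothetical):

import scala.collection.mutable

object BoundedTreeSetDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical (teacher, count) pairs for a single subject
    val nameNums = Seq(("tom", 8), ("jerry", 15), ("kate", 3), ("mike", 11))
    val topN = 2

    // descending by count, with the name as a tie-breaker so that two
    // teachers with equal counts are not collapsed into a single entry
    val ord = Ordering.by[(String, Int), (Int, String)](t => (-t._2, t._1))

    var ts = mutable.TreeSet.empty[(String, Int)](ord)
    for (nameNum <- nameNums) {
      ts += nameNum
      if (ts.size > topN) ts = ts.dropRight(1) // evict the smallest count
    }
    println(ts) // TreeSet((jerry,15), (mike,11))
  }
}

At no point does the set hold more than topN + 1 entries, which is what makes the pattern safe for very large groups.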

package day02

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * @author WangLeiKai
  *         2018/9/27  18:53
  */
object FavSubTeacher4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("FavSubTeacher4").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("F:\\上課畫圖\\spark 02\\課件與程式碼\\teacher(1).log")

    val subjectAndTeacher: RDD[((String, String), Int)] = lines.map(line => {
      val teacher: String = line.substring(line.lastIndexOf("/") + 1)
      val host = new URL(line).getHost
      val subject = host.substring(0, host.indexOf("."))
      ((subject, teacher), 1)
    })
    // collect all distinct subjects to the driver
    val subjects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()

    val sbPartitioner: SubjectPartitioner2 = new SubjectPartitioner2(subjects)

    // reduceByKey can take a partitioner as its first argument;
    // without one, the default partitioner is used
    val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(sbPartitioner, _ + _)

    val mapped: RDD[(String, (String, Int))] = reduced.map(tp => {
      val sub = tp._1._1
      val name = tp._1._2
      val num = tp._2
      (sub, (name, num))
    })


    val grouped: RDD[(String, Iterable[(String, Int)])] = mapped.groupByKey()
    val retRDD: RDD[(String, Iterable[(String, Int)])] = grouped.map(tuple => {
      // descending by count, with the name as a tie-breaker so that two
      // teachers with equal counts are not treated as duplicates by the set
      var ts = new mutable.TreeSet[(String, Int)]()(new Ordering[(String, Int)] {
        override def compare(x: (String, Int), y: (String, Int)): Int = {
          val byCount = y._2.compareTo(x._2)
          if (byCount != 0) byCount else x._1.compareTo(y._1)
        }
      })
      val subject = tuple._1
      val nameNums = tuple._2
      for (nameNum <- nameNums) {
        // add to the TreeSet, then evict the smallest count if over capacity
        ts.add(nameNum)
        if (ts.size > 2) {
          ts = ts.dropRight(1)
        }
      }
      (subject, ts)
    })


    /* Alternatively, define a named Ordering class outside the object and
       pass an instance of it to the TreeSet; see FavSubTeacher5 below. */

    val tuples = retRDD.collect()
    tuples.foreach(println)

    sc.stop()
  }
}
class SubjectPartitioner2(sbs: Array[String]) extends Partitioner {

  // the map holds each subject and its partition id: 0, 1, 2, ...
  private val rules: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
  var index = 0
  for (sb <- sbs) {
    rules.put(sb, index)
    index += 1
  }

  // number of partitions in the downstream RDD
  override def numPartitions: Int = sbs.length

  // the key here is a (subject, teacher) tuple
  override def getPartition(key: Any): Int = {
    // extract the subject name
    val subject: String = key.asInstanceOf[(String, String)]._1
    // look up the partition id for this subject
    rules(subject)
  }
}
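One pitfall is worth calling out: a TreeSet considers two elements identical whenever compare returns 0, so an ordering that looks only at the count silently drops a teacher whose count ties an existing entry. That is why the comparator above falls back to comparing names. A minimal sketch with hypothetical data:

import scala.collection.mutable

object TieBreakerDemo {
  def main(args: Array[String]): Unit = {
    val countOnly = Ordering.by[(String, Int), Int](t => -t._2)
    val withName  = Ordering.by[(String, Int), (Int, String)](t => (-t._2, t._1))

    val data = Seq(("tom", 10), ("jerry", 10))

    val a = mutable.TreeSet.empty[(String, Int)](countOnly) ++= data
    val b = mutable.TreeSet.empty[(String, Int)](withName) ++= data

    println(a.size) // 1 -- (jerry,10) compares equal to (tom,10) and is dropped
    println(b.size) // 2 -- both teachers are kept
  }
}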

The ordering can be supplied as an anonymous inner class, as above, or as a separate class that extends Ordering:

package day02

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * @author WangLeiKai
  *         2018/9/27  18:53
  */
object FavSubTeacher5 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("FavSubTeacher5").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("F:\\上課畫圖\\spark 02\\課件與程式碼\\teacher(1).log")

    val subjectAndTeacher: RDD[((String, String), Int)] = lines.map(line => {
      val teacher: String = line.substring(line.lastIndexOf("/") + 1)
      val host = new URL(line).getHost
      val subject = host.substring(0, host.indexOf("."))
      ((subject, teacher), 1)
    })
    // collect all distinct subjects to the driver
    val subjects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()

    val sbPartitioner: SubjectPartitioner3 = new SubjectPartitioner3(subjects)

    // reduceByKey can take a partitioner as its first argument;
    // without one, the default partitioner is used
    val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(sbPartitioner, _ + _)

    val mapped: RDD[(String, (String, Int))] = reduced.map(tp => {
      val sub = tp._1._1
      val name = tp._1._2
      val num = tp._2
      (sub, (name, num))
    })


    val grouped: RDD[(String, Iterable[(String, Int)])] = mapped.groupByKey()
    val retRDD: RDD[(String, Iterable[(String, Int)])] = grouped.map(tuple => {
      var ts = new mutable.TreeSet[(String, Int)]()(new MyOrdering())
      val subject = tuple._1
      val nameNums = tuple._2
      for (nameNum <- nameNums) {
        // add to the TreeSet, then evict the smallest count if over capacity
        ts.add(nameNum)
        if (ts.size > 2) {
          ts = ts.dropRight(1)
        }
      }
      (subject, ts)
    })

    val tuples = retRDD.collect()
    tuples.foreach(println)

    sc.stop()
  }
}
class SubjectPartitioner3(sbs: Array[String]) extends Partitioner {

  // the map holds each subject and its partition id: 0, 1, 2, ...
  private val rules: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
  var index = 0
  for (sb <- sbs) {
    rules.put(sb, index)
    index += 1
  }

  // number of partitions in the downstream RDD
  override def numPartitions: Int = sbs.length

  // the key here is a (subject, teacher) tuple
  override def getPartition(key: Any): Int = {
    // extract the subject name
    val subject: String = key.asInstanceOf[(String, String)]._1
    // look up the partition id for this subject
    rules(subject)
  }
}

// Note: this class must be placed outside the object
class MyOrdering extends Ordering[(String, Int)] {
  override def compare(x: (String, Int), y: (String, Int)): Int = {
    // descending by count, so dropRight(1) evicts the teacher with the
    // smallest count; tie-break on the name so equal counts are both kept
    val byCount = y._2.compareTo(x._2)
    if (byCount != 0) byCount else x._1.compareTo(y._1)
  }
}
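A quick sanity check of MyOrdering together with the bounded TreeSet (hypothetical data; assumes the classes above are compiled in the same day02 package):

package day02

import scala.collection.mutable

object MyOrderingCheck {
  def main(args: Array[String]): Unit = {
    var ts = new mutable.TreeSet[(String, Int)]()(new MyOrdering())
    for (nameNum <- Seq(("tom", 8), ("jerry", 15), ("kate", 3), ("mike", 11))) {
      ts.add(nameNum)
      if (ts.size > 2) ts = ts.dropRight(1)
    }
    println(ts) // TreeSet((jerry,15), (mike,11))
  }
}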