1. 程式人生 > >【原創】大資料基礎之Spark(6)rdd sort實現原理

【原創】大資料基礎之Spark(6)rdd sort實現原理

spark 2.1.1

spark中可以通過RDD.sortBy來對分散式資料進行排序,具體是如何實現的?來看程式碼:

org.apache.spark.rdd.RDD

  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 
= withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values } /** * Creates tuples of the elements in this RDD by applying `f`. */ def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanedF = sc.clean(f) map(x => (cleanedF(x), x)) }
/** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys).
*/ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }

程式碼比較簡單:sort是一個transformation操作,需要定義一個keyBy,即根據什麼排序,然後會做一步map,即 item -> (keyBy(item), item),然後定義一個Partitioner,即分割槽策略(多少個分割槽,升序降序等),最後返回一個ShuffledRDD;

ShuffledRDD原理詳見 https://www.cnblogs.com/barneywill/p/10158457.html

這裡重點說下RangePartitioner:

org.apache.spark.RangePartitioner

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

這裡會根據partition的數量確定rangeBounds,rangeBounds很像QuickSort中的pivot,

舉例來說:叢集現在有10個節點,對1億資料做排序,partition數量是100,最理想的情況是1億資料平均分成100份,然後每個節點存放10份,然後各自排序就好,沒有資料傾斜;
但是這個很難實現,要注意的是這裡平分的過程實際上也是劃分邊界的過程,即確定每份的最小值和最大值邊界,需要對全部資料遍歷統計之後才能精確實現;

spark中採用的是一種通過對資料取樣瞭解資料分佈並最終達到近似精確的方式,具體實現為在從全部資料中取樣sampleSize個數據,每個分割槽取樣sampleSizePerPartition個,如果某些分割槽很大,會追加取樣個數,這樣保證取樣過程儘可能的平均,然後針對取樣資料進行探測劃分邊界,得到rangeBounds,有了rangeBounds之後就可以知道1億資料中的每一條具體在哪個新的分割槽;

 

還有一個問題:在sort之後如果collect到driver,array資料還會保持排序狀態嗎?

org.apache.spark.rdd.RDD

  /**
   * Return an array that contains all of the elements in this RDD.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

答案是肯定的;