1. 程式人生 > >跟我一起學Spark之——RDD Join中寬依賴與窄依賴的判斷

跟我一起學Spark之——RDD Join中寬依賴與窄依賴的判斷

1.規律

   如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴
  除此之外的,rdd 的join api是寬依賴

2.Join的理解

 

3.舉例

A表資料:
  1 a
  2 b
  3 c
B表資料:
  1 aa1
  1 aa2
  2 bb1
  2 bb2
  2 bb3
  4 dd1

A inner join B:
  1    a 1 aa1
  1    a 1 aa2
  2   b 2 bb1
  2   b 2 bb2
  2   b 2 bb3

A left outer join B:
  1    a 1 aa1
  1    a 1 aa2
  2   b 2 bb1
  2   b 2 bb2
  2   b 2 bb3
  3   c null null

A right outer join B:
  1    a 1 aa1
  1    a 1 aa2
  2   b 2 bb1
  2   b 2 bb2
  2   b 2 bb3
  null null 4 dd1

A full outer join B:
  1    a 1 aa1
  1    a 1 aa2
  2   b 2 bb1
  2   b 2 bb2
  2   b 2 bb3
  3   c null null
  null null 4 dd1

A left semi join B:
  1 a
  2 b

4.API

  必須是Key/value鍵值對

  

5.測試程式

import org.apache.spark.{SparkConf, SparkContext}

/**
  * RDD資料Join相關API講解
  * Created by ibf on 02/09.
  */
object RDDJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RDD-Join")
    val sc = SparkContext.getOrCreate(conf)

    // ==================具體程式碼======================
    // 模擬資料產生
    val rdd1 = sc.parallelize(Array(
      (1, "張三1"),
      (1, "張三2"),
      (2, "李四"),
      (3, "王五"),
      (4, "Tom"),
      (5, "Gerry"),
      (6, "莉莉")
    ), 1)

    val rdd2 = sc.parallelize(Array(
      (1, "上海"),
      (2, "北京1"),
      (2, "北京2"),
      (3, "南京"),
      (4, "紐約"),
      (6, "深圳"),
      (7, "香港")
    ), 1)

    // 呼叫RDD API實現內連線
    val joinResultRDD = rdd1.join(rdd2).map {
      case (id, (name, address)) => {
        (id, name, address)
      }
    }
    println("----------------")
    joinResultRDD.foreachPartition(iter => {
      iter.foreach(println)
    })
    // 呼叫RDD API實現左外連線
    val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map {
      case (id, (name, addressOption)) => {
        (id, name, addressOption.getOrElse("NULL"))
      }
    }
    println("----------------")
    leftJoinResultRDd.foreachPartition(iter => {
      iter.foreach(println)
    })
    // 左外連線稍微變化一下:需要左表出現,右表不出現的資料(not in)
    println("----------------")
    rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map {
      case (id, (name, _)) => (id, name)
    }.foreachPartition(iter => {
      iter.foreach(println)
    })

    // 右外連線
    println("----------------")
    rdd1
      .rightOuterJoin(rdd2)
      .map {
        case (id, (nameOption, address)) => {
          (id, nameOption.getOrElse("NULL"), address)
        }
      }
      .foreachPartition(iter => iter.foreach(println))

    // 全外連線
    println("----------------")
    rdd1
      .fullOuterJoin(rdd2)
      .map {
        case (id, (nameOption, addressOption)) => {
          (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL"))
        }
      }
      .foreachPartition(iter => iter.foreach(println))

    // 休眠為了看4040頁面
        Thread.sleep(1000000)
  }
}

6.說明 

 RDD join API:
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    返回值是RDD,RDD中的型別是一個二元組(a),a第一個元素是KEY型別的值(join的key), a第二個元素又是二元組(b), b的第一個元素是來自呼叫join函式的RDD的value,
    b的第二個元素是來自引數other這個RDD的value

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    對於右邊的資料返回的是Option型別是資料,所以如果右表資料不存在,返回的是None;否則是一個Some的具體資料

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
    對於左邊的資料返回的是Option型別是資料,所以如果左表資料不存在,返回的是None;否則是一個Some的具體資料

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
    返回的value型別是Option封裝後的資料,如果資料不存在, 返回的是None,存在返回的是Some具體資料

7.缺點

8.優化程式

  沒有使用API,根據原理寫一個。

  減少shufflw運算元的使用。

import org.apache.spark.{SparkConf, SparkContext}

/**
  * RDD資料Join相關API講解
  * Created by ibf on 02/09.
  */
object RDDJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RDD-Join")
    val sc = SparkContext.getOrCreate(conf)

    // ==================具體程式碼======================
    // 模擬資料產生
    val rdd1 = sc.parallelize(Array(
      (1, "張三1"),
      (1, "張三2"),
      (2, "李四"),
      (3, "王五"),
      (4, "Tom"),
      (5, "Gerry"),
      (6, "莉莉")
    ), 1)

    val rdd2 = sc.parallelize(Array(
      (1, "上海"),
      (2, "北京1"),
      (2, "北京2"),
      (3, "南京"),
      (4, "紐約"),
      (6, "深圳"),
      (7, "香港")
    ), 1)

    // 假設rdd2的資料比較少,將rdd2的資料廣播出去
    val leastRDDCollection = rdd2.collect()
    val broadcastRDDCollection = sc.broadcast(leastRDDCollection)

    println("++++++++++++++++++")
    // 類似Inner Join的操作,Inner Join的功能:將兩個表都出現的資料合併
    println("-------------------")
    rdd1
      // 過濾rdd1中的資料,只要在rdd1中出現的資料,沒有出現的資料過濾掉
      .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
      // 資料合併,由於一條rdd1的資料可能在rdd2中存在多條對應資料,所以使用flatMap
      .flatMap {
      case (id, name) => {
        broadcastRDDCollection.value.filter(_._1 == id).map {
          case (_, address) => {
            (id, name, address)
          }
        }
      }
    }
      .foreachPartition(iter => iter.foreach(println))

    // 左外連線
    println("---------------------")
    rdd1
      .flatMap {
        case (id, name) => {
          // 從右表所屬的廣播變數中獲取對應id的集合列表
          val list = broadcastRDDCollection.value.filter(_._1 == id)
          // 對應id的集合可能為空,也可能資料有多個
          if (list.nonEmpty) {
            // 存在多個
            list.map(tuple => (id, name, tuple._2))
          } else {
            // id在右表中不存在,填預設值
            (id, name, "NULL") :: Nil
          }
        }
      }
      .foreachPartition(iter => iter.foreach(println))

    // 右外連線
    /**
      * rdd2中所有資料出現,由於rdd2中的資料在driver中可以儲存,可以認為rdd1和rdd2通過right join之後的資料也可以在driver中儲存下
      **/
    println("---------------------")
    // 將rdd1中符合條件的資料過濾出來儲存到driver中
    val stage1 = rdd1
      .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
      .collect()
    // 將driver中兩個集合進行right join
    val stage2 = leastRDDCollection.flatMap {
      case (id, address) => {
        val list = stage1.filter(_._1 == id)
        if (list.nonEmpty) {
          list.map(tuple => (id, tuple._2, address))
        } else {
          Iterator.single((id, "NULL", address))
        }
      }
    }
    stage2.foreach(println)

    // TODO: 全外連線,不寫程式碼,因為程式碼比較複雜

    //====================================
    // 左半連線:只出現左表資料(要求資料必須在右表中也出現過),如果左表的資料在右表中出現多次,最終結果只出現一次
    println("+++++++++++++++++")
    println("-----------------------")
    rdd1
      .join(rdd2)
      .map {
        case (id, (name, _)) => (id, name)
      }
      .distinct()
      .foreachPartition(iter => iter.foreach(println))
    println("------------------------")
    rdd1
      .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
      .foreachPartition(iter => iter.foreach(println))

    // 休眠為了看4040頁面
        Thread.sleep(1000000)
  }
}

9.Join的窄依賴程式

  使用reduceByKey,裡面的程式會給一個分割槽。 

package com.ibeifeng.senior.join

import org.apache.spark.{SparkConf, SparkContext}

/**
  * RDD資料Join相關API講解
  * Created by ibf on 02/09.
  */
object RDDJoin2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RDD-Join")
    val sc = SparkContext.getOrCreate(conf)

    // ==================具體程式碼======================
    // 模擬資料產生, 新增map、reduceByKey、mapPartitions等api的主要功能是給rdd1和rdd2中新增一個分割槽器(表示當前rdd是存在shuffle過程的)
    val rdd1 = sc.parallelize(Array(
      (1, "張三1"),
      (1, "張三2"),
      (2, "李四"),
      (3, "王五"),
      (4, "Tom"),
      (5, "Gerry"),
      (6, "莉莉")
    ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
      iter => iter.map(tuple => tuple._1),
      true // 使用上一個RDD的分割槽器,false表示不使用, 設定為None
    )

    val rdd2 = sc.parallelize(Array(
      (1, "上海"),
      (2, "北京1"),
      (2, "北京2"),
      (3, "南京"),
      (4, "紐約"),
      (6, "深圳"),
      (7, "香港")
    ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
      iter => iter.map(tuple => tuple._1),
      true // 使用上一個RDD的分割槽器,false表示不使用, 設定為None
    )

    // 呼叫RDD API實現內連線
    val joinResultRDD = rdd1.join(rdd2).map {
      case (id, (name, address)) => {
        (id, name, address)
      }
    }
    println("----------------")
    joinResultRDD.foreachPartition(iter => {
      iter.foreach(println)
    })

    // 休眠為了看4040頁面
        Thread.sleep(1000000)
  }
}

 

原文連結:https://www.cnblogs.com/juncaoit/p/6528146.html