跟我一起學Spark之——RDD Join中寬依賴與窄依賴的判斷
1.規律
如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴
除此之外的,rdd 的join api是寬依賴
2.Join的理解
3.舉例
A表資料: 1 a 2 b 3 c B表資料: 1 aa1 1 aa2 2 bb1 2 bb2 2 bb3 4 dd1 A inner join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 A left outer join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 3 c null null A right outer join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 null null 4 dd1 A full outer join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 3 c null null null null 4 dd1 A left semi join B: 1 a 2 b
4.API
必須是Key/value鍵值對
5.測試程式
import org.apache.spark.{SparkConf, SparkContext} /** * RDD資料Join相關API講解 * Created by ibf on 02/09. */ object RDDJoin { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("RDD-Join") val sc = SparkContext.getOrCreate(conf) // ==================具體程式碼====================== // 模擬資料產生 val rdd1 = sc.parallelize(Array( (1, "張三1"), (1, "張三2"), (2, "李四"), (3, "王五"), (4, "Tom"), (5, "Gerry"), (6, "莉莉") ), 1) val rdd2 = sc.parallelize(Array( (1, "上海"), (2, "北京1"), (2, "北京2"), (3, "南京"), (4, "紐約"), (6, "深圳"), (7, "香港") ), 1) // 呼叫RDD API實現內連線 val joinResultRDD = rdd1.join(rdd2).map { case (id, (name, address)) => { (id, name, address) } } println("----------------") joinResultRDD.foreachPartition(iter => { iter.foreach(println) }) // 呼叫RDD API實現左外連線 val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map { case (id, (name, addressOption)) => { (id, name, addressOption.getOrElse("NULL")) } } println("----------------") leftJoinResultRDd.foreachPartition(iter => { iter.foreach(println) }) // 左外連線稍微變化一下:需要左表出現,右表不出現的資料(not in) println("----------------") rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map { case (id, (name, _)) => (id, name) }.foreachPartition(iter => { iter.foreach(println) }) // 右外連線 println("----------------") rdd1 .rightOuterJoin(rdd2) .map { case (id, (nameOption, address)) => { (id, nameOption.getOrElse("NULL"), address) } } .foreachPartition(iter => iter.foreach(println)) // 全外連線 println("----------------") rdd1 .fullOuterJoin(rdd2) .map { case (id, (nameOption, addressOption)) => { (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL")) } } .foreachPartition(iter => iter.foreach(println)) // 休眠為了看4040頁面 Thread.sleep(1000000) } }
6.說明
RDD join API:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
返回值是RDD,RDD中的型別是一個二元組(a),a第一個元素是KEY型別的值(join的key), a第二個元素又是二元組(b), b的第一個元素是來自呼叫join函式的RDD的value,
b的第二個元素是來自引數other這個RDD的value
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
對於右邊的資料返回的是Option型別是資料,所以如果右表資料不存在,返回的是None;否則是一個Some的具體資料
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
對於左邊的資料返回的是Option型別是資料,所以如果左表資料不存在,返回的是None;否則是一個Some的具體資料
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
返回的value型別是Option封裝後的資料,如果資料不存在, 返回的是None,存在返回的是Some具體資料
7.缺點
8.優化程式
沒有使用API,根據原理寫一個。
減少shufflw運算元的使用。
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD資料Join相關API講解
* Created by ibf on 02/09.
*/
object RDDJoin {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("RDD-Join")
val sc = SparkContext.getOrCreate(conf)
// ==================具體程式碼======================
// 模擬資料產生
val rdd1 = sc.parallelize(Array(
(1, "張三1"),
(1, "張三2"),
(2, "李四"),
(3, "王五"),
(4, "Tom"),
(5, "Gerry"),
(6, "莉莉")
), 1)
val rdd2 = sc.parallelize(Array(
(1, "上海"),
(2, "北京1"),
(2, "北京2"),
(3, "南京"),
(4, "紐約"),
(6, "深圳"),
(7, "香港")
), 1)
// 假設rdd2的資料比較少,將rdd2的資料廣播出去
val leastRDDCollection = rdd2.collect()
val broadcastRDDCollection = sc.broadcast(leastRDDCollection)
println("++++++++++++++++++")
// 類似Inner Join的操作,Inner Join的功能:將兩個表都出現的資料合併
println("-------------------")
rdd1
// 過濾rdd1中的資料,只要在rdd1中出現的資料,沒有出現的資料過濾掉
.filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
// 資料合併,由於一條rdd1的資料可能在rdd2中存在多條對應資料,所以使用flatMap
.flatMap {
case (id, name) => {
broadcastRDDCollection.value.filter(_._1 == id).map {
case (_, address) => {
(id, name, address)
}
}
}
}
.foreachPartition(iter => iter.foreach(println))
// 左外連線
println("---------------------")
rdd1
.flatMap {
case (id, name) => {
// 從右表所屬的廣播變數中獲取對應id的集合列表
val list = broadcastRDDCollection.value.filter(_._1 == id)
// 對應id的集合可能為空,也可能資料有多個
if (list.nonEmpty) {
// 存在多個
list.map(tuple => (id, name, tuple._2))
} else {
// id在右表中不存在,填預設值
(id, name, "NULL") :: Nil
}
}
}
.foreachPartition(iter => iter.foreach(println))
// 右外連線
/**
* rdd2中所有資料出現,由於rdd2中的資料在driver中可以儲存,可以認為rdd1和rdd2通過right join之後的資料也可以在driver中儲存下
**/
println("---------------------")
// 將rdd1中符合條件的資料過濾出來儲存到driver中
val stage1 = rdd1
.filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
.collect()
// 將driver中兩個集合進行right join
val stage2 = leastRDDCollection.flatMap {
case (id, address) => {
val list = stage1.filter(_._1 == id)
if (list.nonEmpty) {
list.map(tuple => (id, tuple._2, address))
} else {
Iterator.single((id, "NULL", address))
}
}
}
stage2.foreach(println)
// TODO: 全外連線,不寫程式碼,因為程式碼比較複雜
//====================================
// 左半連線:只出現左表資料(要求資料必須在右表中也出現過),如果左表的資料在右表中出現多次,最終結果只出現一次
println("+++++++++++++++++")
println("-----------------------")
rdd1
.join(rdd2)
.map {
case (id, (name, _)) => (id, name)
}
.distinct()
.foreachPartition(iter => iter.foreach(println))
println("------------------------")
rdd1
.filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
.foreachPartition(iter => iter.foreach(println))
// 休眠為了看4040頁面
Thread.sleep(1000000)
}
}
9.Join的窄依賴程式
使用reduceByKey,裡面的程式會給一個分割槽。
package com.ibeifeng.senior.join
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD資料Join相關API講解
* Created by ibf on 02/09.
*/
object RDDJoin2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("RDD-Join")
val sc = SparkContext.getOrCreate(conf)
// ==================具體程式碼======================
// 模擬資料產生, 新增map、reduceByKey、mapPartitions等api的主要功能是給rdd1和rdd2中新增一個分割槽器(表示當前rdd是存在shuffle過程的)
val rdd1 = sc.parallelize(Array(
(1, "張三1"),
(1, "張三2"),
(2, "李四"),
(3, "王五"),
(4, "Tom"),
(5, "Gerry"),
(6, "莉莉")
), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
iter => iter.map(tuple => tuple._1),
true // 使用上一個RDD的分割槽器,false表示不使用, 設定為None
)
val rdd2 = sc.parallelize(Array(
(1, "上海"),
(2, "北京1"),
(2, "北京2"),
(3, "南京"),
(4, "紐約"),
(6, "深圳"),
(7, "香港")
), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
iter => iter.map(tuple => tuple._1),
true // 使用上一個RDD的分割槽器,false表示不使用, 設定為None
)
// 呼叫RDD API實現內連線
val joinResultRDD = rdd1.join(rdd2).map {
case (id, (name, address)) => {
(id, name, address)
}
}
println("----------------")
joinResultRDD.foreachPartition(iter => {
iter.foreach(println)
})
// 休眠為了看4040頁面
Thread.sleep(1000000)
}
}
原文連結:https://www.cnblogs.com/juncaoit/p/6528146.html