1. 程式人生 > >Learning Spark中文版--第四章--使用鍵值對(2)

Learning Spark中文版--第四章--使用鍵值對(2)

最小 定制 單個 locate 最大限度 ces 之一 定期 情況

Actions Available on Pair RDDs (鍵值對RDD可用的action)

??和transformation(轉換)一樣,鍵值對RDD也可以使用基礎RDD上的action(開工),並且鍵值對RDD有一些利用鍵值對數據特性的的action,如下表:

表4-3 鍵值對RDD上的action

函數名 描述 例子 結果
countByKey() 計算每個鍵元素的總數 rdd.countByKey() {(1,1),(3,2)}
collectAsMap() 結果收集成一個map便於查詢 rdd.collectAsMap() Map{(1,2),(3,4),(3,6)}
lookup(key) 根據鍵返回值 rdd.lookup(3) [4,6]

??鍵值對RDD還有很多其他保存RDD的action,我們將在第五章進行討論。

Data Partitioning(Advanced)(數據分區)

??我們這一章討論的最後一個Spark特性就是如何控制節點間的數據分區。在分布式程序中,主機間的通信代價高昂,所以把數據安排妥當來最小化網絡間的通信可以極大地提高性能。很像單臺機器的程序需要為數據選擇正確的數據結構,Spark能夠控制RDD的分區來減少網絡間通信。分區不會對所有的應用都有用,舉個例子,如果給定的RDD值只被掃描一次,那麽預先對其分區沒有什麽意義。只有多次使用如join這樣的的鍵操作的RDD,分區才有意義。稍後會有一些例子。

??Spark中的所有鍵值對RDD都可以使用分區,因為系統的分組函數是根據每個元素的鍵。盡管Spark沒有明確地控制每個鍵所對應的工作節點(也因為系統在某些工作節點失敗的情況下也能正常運行),它允許程序能夠確保一組鍵會出現在同一個節點上。例如,你可以選擇哈希分區(hashpartition)將一個RDD劃分成100個分區,這樣模除100後有相同哈希值的鍵會出現在一個節點上。或者你可以使用區間分區(range-partition)按區間對鍵進行分區,這樣鍵在相同範圍內的元素會在相同的節點上。

??舉個簡單例子,想想一個內存中保存大量用戶信息的應用,(UserId,UserInfo)組成的鍵值對RDD,UserInfo包含用戶話題訂閱列表。這個應用定期把這個表和一個記錄了過去五分鐘發生的點擊事件的小文件結合,就是一個(UserId,LinkInfo)鍵值對,記錄了用戶五分鐘內點擊網站鏈接的信息的日誌。舉例來講,我們想統計用戶訪問和他們訂閱主題無關的鏈接數量。我們可以執行Spark的join操作,把UserInfo和LinkInfo鍵值對根據UserId鍵分組。Example4-22展示了這個例子:

Example 4-22. Scala simple application

// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn‘t provide Spark with any way of knowing in which partition a
// particular UserID is located.
//初始化代碼;我們從HDFS的Hadoop SequenceFile加載用戶信息,它通過他們找到的HDFS block來分發userData元素,Spark並不知道每個UserId在分區中的位置
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
//函數定期被調用處理過去五分鐘的事件日誌,(假定這個SequenceFIle包含(UserId,Link//Info)鍵值對

def processNewLogs(logFileName: String) {
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
    val offTopicVisits = joined.filter {
        case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
            !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

??這段代碼能夠達到我們的目的,但是效率會很差。這是因為每次調用processNewLogs()時調用的join()操作都不知道鍵在數據的分區方式。默認情況下,這些操作會用hash值混洗兩個數據集的所有鍵,把具有相同哈希值的鍵發送到相同的機器中,然後在這臺機器上join相同鍵的元素(如圖4-4)。因為我們知道userData表比每五分鐘的點擊事件日誌大很多,這浪費了大量工作:userData表每次被調用都要在通過網絡把數據打亂再用哈希值對鍵分組,有時候用戶表甚至沒有變化也要這樣做。

技術分享圖片


??改正其實很簡單:在程序開始時對userData使用partitionBy()轉換(transformation)來把數據進行哈希分區。還需傳遞一個spark.HashPartitioner對象給partitionBy,如例所示:

Example 4-23. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                .persist()

??processNewLogs()可以保持不變:事件RDD是從本地進入的processNewLogs()方法,並且在本方法中只被使用了一次,所以時間RDD被指定分區就沒什麽好處。因為我們構建userData時調用了partitionBy(),Spark會立即知道它被哈希分區了,調用join()時會利用這些信息。特別是當我們調用userData.join(events)時,Spark會只混洗eventsRDD,向包含用戶數據的相應哈希分區的機器發送帶有每個特定UserId的事件(如圖4-5)。結果就是只有少量數據需要在網絡中通信,程序極大提高速度。

技術分享圖片
??註意一點partitionBy()是transformation(轉換),所以他總是返回一個新RDD,原始RDD不會被改變。RDD一旦創建可以永不改變。因此,持久化並且保存partitionBy()後的userData的結果是很重要的,而不是保存原始的sequenceFile()。並且,把100傳遞給partitionBy()表示分區的數量,這會控制相同數量的並行task(任務)在RDD執行後續的操作(例如:join);通常,這個數量至少和集群上的核心數一樣大。

在partitionBy()之後未能持久化會導致後續使用RDD重復對數據分區。沒有持久化,已分區RDD的使用將會導致對RDD完整繼承關系的重新求導。這是partitionBy()的弊端,這會導致跨網絡的重復分區和數據洗牌,類似於沒有指定分區的情況。

??實際上,Spark的許多操作會自動生成附加分區信息的RDD,並且很多操作會利用這些分區信息,除了join()。舉個例子,sortBykey()groupByKey()會分別生成區間分區和哈希分區。另一方面,類似map()操作產生的新RDD會忘記父RDD的分區信息,因為這種操作理論上有可能修改每條記錄的鍵信息。後面部分會介紹如何決定RDD分區,和Spark不同的操作如何影響分區。

Java,Python和Scala三者的API受益於分區的方式並無二致。但是,在Python中,你不能把一個Hash Partitioner對象傳遞給partitionBy;你可以直接傳遞分區要求的數量(例如:rdd.partitionBy(100))。

Determining an RDD‘s Patitioner(決定RDD的分區器)

??在Scala和Java中,你可以通過partitioner的屬性決定RDD如何分區(或者Java中的partitioner()方法)。這回返回一個scala.Option對象,一個Scala中包含可能存在可能不存在對象的容器類。你還可以調用Option對象的isDefined()來檢查是否有值,get()方法返回這個值。如果有值,會是一個spark.Partioner對象。這本質上是一個表示RDD每個鍵的分區的函數;稍後會詳細介紹。

??利用partitioner屬性是個在Spark shell中測試Spark不同操作對分區影響的好手段,還能夠檢查你想在程序中執行的操作是否符合正確的結果(見Example4-24)。

Example 4-24. Determining partitioner of an RDD

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)

??在這個簡單的shell會話中,我們創建了(Int,Int)鍵值對RDD,其初始化是沒有任何分區信息(Option對象的值是None)。然後我們通過對第一個RDD哈希分區創建了第二個RDD。如果我們實際上想在後面的操作使用已定義的partitioned,我們應該在例子第三行輸入的末尾加上persist()。這和在之前例子中需要對userData使用persist()的原因是相同的:如果不適用persist(),後續RDD的action計算分區的整個繼承關系,這會導致鍵值對被一遍又一遍地哈希分區。

Operations That Benefit from Partitioning(從分區獲益的操作)

??很多Spark的操作會導致在網絡間根據鍵對數據洗牌。這些操作都可以通過分區進行優化。像Spark1.0,通過分區可以優化的操作有:cogroup(),groupWith(),join(),lefOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey()和lookup()

??對於運行單獨RDD上的操作,如reduceByKey(),運行一個分區RDD會導致每個鍵的所有值在單個機器上計算,只需要最後把本地歸約的值從工作節點發送回主節點。對於二元運算,如cogroup()join(),預分區(pre-partitioning)會導致至少一個RDD(已知分區器的RDD)不被混洗。如果RDD都是相同的分區器並且緩存在相同的機器上或者其中之一仍未被計算,那麽不會發生網絡間的數據洗牌。

Operations That Affect Partitioning(影響分區的操作)

??Spark內部知道操作是如何影響分區的,自動對會為數據分區的操作創建的RDD設置分區器。舉例來講,假設你調用了join()來連接兩個RDD;因為有相同鍵的元素已經被哈希分區到相同的機器上了,Spark知道結果就是哈希分區,在join產生的結果上的操作如reduceByKey()會明顯變快。

??另一方面,有些無法確保會生成已知分區的轉換,輸出的RDD不會有分區器集。舉例來說,如果你對一個哈希分區的鍵值對RDD調用map()map()中的函數參數理論上可以改變每個元素的鍵,所以結果不會包含分區器。Spark不會分析你的函數來檢查是否改變了鍵,而是提供了兩個操作,mapvalues()flatMapValues()來保證每個元組的鍵未被改變。

??總結一下,所有會輸出分區器的操作:cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(),mapValues() (如果父RDD有分區器), flatMapValues() (如果父RDD有分區器), and filter() (如果父RDD有分區器)。剩下的操作不會產生分區器。

??對於二元操作,輸出分區器的設置取決於父RDD的分區器。默認情況下,使用哈希分區器,分區的數量由操作的並行度確定。但是,如果父RDD其中之一有分區器集,那該分區器會設置為分區器,如果所有的父RDD都有分區器集,那麽設置分區器為第一個父分區器。

Example:PageRank(例:PageRank)

??我們認為PageRank(網頁排名)算法是一個典型的會因分區提升效率的例子。PageRank算法是以谷歌的Larry Page命名的,用於衡量特定網頁相對於搜索引擎索引中的其他網頁而言的重要程度。這可以用來為網頁排名,也可以是論文或用戶影響力。

??PageRank是一個會執行很多join的叠代算法,所以是使用RDD分區的好樣本。這個算法包含兩個數據集:一個是(pageId,linkList),包含每個網頁的鄰居列表(該頁面包含其他頁面的鏈接,這個鏈接頁面稱為鄰居頁面);另一個是(pageID,rank),包含每個網頁的當前排名。它的計算流程大致如下:

1.把每個頁面的初始級別設置為1.0.

2.每次叠代,頁面p發送rank(p)/numNeighbors(p)的貢獻給它的鄰居網站(它有鏈接的頁面)。

3.設置頁面p級別為0.5+0.85*收到的貢獻。

??最後兩步重復叠代多次,在這個過程中,算法會漸漸收斂到每個網頁正確的PageRank值。實際上,通常進行十次叠代。

Example 4-25 gives the code to implement PageRank in Spark.

Example 4-25. Scala PageRank

// Assume that our neighbor list was saved as a Spark objectFile
//假設鄰頁列表存在了Spark的objectFile中。
val links = sc.objectFile[(String, Seq[String])]("links")
                .partitionBy(new HashPartitioner(100))
                .persist()
                
// Initialize each page‘s rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
//把每個頁面的初始值設為1,因為使用mapValue,所以RDD會有和鏈接相同的分區器
var ranks = links.mapValues(v => 1.0)

// Run 10 iterations of PageRank
//運行十遍PageRank的叠代
for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
        case (pageId, (links, rank)) =>
            links.map(dest => (dest, rank / links.size))
    }
    ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// Write out the final ranks
ranks.saveAsTextFile("ranks")

??就是這樣!算法開始時,rankRDD對每個元素設置1.0的初始值,每次叠代都會更新rank變量。PageRank算法在Spark中表達非常簡單:首先將當前等級RDD和靜態鏈接RDD通過join()結合,這是為了得到(頁面ID,鏈接列表,頁面等級)元組,然後使用flatMap生成“貢獻”值發送給每個鄰居頁面。我們把貢獻值按頁面ID求和並且設置頁面的等級為0.15+0.85*收到的貢獻

??雖然代碼本身很簡單,但是這個例子為了確保使用高效的方式分區RDD和最小化網絡通信做了很多事情:

1.註意每次叠代都是links.join(ranks)。因為links是一個靜態數據集,程序開始就用partitionBy對它分區了,所以這個RDD並不需要在網絡間對數據洗牌。實際上,linksRDD可能會比ranksRDD大很多,因為它保存了每個頁面的鄰居頁面列表,所以這種優化比簡單地實現PageRank(如,使用簡單的MapReduce)減少了大量的網絡任務。

2.同理,我們把linksRDDpersist()避免對其叠代。

3.當我們第一次創建ranksRDD時,我們使用mapValues()而不是map()來保存父RDD(links)的分區信息,所以我們第一次join開銷很小。

4.再循環體中,我們在reduceByKey()之後執行mapValues();因為reduceByKey()的結果已經被哈希分區了,這將使得將map的結果與下一次叠代中的鏈接結合起來更有效率。

為了最大限度地發揮分區優化的潛力,當你不改變鍵的值時應該使用mapValues()flatMapValues()

Custom Partitioners(定制分區器)

??盡管哈希分區器和區間分區器可以在很多場景使用,Spark仍然允許你通過提供一個定制的Partitioner對象來自定義RDD分區方式。這可以幫助你利用特定領域的知識來減少網絡通信的消耗。

??舉例來說,假如我們想使用PageRank算法計算一組web頁面,以頁面URL做RDD的鍵,即PageID為URL,使用哈希分區的話,域名相同後綴不相同的URL不會在一個分區(如,http://www.cnn.com/WORLD 和 http://www.cnn.com/US)。我們知道同一個域名中的鏈接往往彼此連接。由於PageRank需要在每次叠代時將每個頁面的消息發送給每個鄰居,因此定制分區器有助於將這些頁面分組到相同的分區中。我們可以定制一個分區器把相同域名的URL分區到一個節點上。

??定制分區器需要是org.apache.spark.Partitioner的子類並且實現三個方法:

  • numPartitions:Int,返回你創建分區的數量。

  • getPartition(key:Any):Int,返回對應鍵的分區ID(0到numPartitions-1)。

  • equals(),標準Java相等方法。這個實現很重要,因為Spark需要測試你的分區器與其它實例是否相等來判斷兩個RDD的分區是否是一種方式。

??有一點需要註意的是如果你的算法中依賴了Java的hashCode()方法,這有可能返回一個負數。你需要確保getPartition()不會返回負數。

??Example4-26展示了一個我們之前描述的域名分區器,這個分區器只對每個URL的域名進行哈希分區。

Example 4-26. Scala custom partitioner
class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
        val domain = new Java.net.URL(key.toString).getHost()
        val code = (domain.hashCode % numPartitions)
        if (code < 0) {
            code + numPartitions // Make it non-negative
        } else {
            code
    }
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
        dnp.numPartitions == numPartitions
    case _ =>
        false
    }
}

??註意在equals()方法中,我們使用Scala的模式匹配操作測試other是否是一個DomainNamePartitioner對象,如果是的就跳進裏面的方法;這和使用Java的intanceof是一樣的。

??使用定制分區器非常簡單:把它傳給partitionBy()方法就行了。Spark中很多基於數據洗牌的操作,如join()groupByKey(),也可以使用可選的Partitioner對象來控制分區的輸出。

??在Java中創建一個定制分區器和Scala很相似:直接繼承spark.Partitioner類並且實現需要的方法就行。

??在Python中,你不需要繼承Partitioner類,但是需要給RDD.partitionBy()方法傳遞一個哈希函數作為額外的參數。示例如下:

Example 4-27. Python custom partitioner

import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)
    
rdd.partitionBy(20, hash_domain) # Create 20 partitions

??註意一點你傳遞的哈希函數會作為和其他RDD比較的標識。如果你想使用相同的分區器對多個RDD分區,那麽需要傳遞相同的函數對象(如,全局函數)而不是為每個創建一個lambda表達式。

Conclusion(總結)

??本章中,我們了解了Spark中處理鍵值對數據的特殊函數。第三章學到的技術對鍵值對仍然適用。下一章節,我們將了解如何加載保存數據。

Learning Spark中文版--第四章--使用鍵值對(2)