分散式系統系列學習筆記:MapReduce程式設計模型(附程式碼實現)
作者:小羊 編輯:韓數
大家好,我是韓數,本文的作者是我的好朋友小羊,本次呢,特地邀請小羊大神來撰寫大資料系列的高階教程,隨著大資料的發展,越來越多優秀的開源框架逐漸進入到我們開發者的生活中,包括hadoop,spark,flink等等,而Hadoop在大資料領域幾乎無法撼動的地位,也成了每個大資料程式設計師的必修課,而MR(MapReduce),這一由google所提出的分散式計算模型,也是Hadoop的核心概念之一。本文呢,將從 MIT 6.824 課程 LEC 1 的內容出發,通過對 MapReduce 原論文的解讀,以及課後作業程式碼的具體實現,來幫助大家去深入理解MapReduce模型,從而對分散式有一個深入的理解。
注:
6.824 是 MIT(斯坦福) 開設的分散式系統課程,其中第一節是 閱讀論文並梳理 MR 模型的執行流程,實現單機版、分散式版的 word count,最後使用模型來生成倒排索引。
不說廢話,上東西。
分散式的引入:
什麼是分散式系統?
分散式系統就是有多臺協同的計算機組成的系統,這些系統往往用於大型資料的儲存,計算等傳統單機模式下不能完成任務的場景。比如,雲盤儲存,spark,Hadoop等計算引擎。
如圖所示,分散式系統通常會有多個副本,多個分割槽,以此來保證資料和系統的可靠性。
如何實現分散式:
分散式的實現,依賴彼此之間的通訊和協同,同時需要對資料的一致性,叢集的可靠性等進行維護。
- 首先需要將物理獨立的個體,通過通訊模組,組織到一起工作。
- 需要多副本來實現資料和系統的可靠性,保證容錯。
- 支援水平擴充套件來擴容,提高吞吐量。
分散式的優缺點:
- 優勢:
- 併發執行任務快,理論上只受最慢的一個任務影響,理想狀態下,
N
臺機器併發,耗時幾乎可以變為T/N
- 併發執行任務快,理論上只受最慢的一個任務影響,理想狀態下,
- 缺點:
- 複雜,併發部分難以實現
- 受到CAP原理的限制
- 需要處理區域性失敗的問題
MapReduce 論文 解讀
什麼是MapReduce
MapReduce
是一種程式設計模型,該模型主要分為兩個過程,即 Map
和 Reduce
。 MapReduce
的整體思想是: 將輸入的資料分成 M 個 tasks
Map
函式去執行任務,產出 <Key,Value>
形式的中間資料,然後相同的 key 通過使用者自定義的 Reduce
函式去聚合,得到最終的結果。 比如論文提及到的 詞頻統計, 倒排索引,分散式排序等,都可以通過MapReduce
來實現。
如下圖所示:
MapReduce 的實現
過程如下:
-
Map 端
- 根據輸入輸入資訊,將輸入資料 split 成 M 份, 比如上圖中的 split0 - split4
(這裡M=5)
- 在所有可用的
worker
節點中,起 M 個task
任務的執行緒, 每個任務會讀取對應一個 split 當做輸入。 - 呼叫
map
函式,將輸入轉化為<Key,Value>
格式的中間資料,並且排序後,寫入磁碟。 這裡,每個task
會寫 R 個檔案,對應著Reduce
任務數。 資料寫入哪個檔案的規則有Partitioner
決定,預設是hash(key) % R
- (可選) 為了優化效能,中間還可以用一個
combiner
的中間過程
- 根據輸入輸入資訊,將輸入資料 split 成 M 份, 比如上圖中的 split0 - split4
-
Reduce 端
-
map
階段結束以後, 開始進入Reduce
階段,每個Reduce task
會從所有的Map
中間資料中,獲取屬於自己的一份資料,拿到所有資料後,一般會進行排序(Hadoop 框架是這樣做)
。
說明: 這個排序是非常必要的,主要因為
Reduce
函式的輸入 是<key,[]values>
的格式,因為需要根據key去排序。有同學想為啥不用map<>()
去實現呢? 原因:因為map必須存到記憶體中,但是實際中資料量很大,往往需要溢寫到磁碟。 但是排序是可以做到的,比如歸併排序。 這也就是map端產出資料需要排序,Reduce端獲取資料後也需要先排序的原因。- 呼叫
Reduce
函式,得到最終的結果輸出結果,存入對應的檔案 - (可選) 彙總所有
Reduce
任務的結果。一般不做彙總,因為通常一個任務的結果往往是另一個MapReduce
任務的輸入,因此沒必要彙總到一個檔案中。
-
Master 資料結構:
master
是 MapReduce
任務中最核心的角色,它需要維護 狀態資訊 和 檔案資訊。
-
狀態資訊:
-
map
任務狀態 -
Reduce
任務狀態 -
worker
節點狀態
-
-
檔案資訊
- 輸入檔案資訊
- 輸出檔案資訊
-
map
中間資料檔案資訊
注:
由於
master
節點程式是整個任務的樞紐,因此,它需要維護輸入檔案地址,map
任務執行完後,會產出中間資料檔案等待reducer
去獲取,因此map
完成後,會向master
上報這些檔案的位置和大小資訊,這些資訊隨著Reduce
任務的啟動而分發下推到對應的worker
。
容錯
worker 節點失敗
master會週期性向所有節點傳送
ping 心跳檢測, 如果超時未回覆,
master會認為該
worker已經故障。任何在該節點完成的
map 或者
Reduce任務都會被標記為
idle, 並由其他的
worker` 重新執行。
說明: 因為
MapReduce
為了減少網路頻寬的消耗,map
的資料是儲存在本地磁碟的,如果某個worker
機器故障,會導致其他的Reduce
任務拿不到對應的中間資料,所以需要重跑任務。那麼這也可以看出,如果利用hadoop
等分散式檔案系統來儲存中間資料,其實對於完成的map
任務,是不需要重跑的,代價就是增加網路頻寬。
Master 節點失敗:
master
節點失敗,在沒有實現HA 的情況下,可以說基本整個MapReduce
任務就已經掛了,對於這種情況,直接重新啟動master
重跑任務就ok了。 當然啦,如果叢集有高可靠方案,比如master
主副備用,就可以實現master
的高可靠,代價就是得同步維護主副之間的狀態資訊和檔案資訊等。
失敗處理的語義:
論文中提到,只要Map
Reduce
函式是確定的,語義上不管是分散式執行還是單機執行,結果都是一致的。每個map
Reduce
任務輸出是通過原子提交來保證的, 即:
一個任務要麼有完整的最終檔案,要麼存在最終輸出結果,要麼不存在。
- 每個進行中的任務,在沒有最終語義完成之前,都只寫臨時檔案,每個
Reduce
任務會寫一個,而每個Map
任務會寫 R 個,對應 R 個reducer
. - 當
Map
任務完成的時候,會向master
傳送檔案位置,大小等資訊。Master
如果接受到一個已經完成的Map
任務的資訊,就忽略掉,否則,會記錄這個資訊。 - 當
Reduce
任務完成的時候,會將臨時檔案重新命名為最終的輸出檔案, 如果多個相同的Reduce
任務在多臺機器執行完,會多次覆蓋輸出檔案,這個由底層檔案系統的rename
操作的原子性,保證任何時刻,看到的都是一個完整的成功結果
對於大部分確定性的任務,不管是分散式還是序列執行,最終都會得到一致的結果。對於不確定的map
或者Reduce
任務,MapReduce
保證提供一個弱的,仍然合理的語義。
舉個例子來說:
確定性任務比如 詞頻統計 不管你怎麼執行,序列或者並行,最終得到的都是確定性的統計結果。
第二個不確定性任務: 隨機傳播演演算法,
pageRank
等,因為會有概率因素在裡面,也就是說你每次跑的結果資料不一定能對的上。但是是合理的,因為本來就有很多隨機的因素在裡面。
儲存優化
由於網路頻寬資源的昂貴性,因此對MapReduce
儲存做了很多必要的優化。
- 通過從本地磁碟讀取檔案,節約網路頻寬
- GFS 將檔案分解成多個 大小通常為 64M 的
block
,並多備份儲存在不同的機器上,在排程時,會考慮檔案的位置資訊,儘可能在存有輸入檔案的機器上排程map
任務,避免網路IO。 - 任務失敗時,也會嘗試在離副本最近的worker中執行,比如同一子網下的機器。
- MapReduce 任務在大叢集中執行時,大部分輸入直接可以從磁碟中讀取,不消耗頻寬。
任務粒度
通常情況下,任務數即為 O(M + R)
,這個數量應當比worker
數量多得多,這樣利於負載均衡和失敗恢復的情況,但是也不能無限增長,因為太多工的排程,會消耗master
儲存任務資訊的記憶體資源,如果啟動task所花的時間比任務執行時間還多,那就不償失了。
優化
自定義分割槽函式 (partition
):
自定義分割槽可以更好地符合業務和進行負載均衡,防止資料傾斜。 預設只是簡單的 hash(key) % R
有序保證:
每個`partition`內的資料都是排序的,這樣有利於`Reduce`階段的`merge`合併
複製程式碼
Combiner
函式:
這個是每個map
階段完成之後,區域性先做一次聚合。比如:詞頻統計,每個 Word 可能出現了100次,如果不使用combiner
, 就會傳送100 個 <word,1>
,如果combiner
聚合之後,則為 <word,100>
,大大地減少了網路傳輸和磁碟的IO。
輸入輸出型別
一個reader
沒必要非要從檔案讀資料,MapReduce
支援可以從不同的資料來源中以多種不同的方式讀取資料,比如從資料庫讀取,使用者只需要自定義split規則,就能輕易實現。
計數器
MapReduce
還添加了計數器,可以用來檢測MapReduce
的一些中間操作。
論文剩下的一些測試和實驗資料,還請讀者自行閱讀原論文,在此不再贅述。
課後作業 實現
接下來是 課程作業 的實現.
前提工作
- 配置 go 環境,這是必要的,就像你學習java電腦上得先有個java一樣。
- 從MIT 的git上下載課程實現的原始碼,
- 由於筆者在實現時,還用到了一個uuid的庫, 因此,還需要安裝這個庫
# 配置go環境
export GOPATH=/path/to/your/gopath
export GOROOT=/path/to/your/goroot
# 下載MIT原始git的骨架程式碼,這裡人家自定義實現好了很多東西,程式碼的註釋將提醒你一步一步實現
git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
# 下載uuid, 可選
go get github.com/satori/go.uuid
複製程式碼
熟悉程式碼
這裡主要是熟悉課程提供的程式碼。 因為所有的執行都是從測試檔案開始,所以我們先從 test_test.go
檔案入手,這裡需要一些 go 語言的基礎,比如同一個包下的所有函式都是可見的,所以有些函式找起來會比較慢,最好還是用IDE,比如 Goland。
- 首先,測試程式碼中提供了輸入檔案數量,
map
函式和Reduce
函式, 和Reduce
任務數,檔案內容由makeInput
函式生成
const (
nNumber = 100000
nMap = 20
nReduce = 10
)
複製程式碼
-
Sequence 方式執行
MapReduce
任務這個隨便去一個
TestSequentialMany
測試程式碼進行分析:- 先啟動一個
master
, 程式碼在master.go
檔案中
func Sequential(jobName string,files []string,nreduce int,mapF func(string,string[]KeyValue,reduceF func(string,[]string) string,) (mr *Master) { // 這裡開啟多執行緒去併發 run 函式 go mr.run(jobName string,schedule func(phase jobPhase),finish func()) } // 這裡的 schedule 函式只是針對不同的phrase,執行對應的doMap任務或者是doReduce任務 func(phase jobPhase) { switch phase { case mapPhase: for i,f := range mr.files { doMap(mr.jobName,i,f,mr.nReduce,mapF) } case reducePhase: for i := 0; i < mr.nReduce; i++ { doReduce(mr.jobName,mergeName(mr.jobName,i),len(mr.files),reduceF) } } } 複製程式碼
-
doMap
和doReduce
函式分別在commom_map.go
和common_reduce.go
中, 需要我們在Part I 中實現 - run函式,注意:merge函式解析檔案是使用json去解析,所以最好doMap 和doReduce用json存中間資料。
func (mr *Master) run(jobName string,schedule func(phase jobPhase),finish func(),) { ...... schedule(mapPhase) schedule(reducePhase) // 排程完map 和Reduce任務後,執行finish任務, finish() mr.merge() ...... mr.doneChannel <- true } 複製程式碼
- 傳送完done以後, 在
TestSequentialMany
中 mr.wait() 函式返回,最終做一些校驗和清理的工作。
- 先啟動一個
-
Distributed 方式執行
MapReduce
任務分散式下與序列執行直接最大的區別就是需要和
worker
通訊完成排程。- 首先,通過
setup()
函式建立Distributed master
,master
註冊 rpc 服務, 然後執行go run 函式,和序列方式的區別在於,排程函式需要在schedule.go
中實現。 任務執行結束後,關閉worker和rpc服務。
func Distributed(jobName string,master string) (mr *Master) { mr = newMaster(master) mr.startRPCServer() go mr.run(jobName,files,nreduce,func(phase jobPhase) { ch := make(chan string) go mr.forwardRegistrations(ch) schedule(mr.jobName,mr.files,phase,ch) },func() { mr.stats = mr.killWorkers() mr.stopRPCServer() }) return } 複製程式碼
-
startRPCServer()
函式: 收到shutdown訊息以後,
func (mr *Master) startRPCServer() { go func() { loop: for { select { case <-mr.shutdown: break loop default: } conn,err := mr.l.Accept() if err == nil { go func() { // 這個是長連結,裡面有個無限迴圈, rpcs.ServeConn(conn) conn.Close() }() } else { debug("RegistrationServer: accept error %s",err) break } } debug("RegistrationServer: done\n") }() } 複製程式碼
- 建立
worker
: 注意nRPC引數,這代表了執行多少任務後關閉, 如果是-1, 則無窮。
func RunWorker(MasterAddress string,me string,MapFunc func(string,string) []KeyValue,ReduceFunc func(string,nRPC int,parallelism *Parallelism,) { ... // 註冊rpc 服務 .... // 向伺服器註冊 worker資訊 wk.register(MasterAddress) // DON'T MODIFY CODE BELOW for { wk.Lock() if wk.nRPC == 0 { wk.Unlock() break } wk.Unlock() // 每獲得一個請求, 即 call ( "Worker.doTask" ), 就 nRPC -= 1 conn,err := wk.l.Accept() if err == nil { wk.Lock() wk.nRPC-- wk.Unlock() go rpcs.ServeConn(conn) } else { break } } } 複製程式碼
後續執行和
Sequence
模式差不多的邏輯。 - 首先,通過
Assignment
Part I
這個部分的任務是完善 doMap
和 doReduce
函式,實現序列執行的 test
。
- doMap 任務
根據MapReduce 過程的分析,對於map階段,需要讀取檔案,然後呼叫使用者的 mapF 函式, 得到結果後存到中間檔案中。 具體的程式碼在對應得github
中: common_map.go
func doMap(
jobName string,// the name of the MapReduce job
mapTask int,// which map task this is
inFile string,nReduce int,// the number of reduce task that will be run ("R" in the paper)
mapF func(filename string,contents string) []KeyValue,) {
// 首先讀取檔案獲得資料
data,err := ioutil.ReadFile(inFile)
// 呼叫map函式獲得鍵值對
kvs := mapF(inFile,string(data))
.......
// 拿到鍵值對以後,需要對不同的reduce 任務寫中間結果到檔案中
intermediateFiles := make([]*os.File,nReduce)
.......
for _,kv := range kvs {
encoder := json.NewEncoder(intermediateFiles[ihash(kv.Key) % nReduce])
err := encoder.Encode(&kv)
if err != nil {
log.Fatal("encode kv failed",kv)
}
}
}
複製程式碼
-
doReduce 任務
根據分析,
Reduce
階段需要 從各個map
中,獲取自己的檔案內容,然後得到<key,[]values>
格式的資料, 最終呼叫使用者定義的reduceF
函式, 並將結果以json
方式,寫到outFile
中。 Github 程式碼地址: common_reduce.gofunc doReduce( jobName string,// the name of the whole MapReduce job reduceTask int,// which reduce task this is outFile string,// write the output here nMap int,// the number of map tasks that were run ("M" in the paper) reduceF func(key string,values []string) string,) { keyValues := make(map[string][]string) // 從多個map中讀取檔案 for mapTask := 0; mapTask < nMap; mapTask++ { fileName := reduceName(jobName,mapTask,reduceTask) file,err := os.Open(fileName) ...... decoder := json.NewDecoder(file) for decoder.More() { var kv KeyValue err := decoder.Decode(&kv) ....... keyValues[kv.Key] = append(keyValues[kv.Key],kv.Value) } file.Close() } // 對所有的key排序 var keys []string for k := range keyValues { keys = append(keys,k) } sort.Strings(keys) out,err := os.Create(tmpFile) ...... // 呼叫Reduce函式並存入檔案中 encoder := json.NewEncoder(out) for _,key := range keys { result := reduceF(key,keyValues[key]) err = encoder.Encode(KeyValue{key,result}) ...... } } 複製程式碼
-
測試
cd ../mit6.824/mapreduce
go test -run Sequential
# 最終會輸出pass,表示任務通過
複製程式碼
Part 2
這裡要求自己實現一個map 和Reduce函式,實現wordcount。 這個較為簡單,直接看程式碼,github 地址為: wc.go
func mapF_wc(filename string,contents string) []mapreduce.KeyValue {
// Your code here (Part II).
worldList := strings.FieldsFunc(contents,func(c rune) bool {
return !unicode.IsLetter(c)
})
resMap := make(map[string]int)
for _,word := range worldList {
resMap[word] ++;
}
var res []mapreduce.KeyValue
for k,v := range resMap {
res = append(res,mapreduce.KeyValue{Key: k,Value: strconv.Itoa(v)})
}
return res
}
func reduceF_wc(key string,values []string) string {
// Your code here (Part II).
sum := 0
for _,value := range values {
_num,err := strconv.Atoi(value)
if err != nil {
log.Fatal(err)
continue
}
sum += _num
}
return strconv.Itoa(sum)
}
複製程式碼
- 測試
cd /path/to/main/
go run wc.go master sequential pg-*.txt
sort -n -k2 mrtmp.wcseq | tail -10
# 對比以下的結果
that: 7871
it: 7987
in: 8415
was: 8578
a: 13382
of: 13536
I: 14296
to: 16079
and: 23612
the: 29748
## 或者 sh test-wc.sh
複製程式碼
Part 3 4
這個部分是重點,要實現一個分散式的排程函式 schedule
,程式碼在 schedule.go
中。 主要是有幾個注意的點:
-
master
需要知道並記錄任務狀態,只有所有的map
任務完成才會執行Reduce
任務, 只有所有Reduce
任務完成,才會合併最終結果並最終輸出。 -
master
需要知道當前可用worker
的地址Addr
, 才能呼叫rpc
服務,且我們知道,在worker
啟動時,會向master
註冊,並在go mr.run()
方法的時候,呼叫了go mr.forwardRegistrations(ch)
將目前註冊的worker
傳送到registerChan
中,供schedule
使用。 -
part 4 任務中,需要處理worker故障的情況,這個比任務失敗要簡單得多,因為實驗只是在任務完成之後,節點故障,在worker上任務還是完成了的,只是master schedule的時候 call 函式 rpc 呼叫超時,所以任務只需要重新執行就好。
明確了問題所在之後,那麼就可以看以下程式碼。 具體程式碼請看 schedule.go
// 這裡需要用同步模組,用來監控任務的完成狀態,只有成功執行才會進行Done操作減一 var wg sync.WaitGroup wg.Add(ntasks) // 這裡建立一個帶緩衝的任務通道,以減少阻塞的可能性,之所以不是for迴圈,是因為worker故障後,會出現失敗任務,這些任務應該要重新執行而不是丟棄 taskChan := make(chan int,3) go func() { for i := 0; i < ntasks; i++ { taskChan <- i } }() go func() { for { // 注意,這裡從通道中取,如果任務成功,這裡就表明worker正常,要把worker重新發到管道,否則會一直阻塞 availableWorker := <-registerChan task := <-taskChan doTaskArgs := DoTaskArgs{JobName: jobName,File: mapFiles[task],Phase: phase,TaskNumber: task,NumOtherPhase: n_other,} go func() { if call(availableWorker,"Worker.DoTask",doTaskArgs,new(struct{})) { // 任務成功 wg.Done() // 任務結束後,availableWorker 閒置,availableWorker 進入 registerChan 等待下一次分配任務 registerChan <- availableWorker } else { // 任務失敗,重新提交回任務,等待下一次排程分配 taskChan <- doTaskArgs.TaskNumber } }() } }() wg.Wait() 複製程式碼
- 測試
# 測試 part 3 並行排程
go test -run TestParallel
# 測試 part 4 失敗處理
go test -run Failure
複製程式碼
Part 5
這部分的要求是實現一個 倒排索引,即簡單的理解是:找出一個單詞出現過的所有檔案! 其實主要就是自己自定義一個mapF
和 ReduceF
的要求,一個優化就是在map
端 使用 set
來去重,減少中間檔案的資料量。具體的程式碼在 main/ii.go . 程式碼比較簡單,還請讀者自行訪問 github
。
總結:
我相信看到這裡的朋友想必已經對MapReduce有一個比大多數人都更加深入的理解了,本篇文章呢,雖然沒有將重點主要集中在Hadoop這些開源大資料框架的使用上(這個系列的教程主要是韓數來做),但是這些理論性的文章卻是整個Hadoop在實現自己的MapReduce重要參考,和java規範一樣,如果瞭解了MapReduce的設計思想和實現思路,我相信,今後不管是Hadoop也好或者新的大資料開源技術也好,只要涉及到了MapReduce這塊內容,那麼他們所體現出來的實現思路也一定是大同小異的,這同時也是高階篇系列筆記所編寫的初衷,不涉及具體框架的使用,而專注在這些技術的源頭。
韓數OS:未來我會多督促小羊大神爭取把文章寫的通俗易懂的,馬上安排上。
最後,本篇文章相關程式碼實現已經開源至小羊Github,一定要star哦。
萬水千山總是情,給個Star行不行!
參考
[2] www.jianshu.com