1. 程式人生 > 程式設計 >分散式系統系列學習筆記:MapReduce程式設計模型(附程式碼實現)

分散式系統系列學習筆記:MapReduce程式設計模型(附程式碼實現)

作者:小羊 編輯:韓數

大家好,我是韓數,本文的作者是我的好朋友小羊,本次呢,特地邀請小羊大神來撰寫大資料系列的高階教程,隨著大資料的發展,越來越多優秀的開源框架逐漸進入到我們開發者的生活中,包括hadoop,spark,flink等等,而Hadoop在大資料領域幾乎無法撼動的地位,也成了每個大資料程式設計師的必修課,而MR(MapReduce),這一由google所提出的分散式計算模型,也是Hadoop的核心概念之一。本文呢,將從 MIT 6.824 課程 LEC 1 的內容出發,通過對 MapReduce 原論文的解讀,以及課後作業程式碼的具體實現,來幫助大家去深入理解MapReduce模型,從而對分散式有一個深入的理解。

注:

6.824 是 MIT(斯坦福) 開設的分散式系統課程,其中第一節是 閱讀論文並梳理 MR 模型的執行流程,實現單機版、分散式版的 word count,最後使用模型來生成倒排索引。

不說廢話,上東西。

分散式的引入:

什麼是分散式系統?

​ 分散式系統就是有多臺協同的計算機組成的系統,這些系統往往用於大型資料的儲存,計算等傳統單機模式下不能完成任務的場景。比如,雲盤儲存,spark,Hadoop等計算引擎。

​ 如圖所示,分散式系統通常會有多個副本,多個分割槽,以此來保證資料和系統的可靠性。

如何實現分散式:

​ 分散式的實現,依賴彼此之間的通訊和協同,同時需要對資料的一致性,叢集的可靠性等進行維護。

  • 首先需要將物理獨立的個體,通過通訊模組,組織到一起工作。
  • 需要多副本來實現資料和系統的可靠性,保證容錯。
  • 支援水平擴充套件來擴容,提高吞吐量。

分散式的優缺點:

  • 優勢:
    • 併發執行任務快,理論上只受最慢的一個任務影響,理想狀態下,N臺機器併發,耗時幾乎可以變為 T/N
  • 缺點:
    • 複雜,併發部分難以實現
    • 受到CAP原理的限制
    • 需要處理區域性失敗的問題

MapReduce 論文 解讀

什麼是MapReduce

MapReduce 是一種程式設計模型,該模型主要分為兩個過程,即 MapReduceMapReduce 的整體思想是: 將輸入的資料分成 M 個 tasks

, 由使用者自定義的 Map 函式去執行任務,產出 <Key,Value>形式的中間資料,然後相同的 key 通過使用者自定義的 Reduce 函式去聚合,得到最終的結果。 比如論文提及到的 詞頻統計, 倒排索引,分散式排序等,都可以通過MapReduce來實現。

​ 如下圖所示:

MapReduce 的實現

過程如下:

  • Map 端

    • 根據輸入輸入資訊,將輸入資料 split 成 M 份, 比如上圖中的 split0 - split4 (這裡M=5)
    • 在所有可用的worker節點中,起 M 個task任務的執行緒, 每個任務會讀取對應一個 split 當做輸入。
    • 呼叫 map 函式,將輸入轉化為 <Key,Value> 格式的中間資料,並且排序後,寫入磁碟。 這裡,每個 task 會寫 R 個檔案,對應著 Reduce 任務數。 資料寫入哪個檔案的規則有 Partitioner 決定,預設是 hash(key) % R
    • (可選) 為了優化效能,中間還可以用一個 combiner 的中間過程
  • Reduce 端

    • map 階段結束以後, 開始進入 Reduce 階段,每個 Reduce task會從所有的 Map 中間資料中,獲取屬於自己的一份資料,拿到所有資料後,一般會進行排序(Hadoop 框架是這樣做)

    說明: 這個排序是非常必要的,主要因為 Reduce 函式的輸入 是 <key,[]values> 的格式,因為需要根據key去排序。有同學想為啥不用 map<>() 去實現呢? 原因:因為map必須存到記憶體中,但是實際中資料量很大,往往需要溢寫到磁碟。 但是排序是可以做到的,比如歸併排序。 這也就是map端產出資料需要排序,Reduce端獲取資料後也需要先排序的原因。

    • 呼叫 Reduce 函式,得到最終的結果輸出結果,存入對應的檔案
    • (可選) 彙總所有 Reduce任務的結果。一般不做彙總,因為通常一個任務的結果往往是另一個 MapReduce任務的輸入,因此沒必要彙總到一個檔案中。

Master 資料結構:

masterMapReduce 任務中最核心的角色,它需要維護 狀態資訊檔案資訊

  • 狀態資訊:

    • map 任務狀態
    • Reduce 任務狀態
    • worker 節點狀態
  • 檔案資訊

    • 輸入檔案資訊
    • 輸出檔案資訊
    • map中間資料檔案資訊

    注:

    由於 master 節點程式是整個任務的樞紐,因此,它需要維護輸入檔案地址,map任務執行完後,會產出中間資料檔案等待 reducer 去獲取,因此 map完成後,會向 master 上報這些檔案的位置和大小資訊,這些資訊隨著Reduce 任務的啟動而分發下推到對應的 worker

容錯

worker 節點失敗

master會週期性向所有節點傳送ping 心跳檢測, 如果超時未回覆,master會認為該worker已經故障。任何在該節點完成的map 或者Reduce任務都會被標記為idle, 並由其他的worker` 重新執行。

說明: 因為MapReduce 為了減少網路頻寬的消耗,map的資料是儲存在本地磁碟的,如果某個worker機器故障,會導致其他的Reduce 任務拿不到對應的中間資料,所以需要重跑任務。那麼這也可以看出,如果利用hadoop 等分散式檔案系統來儲存中間資料,其實對於完成的map任務,是不需要重跑的,代價就是增加網路頻寬。

Master 節點失敗:

master節點失敗,在沒有實現HA 的情況下,可以說基本整個MapReduce任務就已經掛了,對於這種情況,直接重新啟動master 重跑任務就ok了。 當然啦,如果叢集有高可靠方案,比如master主副備用,就可以實現master的高可靠,代價就是得同步維護主副之間的狀態資訊和檔案資訊等。

失敗處理的語義:

論文中提到,只要Map Reduce函式是確定的,語義上不管是分散式執行還是單機執行,結果都是一致的。每個map Reduce 任務輸出是通過原子提交來保證的, 即:

一個任務要麼有完整的最終檔案,要麼存在最終輸出結果,要麼不存在。

  • 每個進行中的任務,在沒有最終語義完成之前,都只寫臨時檔案,每個Reduce 任務會寫一個,而每個Map 任務會寫 R 個,對應 R 個reducer.
  • Map 任務完成的時候,會向master傳送檔案位置,大小等資訊。Master如果接受到一個已經完成的Map任務的資訊,就忽略掉,否則,會記錄這個資訊。
  • Reduce 任務完成的時候,會將臨時檔案重新命名為最終的輸出檔案, 如果多個相同的Reduce任務在多臺機器執行完,會多次覆蓋輸出檔案,這個由底層檔案系統的rename操作的原子性,保證任何時刻,看到的都是一個完整的成功結果

對於大部分確定性的任務,不管是分散式還是序列執行,最終都會得到一致的結果。對於不確定的map 或者Reduce 任務,MapReduce 保證提供一個弱的,仍然合理的語義。

舉個例子來說:

確定性任務比如 詞頻統計 不管你怎麼執行,序列或者並行,最終得到的都是確定性的統計結果。

第二個不確定性任務: 隨機傳播演演算法,pageRank 等,因為會有概率因素在裡面,也就是說你每次跑的結果資料不一定能對的上。但是是合理的,因為本來就有很多隨機的因素在裡面。

儲存優化

​ 由於網路頻寬資源的昂貴性,因此對MapReduce 儲存做了很多必要的優化。

  • 通過從本地磁碟讀取檔案,節約網路頻寬
  • GFS 將檔案分解成多個 大小通常為 64M 的block,並多備份儲存在不同的機器上,在排程時,會考慮檔案的位置資訊,儘可能在存有輸入檔案的機器上排程map任務,避免網路IO。
  • 任務失敗時,也會嘗試在離副本最近的worker中執行,比如同一子網下的機器。
  • MapReduce 任務在大叢集中執行時,大部分輸入直接可以從磁碟中讀取,不消耗頻寬。

任務粒度

​ 通常情況下,任務數即為 O(M + R),這個數量應當比worker數量多得多,這樣利於負載均衡和失敗恢復的情況,但是也不能無限增長,因為太多工的排程,會消耗master 儲存任務資訊的記憶體資源,如果啟動task所花的時間比任務執行時間還多,那就不償失了。

優化

自定義分割槽函式 (partition):

​ 自定義分割槽可以更好地符合業務和進行負載均衡,防止資料傾斜。 預設只是簡單的 hash(key) % R

有序保證:

  每個`partition`內的資料都是排序的,這樣有利於`Reduce`階段的`merge`合併
複製程式碼

Combiner 函式:

這個是每個map階段完成之後,區域性先做一次聚合。比如:詞頻統計,每個 Word 可能出現了100次,如果不使用combiner, 就會傳送100 個 <word,1>,如果combiner聚合之後,則為 <word,100>,大大地減少了網路傳輸和磁碟的IO。

輸入輸出型別

一個reader沒必要非要從檔案讀資料,MapReduce 支援可以從不同的資料來源中以多種不同的方式讀取資料,比如從資料庫讀取,使用者只需要自定義split規則,就能輕易實現。

計數器

MapReduce 還添加了計數器,可以用來檢測MapReduce的一些中間操作。

論文剩下的一些測試和實驗資料,還請讀者自行閱讀原論文,在此不再贅述。

課後作業 實現

接下來是 課程作業 的實現.

前提工作

  • 配置 go 環境,這是必要的,就像你學習java電腦上得先有個java一樣。
  • 從MIT 的git上下載課程實現的原始碼,
  • 由於筆者在實現時,還用到了一個uuid的庫, 因此,還需要安裝這個庫
# 配置go環境
export GOPATH=/path/to/your/gopath
export GOROOT=/path/to/your/goroot

# 下載MIT原始git的骨架程式碼,這裡人家自定義實現好了很多東西,程式碼的註釋將提醒你一步一步實現
git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824

# 下載uuid, 可選
go get github.com/satori/go.uuid
複製程式碼

熟悉程式碼

​ 這裡主要是熟悉課程提供的程式碼。 因為所有的執行都是從測試檔案開始,所以我們先從 test_test.go 檔案入手,這裡需要一些 go 語言的基礎,比如同一個包下的所有函式都是可見的,所以有些函式找起來會比較慢,最好還是用IDE,比如 Goland。

  • 首先,測試程式碼中提供了輸入檔案數量,map函式和Reduce函式, 和Reduce 任務數,檔案內容由makeInput 函式生成
const (
	nNumber = 100000
	nMap    = 20
	nReduce = 10
)
複製程式碼
  • Sequence 方式執行 MapReduce 任務

    這個隨便去一個 TestSequentialMany 測試程式碼進行分析:

    • 先啟動一個 master, 程式碼在 master.go 檔案中
    func Sequential(jobName string,files []string,nreduce int,mapF func(string,string[]KeyValue,reduceF func(string,[]string) string,) (mr *Master) {
        // 這裡開啟多執行緒去併發 run 函式
        go mr.run(jobName string,schedule func(phase jobPhase),finish func())
    }
                    
    // 這裡的 schedule 函式只是針對不同的phrase,執行對應的doMap任務或者是doReduce任務
    func(phase jobPhase) {
    		switch phase {
    		case mapPhase:
    			for i,f := range mr.files {
    				doMap(mr.jobName,i,f,mr.nReduce,mapF)
    			}
    		    case reducePhase:
    			for i := 0; i < mr.nReduce; i++ {
    				doReduce(mr.jobName,mergeName(mr.jobName,i),len(mr.files),reduceF)
    			}
    		}
    	}
    複製程式碼
    • doMapdoReduce 函式分別在 commom_map.gocommon_reduce.go中, 需要我們在Part I 中實現
    • run函式,注意:merge函式解析檔案是使用json去解析,所以最好doMap 和doReduce用json存中間資料。
    func (mr *Master) run(jobName string,schedule func(phase jobPhase),finish func(),) {
    	......
    	schedule(mapPhase)
    	schedule(reducePhase)
      // 排程完map 和Reduce任務後,執行finish任務,
    	finish()
    	mr.merge()
    	......
    	mr.doneChannel <- true
    }
    複製程式碼
    • 傳送完done以後, 在 TestSequentialMany 中 mr.wait() 函式返回,最終做一些校驗和清理的工作。
  • Distributed 方式執行 MapReduce 任務

    分散式下與序列執行直接最大的區別就是需要和worker通訊完成排程。

    • 首先,通過 setup()函式建立 Distributed master,master 註冊 rpc 服務, 然後執行go run 函式,和序列方式的區別在於,排程函式需要在 schedule.go中實現。 任務執行結束後,關閉worker和rpc服務。
    func Distributed(jobName string,master string) (mr *Master) {
    	mr = newMaster(master)
    	mr.startRPCServer()
    	go mr.run(jobName,files,nreduce,func(phase jobPhase) {
    			ch := make(chan string)
    			go mr.forwardRegistrations(ch)
    			schedule(mr.jobName,mr.files,phase,ch)
    		},func() {
    			mr.stats = mr.killWorkers()
    			mr.stopRPCServer()
    		})
    	return
    }
    複製程式碼
    • startRPCServer() 函式: 收到shutdown訊息以後,
    func (mr *Master) startRPCServer() {
    	go func() {
    	loop:
    		for {
    			select {
    			case <-mr.shutdown:
    				break loop
    			default:
    			}
    			conn,err := mr.l.Accept()
    			if err == nil {
    				go func() {
    					// 這個是長連結,裡面有個無限迴圈,
    					rpcs.ServeConn(conn)
    					conn.Close()
    				}()
    			} else {
    				debug("RegistrationServer: accept error %s",err)
    				break
    			}
    		}
    		debug("RegistrationServer: done\n")
    	}()
    }
    複製程式碼
    • 建立 worker: 注意nRPC引數,這代表了執行多少任務後關閉, 如果是-1, 則無窮。
    func RunWorker(MasterAddress string,me string,MapFunc func(string,string) []KeyValue,ReduceFunc func(string,nRPC int,parallelism *Parallelism,) {
    	... // 註冊rpc 服務 ....
      // 向伺服器註冊 worker資訊
    	wk.register(MasterAddress)
      // DON'T MODIFY CODE BELOW
    	for {
    		wk.Lock()
    		if wk.nRPC == 0 {
    			wk.Unlock()
    			break
    		}
    		wk.Unlock()
        // 每獲得一個請求, 即 call ( "Worker.doTask" ), 就 nRPC -= 1
    		conn,err := wk.l.Accept()
    		if err == nil {
    			wk.Lock()
    			wk.nRPC--
    			wk.Unlock()
    			go rpcs.ServeConn(conn)
    		} else {
    			break
    		}
    	}
    }
    複製程式碼

    後續執行和 Sequence 模式差不多的邏輯。

Assignment

Part I

​ 這個部分的任務是完善 doMapdoReduce 函式,實現序列執行的 test

  • doMap 任務

​ 根據MapReduce 過程的分析,對於map階段,需要讀取檔案,然後呼叫使用者的 mapF 函式, 得到結果後存到中間檔案中。 具體的程式碼在對應得github中: common_map.go

func doMap(
	jobName string,// the name of the MapReduce job
	mapTask int,// which map task this is
	inFile string,nReduce int,// the number of reduce task that will be run ("R" in the paper)
	mapF func(filename string,contents string) []KeyValue,) {
  // 首先讀取檔案獲得資料
	data,err := ioutil.ReadFile(inFile)
  // 呼叫map函式獲得鍵值對
	kvs := mapF(inFile,string(data))
  .......
  // 拿到鍵值對以後,需要對不同的reduce 任務寫中間結果到檔案中
	intermediateFiles := make([]*os.File,nReduce)
  .......
  for _,kv := range kvs {
		encoder := json.NewEncoder(intermediateFiles[ihash(kv.Key) % nReduce])
		err := encoder.Encode(&kv)
		if err != nil {
			log.Fatal("encode kv failed",kv)
		}
	}
}
複製程式碼
  • doReduce 任務

    ​ 根據分析,Reduce 階段需要 從各個map中,獲取自己的檔案內容,然後得到 <key,[]values> 格式的資料, 最終呼叫使用者定義的 reduceF 函式, 並將結果以 json方式,寫到outFile中。 Github 程式碼地址: common_reduce.go

    func doReduce(
    	jobName string,// the name of the whole MapReduce job
    	reduceTask int,// which reduce task this is
    	outFile string,// write the output here
    	nMap int,// the number of map tasks that were run ("M" in the paper)
    	reduceF func(key string,values []string) string,) {
    	keyValues := make(map[string][]string)
    	// 從多個map中讀取檔案
    	for mapTask := 0; mapTask < nMap; mapTask++ {
    		fileName := reduceName(jobName,mapTask,reduceTask)
    		file,err := os.Open(fileName)
        ......
    		decoder := json.NewDecoder(file)
    
    		for decoder.More() {
    			var kv KeyValue
    			err := decoder.Decode(&kv)
    			.......
    			keyValues[kv.Key] = append(keyValues[kv.Key],kv.Value)
    		}
    
    		file.Close()
    	}
      
      // 對所有的key排序
    	var keys []string
    	for k := range keyValues {
    		keys = append(keys,k)
    	}
    	sort.Strings(keys)
      
      out,err := os.Create(tmpFile)
      ......
      // 呼叫Reduce函式並存入檔案中
      encoder := json.NewEncoder(out)
    	for _,key := range keys {
    		result := reduceF(key,keyValues[key])
    		err = encoder.Encode(KeyValue{key,result})
    		......
    	}
    }
    複製程式碼
  • 測試

cd ../mit6.824/mapreduce
go test -run Sequential

# 最終會輸出pass,表示任務通過
複製程式碼

Part 2

​ 這裡要求自己實現一個map 和Reduce函式,實現wordcount。 這個較為簡單,直接看程式碼,github 地址為: wc.go

func mapF_wc(filename string,contents string) []mapreduce.KeyValue {
	// Your code here (Part II).
	worldList := strings.FieldsFunc(contents,func(c rune) bool {
		return !unicode.IsLetter(c)
	})

	resMap := make(map[string]int)
	for _,word := range worldList {
		resMap[word] ++;
	}

	var res []mapreduce.KeyValue
	for k,v := range resMap {
		res = append(res,mapreduce.KeyValue{Key: k,Value: strconv.Itoa(v)})
	}
	return res
}

func reduceF_wc(key string,values []string) string {
	// Your code here (Part II).
	sum := 0
	for _,value := range values {
		_num,err := strconv.Atoi(value)
		if err != nil {
			log.Fatal(err)
			continue
		}
		sum += _num
	}

	return strconv.Itoa(sum)
}
複製程式碼
  • 測試
cd /path/to/main/
go run wc.go master sequential pg-*.txt
sort -n -k2 mrtmp.wcseq | tail -10
# 對比以下的結果
that: 7871
it: 7987
in: 8415
was: 8578
a: 13382
of: 13536
I: 14296
to: 16079
and: 23612
the: 29748

## 或者 sh test-wc.sh
複製程式碼

Part 3 4

​ 這個部分是重點,要實現一個分散式的排程函式 schedule,程式碼在 schedule.go 中。 主要是有幾個注意的點:

  1. master 需要知道並記錄任務狀態,只有所有的map任務完成才會執行Reduce任務, 只有所有Reduce任務完成,才會合併最終結果並最終輸出。

  2. master 需要知道當前可用worker的地址Addr, 才能呼叫rpc服務,且我們知道,在worker啟動時,會向master註冊,並在go mr.run()方法的時候,呼叫了go mr.forwardRegistrations(ch)將目前註冊的worker 傳送到 registerChan 中,供schedule使用。

  3. part 4 任務中,需要處理worker故障的情況,這個比任務失敗要簡單得多,因為實驗只是在任務完成之後,節點故障,在worker上任務還是完成了的,只是master schedule的時候 call 函式 rpc 呼叫超時,所以任務只需要重新執行就好。

    明確了問題所在之後,那麼就可以看以下程式碼。 具體程式碼請看 schedule.go

      // 這裡需要用同步模組,用來監控任務的完成狀態,只有成功執行才會進行Done操作減一
      var wg sync.WaitGroup
    	wg.Add(ntasks)
    	// 這裡建立一個帶緩衝的任務通道,以減少阻塞的可能性,之所以不是for迴圈,是因為worker故障後,會出現失敗任務,這些任務應該要重新執行而不是丟棄
    	taskChan := make(chan int,3)
    	go func() {
    		for i := 0; i < ntasks; i++ {
    			taskChan <- i
    		}
    	}()
    
    	go func() {
    		for {
          // 注意,這裡從通道中取,如果任務成功,這裡就表明worker正常,要把worker重新發到管道,否則會一直阻塞
    			availableWorker := <-registerChan
    			task := <-taskChan
    
    			doTaskArgs := DoTaskArgs{JobName: jobName,File: mapFiles[task],Phase: phase,TaskNumber: task,NumOtherPhase: n_other,}
    
    			go func() {
    				if call(availableWorker,"Worker.DoTask",doTaskArgs,new(struct{})) {
    					// 任務成功
    					wg.Done()
    					// 任務結束後,availableWorker 閒置,availableWorker 進入 registerChan 等待下一次分配任務
    					registerChan <- availableWorker
    				} else {
    					// 任務失敗,重新提交回任務,等待下一次排程分配
    					taskChan <- doTaskArgs.TaskNumber
    				}
    			}()
    		}
    	}()
    
    	wg.Wait()
    複製程式碼
  • 測試
# 測試 part 3 並行排程
go test -run TestParallel
# 測試 part 4 失敗處理
go test -run Failure
複製程式碼

Part 5

​ 這部分的要求是實現一個 倒排索引,即簡單的理解是:找出一個單詞出現過的所有檔案! 其實主要就是自己自定義一個mapFReduceF 的要求,一個優化就是在map端 使用 set 來去重,減少中間檔案的資料量。具體的程式碼在 main/ii.go . 程式碼比較簡單,還請讀者自行訪問 github

總結:

我相信看到這裡的朋友想必已經對MapReduce有一個比大多數人都更加深入的理解了,本篇文章呢,雖然沒有將重點主要集中在Hadoop這些開源大資料框架的使用上(這個系列的教程主要是韓數來做),但是這些理論性的文章卻是整個Hadoop在實現自己的MapReduce重要參考,和java規範一樣,如果瞭解了MapReduce的設計思想和實現思路,我相信,今後不管是Hadoop也好或者新的大資料開源技術也好,只要涉及到了MapReduce這塊內容,那麼他們所體現出來的實現思路也一定是大同小異的,這同時也是高階篇系列筆記所編寫的初衷,不涉及具體框架的使用,而專注在這些技術的源頭。

韓數OS:未來我會多督促小羊大神爭取把文章寫的通俗易懂的,馬上安排上。

最後,本篇文章相關程式碼實現已經開源至小羊Github,一定要star哦。

萬水千山總是情,給個Star行不行!

小羊的github

參考

[1] zou.cool/2018/11/27/…

[2] www.jianshu.com