MapReduce是一種編程模式,在很大程度上借鑒了函數(shù)式語言。它主要的思想是分而治之(divide and conquer)。將一個大的問題切分成很多小的問題,然后在集群中的各個節(jié)點上執(zhí)行,這既是Map過程。在Map過程結(jié)束之后,會有一個Ruduce的過程,這個過程即將所有的Map階段產(chǎn)出的結(jié)果進(jìn)行匯集。
上述過程可以說是一個顯而易見的過程,所以說MapReduce是一個極其簡單而有極其復(fù)雜的編程模式。說它簡單是因為在程序員使用它編程解決實際問題時,他只要編寫一個Mapper函數(shù)和一個Reduce函數(shù),或許在復(fù)雜一點加上一個Combiner函數(shù)和Partitioner 函數(shù),其余的就直接交給MapReduce框架執(zhí)行。這樣程序員就只要關(guān)注數(shù)據(jù)業(yè)務(wù),而不用關(guān)注具體的執(zhí)行過程。但說它復(fù)雜是因為MapRuduce有著看不見的部分,在程序員準(zhǔn)備好Mapper和Reducer之后提交個MapReduce,而具體執(zhí)行過程卻是一個非常復(fù)雜的過程。
MapReduce在具體執(zhí)行過程中,同步化是一個非常棘手的問題。在MapReduce的并行執(zhí)行過程唯一發(fā)生集群級同步化是在Shuffle和Sort階段,即在Mapper階段完成后,將各個節(jié)點上的中間結(jié)果Key/Value 依據(jù)Key的值聚集后復(fù)制到Reducer節(jié)點上。除此之外的過程,各個節(jié)點都是獨立運行的而且沒有直接的通信。這就意味著程序員對MapReduce的執(zhí)行過程的很多方面都沒有控制能力,比如:
集群中哪個節(jié)點來執(zhí)行Mapper和Reducer。
什么時候開始和結(jié)束Mapper和Reducer。
輸入的key/value 由哪個Mapper來處理。
中間結(jié)果產(chǎn)生的key/value 由哪個Reucer來處理。
所謂山不轉(zhuǎn)水轉(zhuǎn),從另一個角度來說,程序員可以控制以下的幾個方面:
數(shù)據(jù)結(jié)構(gòu)的自定義,即key/value的具體的結(jié)構(gòu)可以是程序員自定義的。
MapReude 每個Mapper和Reducer的執(zhí)行過程在開始和結(jié)束都可以有一段程序自定義代碼,來確定每個Mapper和Reducer在執(zhí)行之前和結(jié)束后的動作。比如Hadoop在實現(xiàn)MapReude框架時,每個Mapper和Reducer都有一個setup 和cleanup 函數(shù),既是定義開始和結(jié)束的動作。在一些MapReduce優(yōu)化算法中會充分利用這兩個函數(shù)。
對MapRuduce的Mapper的輸入Key 和Reducer的中間結(jié)果Key 狀態(tài)的控制。
對中間結(jié)果Key 排序的控制,這樣程序員就有能力控制Reducer處理Key的順序。
對中間結(jié)果Key 分區(qū)的控制,這樣程序員就有能力控制Reducer處理Key的集合。
雖然說MapRudce 框架為我們提供了良好的編程模式和接口,但是從上面的可控方面和不可控方面,我們可以挖掘出一些有效的設(shè)計模式。這些模式或可以幫助我們提高M(jìn)apReduce的執(zhí)行效率,或可以幫助我們更好的控制代碼執(zhí)行和數(shù)據(jù)流模式轉(zhuǎn)化。下面具體介紹MapReduce算法設(shè)計。
1. 本地聚集
在偏數(shù)據(jù)敏感的分布式處理中,一個重要的性能瓶頸是中間結(jié)果的網(wǎng)絡(luò)傳輸。就拿Hadoop來說,它將Mapper處理的中間結(jié)果在本地磁盤上存儲(該過程還涉及到序列化),然后通過網(wǎng)絡(luò)傳輸給Reducer。這里磁盤和網(wǎng)絡(luò)的延時會極大的影響MapReduce的執(zhí)行效率。很容易想到的解決方案就是如何減少中間結(jié)果的大小來提高效率。而本地聚集(local aggregation)可以減少中間結(jié)果的產(chǎn)生,從而能提高M(jìn)apReduce的效率(特別是對于一個Mapper可能產(chǎn)生多個相同的Key)。
1.1 Combiner和 in-MapperCombining
Combiner在大多數(shù)MapReduce實現(xiàn)中都會提供的,它的主要作用是對每個Mapper產(chǎn)生的結(jié)果進(jìn)行本地聚集。我們知道在MapReducer的輸入實現(xiàn)過程是大致這樣的:一個大的作業(yè)文件會通過MapReduce提供的Splitter 來進(jìn)行切割成多個小的文件,每個文件會被一個Mapper處理,而每個小文件又會被MapReduce提供的RecordReader進(jìn)行分割形成很多的key/value 形式的記錄,每個Mapper對象中的map 方法會對每條key/value 的記錄進(jìn)行處理。處理后會形成一個新的Key/value 的中間結(jié)果,會序列化寫到本地磁盤。
舉個常見的例子來說:數(shù)單詞。假設(shè)現(xiàn)在有100 篇的文集,要數(shù)出每個單詞出現(xiàn)的次數(shù)。MapRudce 接到這個任務(wù)后,首先檢測文集數(shù)據(jù)的正確性。然后進(jìn)行分割(這里我們采用邏輯上的分割)。比如集群有10 個TaskTracker可用。MapReduce將100文檔進(jìn)行平均分割,每個TaskTracker會得到10篇文集。若每個TaskTracker運行一個Mapper的話,這10篇文集會一次被Mapper處理。10篇文集又會被分切成10個RecordReader形式的Key/Value(key=docId,value=http://www.china-cloud.com/yunzixun/yunjisuanxinwen/docContent)。這樣一個Mapper一次就會處理一片文檔。具體算法偽碼如下:
1: class Mapper
2: method Map(docid a; doc d)
3: for all term t in doc d do
4: Emit(term t; count 1)
1: class Reducer
2: method Reduce(term t; counts [c1; c2; : : :])
3: sum = 0
4: for all count c in counts [c1; c2; : : :] do
5: sum = sum + c
6: Emit(term t; count sum)
首先,我們來討論Combiner的使用。上述算法只使用了Mapper和Reducer,并沒有使用Combiner,這個算法的中間結(jié)果都是(term t; count 1) 的形式。即每個單詞記錄一次,這樣的中間結(jié)果會很多。我們在下面改進(jìn)的算法中使用Combiner。
1: class Mapper
2: method Map(docid a; doc d)
3: H = new AssociativeArray
4: for all term t in doc d do
5: H{t} = H{t} + 1 . //Tally counts for entire document
6: for all term t in H do
7: Emit(term t; count H{t})
1: class Reducer
2: method Reduce(term t; counts [c1; c2; : : :])
3: sum = 0
4: for all count c in counts [c1; c2; : : :] do
5: sum = sum + c
6: Emit(term t; count sum)
從改進(jìn)算法的偽碼中我們可以看出,我們只是在Mapper中的map方法中添加一個關(guān)聯(lián)數(shù)組(即JAVA中的Map)。每次map 處理一個文檔時,并不是遇到一個單詞就寫到本地磁盤,而是將其添加到關(guān)聯(lián)數(shù)組中,并且關(guān)聯(lián)數(shù)組的值加1。在所有單詞處理完后,在使用一個for循環(huán)將所有的單詞及其出現(xiàn)在該文檔中的次數(shù)寫回本地磁盤。這樣每個Mapper 的輸出就是一個Map方法處理一篇文集中的單詞及其出現(xiàn)在該文檔中的次數(shù)。比如在處理docId =“16”的時候,單詞“link”在這個文集中出現(xiàn)了5次。在沒有使用Combiner的情況下,會有5個中間結(jié)果寫回磁盤即(link;1), (link;1), (link;1), (link;1), (link;1)。而在使用Combiner 的情況下對于單詞“link”只會有一個中間結(jié)果(link;5)寫回磁盤。從中可以看出Combiner可以減少中間結(jié)果的數(shù)量。
這個的Combiner只是對本地Map方法產(chǎn)生的結(jié)果進(jìn)行匯總。其作用相當(dāng)于一個“mini-Reducer”。而事實上在Hadoop的MapReduce的實現(xiàn)中,其Combiner就是實現(xiàn)一個Reducer。
上述的算法相比沒有Combiner 的算法有了很大的提高,實際上該算法還有提升的空間。這就是接下來要講的“In-Mapper Combining”。
Combiner 的使用可以使得每個Mapper 的map 方法產(chǎn)生的結(jié)果本地聚集。實際上更為有效是,我們可以讓每個Mapper 的結(jié)果本地聚集。上面數(shù)單詞的例子中,每個Mapper 會處理10 文檔,而Mapper中的map方法會每次處理1個文檔,map會循環(huán)10遍。我們可以直接將 10個文檔的單詞進(jìn)行本地聚集。
下面是使用 “In-Mapper Combining”的算法偽碼實現(xiàn):
1: class Mapper
2: method Initialize
3: H = new AssociativeArray
4: method Map(docid a; doc d)
5: for all term t in doc d do
6: H{t} = H{t} + 1 . //Tally counts across documents
7: method Close
8: for all term t in H do
9: Emit(term t; count H{t})
1: class Reducer
2: method Reduce(term t; counts [c1; c2; : : :])
3: sum = 0
4: for all count c in counts [c1; c2; : : :] do
5: sum = sum + c
6: Emit(term t; count sum)
從上面使用“In-Mapper Combining”的偽碼中可以看出,Mapper中的不只是有map 一個方法,而是增加了Initialize 和Close 方法。即我們之前說的每個Mapper執(zhí)行過程中在開始和結(jié)束都可以有一段程序自定義代碼,來確定每個Mapper 和Reducer 在執(zhí)行之前和結(jié)束后的動作。這里對應(yīng)Hadoop 中的方法是:setup 和 cleanup 方法。
下面對該算法分析:在每個Mapper 啟動的時候會有一個關(guān)聯(lián)數(shù)組的產(chǎn)生。在執(zhí)行每個Map方法執(zhí)行完時,并不直接寫回磁盤,而是將單詞加入到關(guān)聯(lián)數(shù)組中,在整個Mapper執(zhí)行完后才將所有單詞寫回磁盤(Close方法完成)。對應(yīng)數(shù)單詞的例子來說,每個Mapper不是在每篇文檔處理完后寫回磁盤的,而是每個Spiltter 的10篇文檔處理完后,才一次性寫回磁盤。這樣中間結(jié)果相比直接使用Combiner就更少。
1.2 Combiner 和 in-Mapper Combining的優(yōu)缺點
In-Mapper Combining 雖然比Combiner 有更少的中間結(jié)果。但是它有幾個缺點。首先它破壞了MapReduce 編程模式的基礎(chǔ),因為保存中間結(jié)果跨越了多個Key/Value。如果說為了效率,我們不刻意的去追求模式。但是對于一些特定的算法它是不合適使用,比如某些算法要求對Map方法處理的Key/Value的中間結(jié)果先后有要求,那么這種In-Mapper Combining 是不適應(yīng)的。另一個重要的缺點是In-Mapper Combining 對拓展性提出了挑戰(zhàn)。以數(shù)單詞為例,假設(shè)Mapper處理的10篇文檔很大設(shè)計到很多的單詞,這樣關(guān)聯(lián)數(shù)組勢必會非常大,又可能大到一個JVM不能完全存儲這個關(guān)聯(lián)數(shù)組。這樣拓展性會遇到挑戰(zhàn)。
對于第二個缺點,我可以采用定期寫回磁盤的方法來解決。
Combiner 和In-Mapper Combining 有除了減少中間結(jié)果外,還可以減小分布的傾斜度。比如在數(shù)單詞的例子中,一些常用的單詞,可能會有很多的中間結(jié)果,以至于處理這些常用單詞的Reducer 會比其他的Reducer 慢很多,這種Reducer拖后腿的現(xiàn)象在MapReducer經(jīng)常出現(xiàn),而Combiner 和In-Mapper Combining的使用有助于減少這種情況。