Facebook 60TB+級的Apache Spark應(yīng)用案例 里大體有兩方面的PR,一個是Bug Fix,一個是性能優(yōu)化。這篇文章會對所有提及的Bug Issue進行一次解釋和說明。也請期待下一篇。
前言
Facebook 60TB+級的Apache Spark應(yīng)用案例 ,本來上周就準備看的,而且要求自己不能手機看,要在電腦上細細的看。然而終究是各種忙拖到了昨天晚上。
文章體現(xiàn)的工作,我覺得更像是一次挑戰(zhàn)賽,F(xiàn)acebook團隊通過層層加碼,最終將單個Spark Batch實例跑到了60T+ 的數(shù)據(jù),這是一個了不起的成就,最最重要的是,他們完成這項挑戰(zhàn)賽后給社區(qū)帶來了三個好處:
在如此規(guī)模下,發(fā)現(xiàn)了一些Spark團隊以前很難發(fā)現(xiàn)的Bug
提交了大量的bug fix 和 new features,而且我們可以在Spark 1.6.2 /Spark 2.0 里享受到其中的成果
在如此規(guī)模下,我們也知道我們最可能遇到的一些問題。大體是OOM和Driver的限制。
說實在的,我覺得這篇文章,可以算是一篇工程論文了。而且只用了三個人力,不知道一共花了多久。
值得注意的是,大部分Bug都是和OOM相關(guān)的,這也是Spark的一個痛點,所以這次提交的PR質(zhì)量非常高。
Bug 剖析
Make PipedRDD robust to fetch failure SPARK-13793
這個Issue 還是比較明顯的。PipedRDD 在Task內(nèi)部啟動一個新的Java進程(假設(shè)我們叫做ChildProcessor)獲取數(shù)據(jù)。這里就會涉及到三個點:
啟動一個線程往 ChildProcessor 寫數(shù)據(jù) (stdin writer)
啟動一個線程監(jiān)控ChildProcessor的錯誤輸出 (stderr reader)
獲取ChildProcessor輸入流,返回一個迭代器(Iterator)
既然都是讀取數(shù)據(jù)流,如果數(shù)據(jù)流因為某種異常原因關(guān)閉,那必然會拋出錯誤。所以我們需要記錄這個異常,對于1,2 兩個我們只要catch住異常,然后將異常記錄下來方便后續(xù)重新拋出。 那么什么時候拋出呢?迭代器有經(jīng)典的hasNext/next方法,每次hasNext時,我們都檢查下是否有Exception(來自1,2的),如果有就拋出了。既然已經(jīng)異常了,我們就應(yīng)該不需要繼續(xù)讀取這個分區(qū)的數(shù)據(jù)了。否則數(shù)據(jù)集很大的情況下,還要運行很長時間才能運行完。
在hasNext 為false的情況下,有兩類情況,一類是真的沒有數(shù)據(jù)了,一類是有異常了,比如有節(jié)點掛了,所以需要檢測下ChildProcessor的exitStatus狀態(tài)。如果不正常,就直接拋出異常,進行重試。
對于1,2兩點,原來都是沒有的,是這次Facebook團隊加上去的。
Configurable max number of fetch failures SPARK-13369
截止到我這篇文章發(fā)出,這個Issue 并沒有被接收。
我們知道,Shuffle 發(fā)生時,一般會發(fā)生有兩個Stage 產(chǎn)生,一個ShuffleMapStage (我們?nèi)∶麨?MapStage),他會寫入數(shù)據(jù)到文件中,接著下一個Stage (我們?nèi)∶麨镽educeStage) 就會去讀取對應(yīng)的數(shù)據(jù)。 很多情況下,ReduceStage 去讀取數(shù)據(jù)MapStage 的數(shù)據(jù)會失敗,可能的原因比如有節(jié)點重啟導(dǎo)致MapStage產(chǎn)生的數(shù)據(jù)有丟失,此外還有GC超時等。這個時候Spark 就會重跑這兩個Stage,如果連續(xù)四次都發(fā)生這個問題,那么就會將整個Job給標記為失敗。 現(xiàn)階段(包括在剛發(fā)布的2.0),這個數(shù)值是固定的,并不能夠設(shè)置。
[email protected] 給出的質(zhì)疑是,如果發(fā)生節(jié)點失敗導(dǎo)致Stage 重新被Resubmit ,Resubmit后理論上不會再嘗試原來失敗的節(jié)點,如果連續(xù)四次都無法找到正常的階段運行這些任務(wù),那么應(yīng)該是有Bug,簡單增加重試次數(shù)雖然也有意義,但是治標不治本。
我個人認為在集群規(guī)模較大,任務(wù)較重的過程中,出現(xiàn)一個或者一批Node 掛掉啥的是很正常的,如果僅僅是因為某個Shuffle 導(dǎo)致整個Job失敗,對于那種大而耗時的任務(wù)顯然是不能接受的。個人認為應(yīng)該講這個決定權(quán)交給用戶,也就是允許用戶配置嘗試次數(shù)。
Unresponsive driver SPARK-13279
這個Bug已經(jīng)在1.6.1, 2.0.0 中修復(fù)。 這個場景比較特殊,因為Facebook產(chǎn)生了高達200k的task數(shù),原來給 pendingTasksForExecutor:HashMap[String, ArrayBuffer[Int]] 添加新的task 的時候,都會根據(jù)Executor名獲取到已經(jīng)存在的列表,然后判斷該列表是否已經(jīng)包含了新Task,這個操作的時間復(fù)雜度是O(N^2)。在Task數(shù)比較小的情況下沒啥問題,但是一旦task數(shù)達到了200k,基本就要五分鐘,給人的感覺就是Driver沒啥反應(yīng)了。
而且在實際運行任務(wù)的過程中,會通過一個特殊的dequeueTaskFromList結(jié)構(gòu)來排除掉已經(jīng)運行的任務(wù),所以我們其實在addPendingTask 過程中不需要做這個檢測。