文/祝威廉(簡(jiǎn)書作者)
原文鏈接:http://www.jianshu.com/p/d328c96aebfd
著作權(quán)歸作者所有,轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),并標(biāo)注“簡(jiǎn)書作者”。
Tungsten-sort 算不得一個(gè)全新的shuffle 方案,它在特定場(chǎng)景下基于類似現(xiàn)有的Sort Based Shuffle處理流程,對(duì)內(nèi)存/CPU/Cache使用做了非常大的優(yōu)化。帶來(lái)高效的同時(shí),也就限定了自己的使用場(chǎng)景。如果Tungsten-sort 發(fā)現(xiàn)自己無(wú)法處理,則會(huì)自動(dòng)使用 Sort Based Shuffle進(jìn)行處理。
前言
看這篇文章前,建議你先簡(jiǎn)單看看Spark Sort Based Shuffle內(nèi)存分析。
Tungsten 中文是鎢絲的意思。 Tungsten Project 是 Databricks 公司提出的對(duì)Spark優(yōu)化內(nèi)存和CPU使用的計(jì)劃,該計(jì)劃初期似乎對(duì)Spark SQL優(yōu)化的最多。不過(guò)部分RDD API 還有Shuffle也因此受益。
簡(jiǎn)述
Tungsten-sort優(yōu)化點(diǎn)主要在三個(gè)方面:
直接在serialized binary data上sort而不是java objects,減少了memory的開銷和GC的overhead。
提供cache-efficient sorter,使用一個(gè)8bytes的指針,把排序轉(zhuǎn)化成了一個(gè)指針數(shù)組的排序。
spill的merge過(guò)程也無(wú)需反序列化即可完成
這些優(yōu)化的實(shí)現(xiàn)導(dǎo)致引入了一個(gè)新的內(nèi)存管理模型,類似OS的Page,對(duì)應(yīng)的實(shí)際數(shù)據(jù)結(jié)構(gòu)為MemoryBlock,支持off-heap 以及 in-heap 兩種模式。為了能夠?qū)ecord 在這些MemoryBlock進(jìn)行定位,引入了Pointer(指針)的概念。
如果你還記得Sort Based Shuffle里存儲(chǔ)數(shù)據(jù)的對(duì)象PartitionedAppendOnlyMap,這是一個(gè)放在JVM heap里普通對(duì)象,在Tungsten-sort中,他被替換成了類似操作系統(tǒng)內(nèi)存頁(yè)的對(duì)象。如果你無(wú)法申請(qǐng)到新的Page,這個(gè)時(shí)候就要執(zhí)行spill操作,也就是寫入到磁盤的操作。具體觸發(fā)條件,和Sort Based Shuffle 也是類似的。
開啟條件
Spark 默認(rèn)開啟的是Sort Based Shuffle,想要打開Tungsten-sort ,請(qǐng)?jiān)O(shè)置
spark.shuffle.manager=tungsten-sort
對(duì)應(yīng)的實(shí)現(xiàn)類是:
org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
名字的來(lái)源是因?yàn)槭褂昧舜罅縅DK Sun Unsafe API。
當(dāng)且僅當(dāng)下面條件都滿足時(shí),才會(huì)使用新的Shuffle方式:
Shuffle dependency 不能帶有aggregation 或者輸出需要排序
Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定義的一些序列化方式.
Shuffle 文件的數(shù)量不能大于 16777216
序列化時(shí),單條記錄不能大于 128 MB
可以看到,能使用的條件還是挺苛刻的。
這些限制來(lái)源于哪里
參看如下代碼,page的大小:
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
shuffleMemoryManager.pageSizeBytes());
這就保證了頁(yè)大小不超過(guò)PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,該值就被定義成了128M。
而產(chǎn)生這個(gè)限制的具體設(shè)計(jì)原因,我們還要仔細(xì)分析下Tungsten的內(nèi)存模型:

來(lái)源于:https://github.com/hustnn/TungstenSecret/tree/master
這張圖其實(shí)畫的是 alt="" width="946" height="807" />
圖片來(lái)源:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-core/
(另外,值得注意的是,這張圖里進(jìn)行spill操作的同時(shí)檢查內(nèi)存可用而導(dǎo)致的Exeception 的bug 已經(jīng)在1.5.1版本被修復(fù)了,忽略那條路徑)
內(nèi)存是否充足的條件依然shuffleMemoryManager 來(lái)決定,也就是所有task shuffle 申請(qǐng)的Page內(nèi)存總和不能大于下面的值:
ExecutorHeapMemeory * 0.2 * 0.8
上面的數(shù)字可通過(guò)下面兩個(gè)配置來(lái)更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
UnsafeShuffleExternalSorter 負(fù)責(zé)申請(qǐng)內(nèi)存,并且會(huì)生成該條記錄最后的邏輯地址,也就前面提到的 Pointer。
接著Record 會(huì)繼續(xù)流轉(zhuǎn)到UnsafeShuffleInMemorySorter中,這個(gè)對(duì)象維護(hù)了一個(gè)指針數(shù)組:
private long[] pointerArray;
數(shù)組的初始大小為 4096,后續(xù)如果不夠了,則按每次兩倍大小進(jìn)行擴(kuò)充。
假設(shè)100萬(wàn)條記錄,那么該數(shù)組大約是8M 左右,所以其實(shí)還是很小的。一旦spill后該UnsafeShuffleInMemorySorter就會(huì)被賦為null,被回收掉。
我們回過(guò)頭來(lái)看spill,其實(shí)邏輯上也異常簡(jiǎn)單了,UnsafeShuffleInMemorySorter 會(huì)返回一個(gè)迭代器,該迭代器粒度每個(gè)元素就是一個(gè)指針,然后到根據(jù)該指針可以拿到真實(shí)的record,然后寫入到磁盤,因?yàn)檫@些record 在一開始進(jìn)入U(xiǎn)nsafeShuffleExternalSorter 就已經(jīng)被序列化了,所以在這里就純粹變成寫字節(jié)數(shù)組了。形成的結(jié)構(gòu)依然和Sort Based Shuffle 一致,一個(gè)文件里不同的partiton的數(shù)據(jù)用fileSegment來(lái)表示,對(duì)應(yīng)的信息存在一個(gè)index文件里。