
flink-iterations-iterate-operator
Step函數(shù)在每一輪迭代中都會(huì)被執(zhí)行,它可以是由map、reduce、join等Operator組成的數(shù)據(jù)流。下面通過官網(wǎng)給出的一個(gè)例子來說明Iterate Operator,非常簡(jiǎn)單直觀,如下圖所示:

flink-iterations-iterate-operator-example
上面迭代過程中,輸入數(shù)據(jù)為1到5的數(shù)字,Step函數(shù)就是一個(gè)簡(jiǎn)單的map函數(shù),會(huì)對(duì)每個(gè)輸入的數(shù)字進(jìn)行加1處理,而Next Partial Solution對(duì)應(yīng)于經(jīng)過map函數(shù)處理后的結(jié)果,比如第一輪迭代,對(duì)輸入的數(shù)字1加1后結(jié)果為2,對(duì)輸入的數(shù)字2加1后結(jié)果為3,直到對(duì)輸入數(shù)字5加1后結(jié)果為變?yōu)?,這些新生成結(jié)果數(shù)字2~6會(huì)作為第二輪迭代的輸入。迭代終止條件為進(jìn)行10輪迭代,則最終的結(jié)果為11~15。
Delta Iterate
Delta Iterate Operator實(shí)現(xiàn)了增量迭代,它的實(shí)現(xiàn)原理如下圖所示:

flink-iterations-delta-iterate-operator
基于Delta Iterate Operator實(shí)現(xiàn)增量迭代,它有2個(gè)輸入,其中一個(gè)是初始Workset,表示輸入待處理的增量Stream數(shù)據(jù),另一個(gè)是初始Solution Set,它是經(jīng)過Stream方向上Operator處理過的結(jié)果。第一輪迭代會(huì)將Step函數(shù)作用在初始Workset上,得到的計(jì)算結(jié)果Workset作為下一輪迭代的輸入,同時(shí)還要增量更新初始Solution Set。如果反復(fù)迭代知道滿足迭代終止條件,最后會(huì)根據(jù)Solution Set的結(jié)果,輸出最終迭代結(jié)果。
比如,我們現(xiàn)在已知一個(gè)Solution集合中保存的是,已有的商品分類大類中購(gòu)買量最多的商品,而Workset輸入的是來自線上實(shí)時(shí)交易中最新達(dá)成購(gòu)買的商品的人數(shù),經(jīng)過計(jì)算會(huì)生成新的商品分類大類中商品購(gòu)買量最多的結(jié)果,如果某些大類中商品購(gòu)買量突然增長(zhǎng),它需要更新Solution Set中的結(jié)果(原來購(gòu)買量最多的商品,經(jīng)過增量迭代計(jì)算,可能已經(jīng)不是最多),最后會(huì)輸出最終商品分類大類中購(gòu)買量最多的商品結(jié)果集合。更詳細(xì)的例子,可以參考官網(wǎng)給出的“Propagate Minimum in Graph”,這里不再累述。
Backpressure監(jiān)控
Backpressure在流式計(jì)算系統(tǒng)中會(huì)比較受到關(guān)注,因?yàn)樵谝粋€(gè)Stream上進(jìn)行處理的多個(gè)Operator之間,它們處理速度和方式可能非常不同,所以就存在上游Operator如果處理速度過快,下游Operator處可能機(jī)會(huì)堆積Stream記錄,嚴(yán)重會(huì)造成處理延遲或下游Operator負(fù)載過重而崩潰(有些系統(tǒng)可能會(huì)丟失數(shù)據(jù))。因此,對(duì)下游Operator處理速度跟不上的情況,如果下游Operator能夠?qū)⒆约禾幚頎顟B(tài)傳播給上游Operator,使得上游Operator處理速度慢下來就會(huì)緩解上述問題,比如通過告警的方式通知現(xiàn)有流處理系統(tǒng)存在的問題。
Flink Web界面上提供了對(duì)運(yùn)行Job的Backpressure行為的監(jiān)控,它通過使用Sampling線程對(duì)正在運(yùn)行的Task進(jìn)行堆棧跟蹤采樣來實(shí)現(xiàn),具體實(shí)現(xiàn)方式如下圖所示:

flink-back-pressure-sampling
JobManager會(huì)反復(fù)調(diào)用一個(gè)Job的Task運(yùn)行所在線程的Thread.getStackTrace(),默認(rèn)情況下,JobManager會(huì)每間隔50ms觸發(fā)對(duì)一個(gè)Job的每個(gè)Task依次進(jìn)行100次堆棧跟蹤調(diào)用,根據(jù)調(diào)用調(diào)用結(jié)果來確定Backpressure,F(xiàn)link是通過計(jì)算得到一個(gè)比值(Radio)來確定當(dāng)前運(yùn)行Job的Backpressure狀態(tài)。在Web界面上可以看到這個(gè)Radio值,它表示在一個(gè)內(nèi)部方法調(diào)用中阻塞(Stuck)的堆棧跟蹤次數(shù),例如,radio=0.01,表示100次中僅有1次方法調(diào)用阻塞。Flink目前定義了如下Backpressure狀態(tài):
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
另外,F(xiàn)link還提供了3個(gè)參數(shù)來配置Backpressure監(jiān)控行為:

參數(shù)名稱 默認(rèn)值 說明
jobmanager.web.backpressure.refresh-interval 60000 默認(rèn)1分鐘,表示采樣統(tǒng)計(jì)結(jié)果刷新時(shí)間間隔
jobmanager.web.backpressure.num-samples 100 評(píng)估Backpressure狀態(tài),所使用的堆棧跟蹤調(diào)用次數(shù)