R Worker
SparkR RDD API和Scala RDD API相比有兩大不同:SparkR RDD是R對(duì)象的分布式數(shù)據(jù)集,SparkR RDD transformation操作應(yīng)用的是R函數(shù)。SparkR RDD API的執(zhí)行依賴于Spark Core但運(yùn)行在JVM上的Spark Core既無(wú)法識(shí)別R對(duì)象的類型和格式,又不能執(zhí)行R的函數(shù),因此如何在Spark的分布式計(jì)算核心的基礎(chǔ)上實(shí)現(xiàn)SparkR RDD API是SparkR架構(gòu)設(shè)計(jì)的關(guān)鍵。
SparkR設(shè)計(jì)了Scala RRDD類,除了從數(shù)據(jù)源創(chuàng)建的SparkR RDD外,每個(gè)SparkR RDD對(duì)象概念上在JVM端有一個(gè)對(duì)應(yīng)的RRDD對(duì)象。RRDD派生自RDD類,改寫了RDD的compute()方法,在執(zhí)行時(shí)會(huì)啟動(dòng)一個(gè)R worker進(jìn)程,通過(guò)socket連接將父RDD的分區(qū)數(shù)據(jù)、序列化后的R函數(shù)以及其它信息傳給R worker進(jìn)程。R worker進(jìn)程反序列化接收到的分區(qū)數(shù)據(jù)和R函數(shù),將R函數(shù)應(yīng)到到分區(qū)數(shù)據(jù)上,再把結(jié)果數(shù)據(jù)序列化成字節(jié)數(shù)組傳回JVM端。
從這里可以看出,與Scala RDD API相比,SparkR RDD API的實(shí)現(xiàn)多了幾項(xiàng)開(kāi)銷:?jiǎn)?dòng)R worker進(jìn)程,將分區(qū)數(shù)據(jù)傳給R worker和R worker將結(jié)果返回,分區(qū)數(shù)據(jù)的序列化和反序列化。這也是SparkR RDD API相比Scala RDD API有較大性能差距的原因。
DataFrame API的實(shí)現(xiàn)
由于SparkR DataFrame API不需要傳入R語(yǔ)言的函數(shù)(UDF()方法和RDD相關(guān)方法除外),而且DataFrame中的數(shù)據(jù)全部是以JVM的數(shù)據(jù)類型存儲(chǔ),所以和SparkR RDD API的實(shí)現(xiàn)相比,SparkR DataFrame API的實(shí)現(xiàn)簡(jiǎn)單很多。R端的DataFrame對(duì)象就是對(duì)應(yīng)的JVM端DataFrame對(duì)象的wrapper,一個(gè)DataFrame方法的實(shí)現(xiàn)基本上就是簡(jiǎn)單地調(diào)用JVM端DataFrame的相應(yīng)方法。這種情況下,R Worker就不需要了。這是使用SparkR DataFrame API能獲得和ScalaAPI近乎相同的性能的原因。
當(dāng)然,DataFrame API還包含了一些RDD API,這些RDD API方法的實(shí)現(xiàn)是先將DataFrame轉(zhuǎn)換成RDD,然后調(diào)用RDD 的相關(guān)方法。