library(SparkR)#初始化SparkContextsc <- sparkR.init("local", "RWordCount") #從HDFS上的一個文本文件創(chuàng)建RDD lines <- textFile(sc, "hdfs://localhost:9000/my_text_file")#調用RDD的transformation和action方法來計算word count#transformation用的函數(shù)是R代碼words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] })wordCount <- lapply(words, function(word) { list(word, 1L) })counts <- reduceByKey(wordCount, "+", 2L)output <- collect(counts)
基于DataFrame API的示例
基于DataFrame API的SparkR程序首先創(chuàng)建SparkContext,然后創(chuàng)建SQLContext,用SQLContext來創(chuàng)建DataFrame,再操作DataFrame里的數(shù)據(jù)。下面是用SparkR DataFrame API計算平均年齡的示例:
library(SparkR)#初始化SparkContext和SQLContextsc <- sparkR.init("local", "AverageAge") sqlCtx <- sparkRSQL.init(sc)#從當前目錄的一個JSON文件創(chuàng)建DataFramedf <- jsonFile(sqlCtx, "person.json")#調用DataFrame的操作來計算平均年齡df2 <- agg(df, age="avg")averageAge <- collect(df2)[1, 1]
對于上面兩個示例要注意的一點是SparkR RDD和DataFrame API的調用形式和Java/Scala API有些不同。假設rdd為一個RDD對象,在Java/Scala API中,調用rdd的map()方法的形式為:rdd.map(…),而在SparkR中,調用的形式為:map(rdd, …)。這是因為SparkR使用了R的S4對象系統(tǒng)來實現(xiàn)RDD和DataFrame類。
8/11 首頁 上一頁 6 7 8 9 10 11 下一頁 尾頁