1.MapReduce運行模型總體概覽:

mapreduceAllGraph.png
InputSplit: InputSplit是單個map任務(wù)的輸入文件片,默認(rèn)文件的一個block。
Map函數(shù):數(shù)據(jù)處理邏輯的主體,用開發(fā)者開發(fā)。
Partition:map的結(jié)果發(fā)送到相應(yīng)的reduce。
Combain:reduce之前進(jìn)行一次預(yù)合并,減小網(wǎng)絡(luò)IO。當(dāng)然,部分場景不適合。
Shuffle:map輸出數(shù)據(jù)按照Partition分發(fā)到各個reduce。
*reduce:將不同map匯總來的數(shù)據(jù)做reduce邏輯。
2.多reduce:

datatrans.png
3.經(jīng)典wordcount:

wordcountdatatrans.png

mapreducedataStream.png
4.Map類的實現(xiàn):
必須繼承org.apache.hadoop.mapreduce.Mapper 類
map()函數(shù),對于每一個輸入K/V都會調(diào)用一次map函數(shù),邏輯實現(xiàn)(必須)。
setup()函數(shù),在task開始前調(diào)用一次,做maptask的一些初始化工作,如連接數(shù)據(jù)庫、加載配置(可選)。
cleanup()函數(shù),在task結(jié)束前調(diào)用一次,做maptask的收尾清理工作,如批處理的收尾,關(guān)閉連接等(可選)
Context上下文環(huán)境對象,包含task相關(guān)的配置、屬性和狀態(tài)等。
5.Reduce類的實現(xiàn):
必須繼承org.apache.hadoop.mapreduce.Reducer類。
reduce(key, Iterable<>values,Context context)對于每一個key值調(diào)用一次reduce函數(shù)。
setup():在task開始前調(diào)用一次,做reducetask的一些初始化工作。
cleanup():在task結(jié)束時調(diào)用一次,做reducetask的收尾清理工作。
6.作業(yè)整體配置:
參數(shù)解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();
創(chuàng)建job: Jobjob= Job.getInstance(conf, "word count");
設(shè)置map類,reduce類。
設(shè)置map和reduce輸出的KV類型,二者輸出類型一致的話則可以只設(shè)置Reduce的輸出類型。
設(shè)置reduce的個數(shù) :默認(rèn)為1,綜合考慮,建議單個reduce處理數(shù)據(jù)量<10G。不想啟用reduce設(shè)置為0即可。
設(shè)置InputFormat
設(shè)置OutputFormat
設(shè)置輸入,輸出路徑。
job.waitForCompletion(true) (同步提交)和job.submit()(異步提交)
wordcount:
```public class WordCountTask {
private static final Logger logger = Logger.getLogger(WordCountTask.class);
public static class WordCountMap extends Mapper{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
[email protected]
protected void cleanup(Context context)
throws IOException, InterruptedException {
logger.info("mapTaskEnd.....");
}
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
[email protected]
protected void setup(Context context)
throws IOException, InterruptedException {
logger.info("mapTaskStart.....");
}
}
public static class WordCountReduce extends Reducer{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
this.result.set(sum);
context.write(key, this.result);
}
}
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {