MapReduce是一種可用于數(shù)據(jù)處理的編程模型。該模型比較簡(jiǎn)單,但用于編寫有用的程序并不簡(jiǎn)單。Hadoop可以運(yùn)行由各種語(yǔ)言編寫的MapReduce程序。本章中,我們將看到用Java、Ruby、Python 和C++語(yǔ)言編寫的同一個(gè)程序。最重要的是,MapReduce程序本質(zhì)上是并行運(yùn)行的,因此可以將大規(guī)模的數(shù)據(jù)分析任務(wù)交給任何一個(gè)擁有足夠多機(jī)器的運(yùn)營(yíng)商。MapReduce的優(yōu)勢(shì)在于處理大規(guī)模數(shù)據(jù)集,所以這里先來(lái)看一個(gè)數(shù)據(jù)集。
一個(gè)氣象數(shù)據(jù)集
在我們的例子里,要編寫一個(gè)挖掘氣象數(shù)據(jù)的程序。分布在全球各地的很多氣象傳感器每隔一小時(shí)收集氣象數(shù)據(jù),進(jìn)而獲取了大量的日志數(shù)據(jù)。由于這些數(shù)據(jù)是半結(jié)構(gòu)化數(shù)據(jù)且是按照記錄方式存儲(chǔ)的,因此非常適合使用MapReduce來(lái)處理。
數(shù)據(jù)的格式
我們將使用國(guó)家氣候數(shù)據(jù)中心(National Climatic Data Center,簡(jiǎn)稱NCDC,網(wǎng)址為http://www.ncdc.noaa.gov/)提供的數(shù)據(jù)。這些數(shù)據(jù)按行并以ASCII編碼存儲(chǔ),其中每一行是一條記錄。該存儲(chǔ)格式能夠支持眾多氣象要素,其中許多要素可以有選擇性地列入收集范圍或其數(shù)據(jù)所需的存儲(chǔ)長(zhǎng)度是可變的。為了簡(jiǎn)單起見(jiàn),我們重點(diǎn)討論一些基本要素(如氣溫等),這些要素始終都有且長(zhǎng)度固定。
例2-1顯示了一行采樣數(shù)據(jù),其中重要字段已突出顯示。該行數(shù)據(jù)已被分成很多行以突出顯示每個(gè)字段,在實(shí)際文件中,這些字段被整合成一行且沒(méi)有任何分隔符。
例2-1. 國(guó)家氣候數(shù)據(jù)中心數(shù)據(jù)記錄的格式
0057 332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4 +51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000) F
M-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9 -0128 # air temperature (degrees Celsius x 10)
1 # quality code -0139
# dew point temperature (degrees Celsius x 10)
1 # quality code 10268
# atmospheric pressure (hectopascals x 10)
1 # quality code
數(shù)據(jù)文件按照日期和氣象站進(jìn)行組織。從1901 年到2001 年,每一年都有一個(gè)目錄,每一個(gè)目錄中包含各個(gè)氣象站該年氣象數(shù)據(jù)的打包文件及其說(shuō)明文件。例如,1999年對(duì)應(yīng)文件夾下面包含如下記錄:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
因?yàn)橛谐汕先f(wàn)個(gè)氣象臺(tái),所以整個(gè)數(shù)據(jù)集由大量的小容量文件組成。通常情況下,處理少量的大型文件顯得更容易且有效,因此,這些數(shù)據(jù)需要經(jīng)過(guò)預(yù)處理,將每年的數(shù)據(jù)文件拼接成一個(gè)獨(dú)立文件。具體做法請(qǐng)參見(jiàn)附錄C。
使用Unix工具進(jìn)行數(shù)據(jù)分析
該數(shù)據(jù)集中每年全球氣溫的最高記錄是多少?我們先不使用Hadoop來(lái)回答這一問(wèn)題,因?yàn)橹挥刑峁┬阅芑鶞?zhǔn)和結(jié)果檢查工具,才能和Hadoop進(jìn)行有效對(duì)比。
傳統(tǒng)處理按行存儲(chǔ)數(shù)據(jù)的工具是awk。例2-2是一個(gè)用于計(jì)算每年最高氣溫的程序腳本。
例2-2. 該程序從NCDC氣象記錄中找出每年最高氣溫
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk'{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if ( temp!=9999 && q ~ /[01459]/ && temp > max) max = temp}
END { print max }'
done
該腳本循環(huán)遍歷按年壓縮的數(shù)據(jù)文件,首先顯示年份,然后使用awk腳本處理每個(gè)文件。awk 腳本從數(shù)據(jù)中提取兩個(gè)字段:氣溫和質(zhì)量代碼。氣溫值通過(guò)加上一個(gè)0 轉(zhuǎn)換為整數(shù)。接著測(cè)試氣溫值是否有效(用值9999 替代NCDC 數(shù)據(jù)集中缺少的記錄),通過(guò)質(zhì)量代碼檢測(cè)讀取的數(shù)值是否可疑或錯(cuò)誤。如果數(shù)據(jù)讀取正確,那么該值將與目前讀取到的最大氣溫值進(jìn)行比較,如果該值比原先的最大值大,就替換目前的最大值。處理完文件中所有的行后,再執(zhí)行END塊中的代碼并打印出最大氣溫值。
下面是某次運(yùn)行結(jié)果的起始部分:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
由于源文件中的氣溫值被放大了10倍,所以1901年的最高氣溫是 31.7°C (20世紀(jì)初記錄的氣溫?cái)?shù)據(jù)比較少,所以該結(jié)果是可能的)。使用亞馬遜的EC2 High-CPU Extra Large Instance運(yùn)行該程序,查找一個(gè)世紀(jì)以來(lái)氣象數(shù)據(jù)中的最大氣溫值需要42分鐘。
為了加快處理,我們需要并行運(yùn)行部分程序。從理論上講,這很簡(jiǎn)單:我們可以通過(guò)使用計(jì)算機(jī)上所有可用的硬件線程來(lái)處理,其中每個(gè)線程處理不同年份的數(shù)據(jù)。但是,其中依舊存在一些問(wèn)題。
首先,將任務(wù)劃分成大小相同的作業(yè)塊通常并不容易或明顯。在我們的例子中,不同年份數(shù)據(jù)文件的大小差異很大,因此部分線程會(huì)比其他線程更早運(yùn)行結(jié)束。即使讓它們繼續(xù)下一步的工作,整個(gè)運(yùn)行時(shí)間依舊由處理最長(zhǎng)文件所需的時(shí)間決定。另一種更好的方法是將輸入數(shù)據(jù)分成固定大小的塊,然后把每塊分配到各個(gè)進(jìn)程,這樣一來(lái),即使有些進(jìn)程能處理更多數(shù)據(jù),我們也可以為它們分配更多的數(shù)據(jù)。
其次,將獨(dú)立進(jìn)程運(yùn)行的結(jié)果合并后,可能還需要進(jìn)一步的處理。在我們的例子中,每年的結(jié)果獨(dú)立于其他年份,并可能將所有結(jié)果拼接起來(lái),然后按年份進(jìn)行排序。如果使用固定大小塊的方法,則需要特定的方法來(lái)合并結(jié)果。在這個(gè)例子中,某年的數(shù)據(jù)通常被分割成幾個(gè)塊,每個(gè)塊進(jìn)行獨(dú)立處理。我們將最終獲得每個(gè)數(shù)據(jù)塊中的最高氣溫,所以最后一步是尋找這些分塊數(shù)據(jù)中的最大值作為該年的最高氣溫,其他年份的數(shù)據(jù)均需如此處理。
最后,我們依舊受限于一臺(tái)計(jì)算機(jī)的處理能力。如果手上擁有的所有處理器都用上,至少也需要20分鐘,結(jié)果也就只能這樣了。我們不能使它更快。另外,某些數(shù)據(jù)集的增長(zhǎng)會(huì)超出一臺(tái)計(jì)算機(jī)的處理能力。當(dāng)我們開(kāi)始使用多臺(tái)計(jì)算機(jī)時(shí),整個(gè)大環(huán)境中的其他因素將對(duì)其產(chǎn)生影響,其中最主要的是協(xié)調(diào)性和可靠性兩大因素。哪個(gè)進(jìn)程負(fù)責(zé)運(yùn)行整個(gè)作業(yè)?我們?nèi)绾翁幚硎〉倪M(jìn)程?
因此,盡管可以實(shí)現(xiàn)并行處理,但實(shí)際上非常復(fù)雜。使用Hadoop之類的框架來(lái)實(shí)現(xiàn)并行數(shù)據(jù)處理將很有幫助。