原帖:
http://www.cnblogs.com/phinecos/Hadoop
是一個實現(xiàn)了 MapReduce 計算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運行于計算機集群上,完成海量數(shù)據(jù)的計算。本文將介紹 MapReduce 計算模型,分布式并行計算等基本概念,以及 Hadoop 的安裝部署和基本運行方法。
Hadoop 簡介
Hadoop 是一個開源的可運行于大規(guī)模集群上的分布式并行編程框架,由于分布式存儲對于分布式編程來說是必不可少的,這個框架中還包含了一個分布式文件系統(tǒng) HDFS( Hadoop Distributed File System )。也許到目前為止,Hadoop 還不是那么廣為人知,其最新的版本號也僅僅是 0.16,距離 1.0 似乎都還有很長的一段距離,但提及 Hadoop 一脈相承的另外兩個開源項目 Nutch 和 Lucene ( 三者的創(chuàng)始人都是 Doug Cutting ),那絕對是大名鼎鼎。Lucene 是一個用 Java 開發(fā)的開源高性能全文檢索工具包,它不是一個完整的應用程序,而是一套簡單易用的 API 。在全世界范圍內,已有無數(shù)的軟件系統(tǒng),Web 網站基于 Lucene 實現(xiàn)了全文檢索功能,后來 Doug Cutting 又開創(chuàng)了第一個開源的 Web 搜索引擎(http://www.nutch.org/) Nutch, 它在 Lucene 的基礎上增加了網絡爬蟲和一些和 Web 相關的功能,一些解析各類文檔格式的插件等,此外,Nutch 中還包含了一個分布式文件系統(tǒng)用于存儲數(shù)據(jù)。從 Nutch 0.8.0 版本之后,Doug Cutting 把 Nutch 中的分布式文件系統(tǒng)以及實現(xiàn) MapReduce 算法的代碼獨立出來形成了一個新的開源項 Hadoop。Nutch 也演化為基于 Lucene 全文檢索以及 Hadoop 分布式計算平臺的一個開源搜索引擎。
基于 Hadoop,你可以輕松地編寫可處理海量數(shù)據(jù)的分布式并行程序,并將其運行于由成百上千個結點組成的大規(guī)模計算機集群上。從目前的情況來 看,Hadoop 注定會有一個輝煌的未來:"云計算"是目前灸手可熱的技術名詞,全球各大 IT 公司都在投資和推廣這種新一代的計算模式,而 Hadoop 又被其中幾家主要的公司用作其"云計算"環(huán)境中的重要基礎軟件,如:雅虎正在借助 Hadoop 開源平臺的力量對抗 Google, 除了資助 Hadoop 開發(fā)團隊外,還在開發(fā)基于 Hadoop 的開源項目 Pig, 這是一個專注于海量數(shù)據(jù)集分析的分布式計算程序。Amazon 公司基于 Hadoop 推出了 Amazon S3 ( Amazon Simple Storage Service ),提供可靠,快速,可擴展的網絡存儲服務,以及一個商用的云計算平臺 Amazon EC2 ( Amazon Elastic Compute Cloud )。在 IBM 公司的云計算項目--"藍云計劃"中,Hadoop 也是其中重要的基礎軟件。Google 正在跟IBM合作,共同推廣基于 Hadoop 的云計算。
迎接編程方式的變革
在摩爾定律的作用下,以前程序員根本不用考慮計算機的性能會跟不上軟件的發(fā)展,因為約每隔 18 個月,CPU 的主頻就會增加一倍,性能也將提升一倍,軟件根本不用做任何改變,就可以享受免費的性能提升。然而,由于晶體管電路已經逐漸接近其物理上的性能極限,摩爾 定律在 2005 年左右開始失效了,人類再也不能期待單個 CPU 的速度每隔 18 個月就翻一倍,為我們提供越來越快的計算性能。Intel, AMD, IBM 等芯片廠商開始從多核這個角度來挖掘 CPU 的性能潛力,多核時代以及互聯(lián)網時代的到來,將使軟件編程方式發(fā)生重大變革,基于多核的多線程并發(fā)編程以及基于大規(guī)模計算機集群的分布式并行編程是將來軟 件性能提升的主要途徑。
許多人認為這種編程方式的重大變化將帶來一次軟件的并發(fā)危機,因為我們傳統(tǒng)的軟件方式基本上是單指令單數(shù)據(jù)流的順序執(zhí)行,這種順序執(zhí)行十分符合人類的思考 習慣,卻與并發(fā)并行編程格格不入。基于集群的分布式并行編程能夠讓軟件與數(shù)據(jù)同時運行在連成一個網絡的許多臺計算機上,這里的每一臺計算機均可以是一臺普 通的 PC 機。這樣的分布式并行環(huán)境的最大優(yōu)點是可以很容易的通過增加計算機來擴充新的計算結點,并由此獲得不可思議的海量計算能力, 同時又具有相當強的容錯能力,一批計算結點失效也不會影響計算的正常進行以及結果的正確性。Google 就是這么做的,他們使用了叫做 MapReduce 的并行編程模型進行分布式并行編程,運行在叫做 GFS ( Google File System )的分布式文件系統(tǒng)上,為全球億萬用戶提供搜索服務。
Hadoop 實現(xiàn)了 Google 的 MapReduce 編程模型,提供了簡單易用的編程接口,也提供了它自己的分布式文件系統(tǒng) HDFS,與 Google 不同的是,Hadoop 是開源的,任何人都可以使用這個框架來進行并行編程。如果說分布式并行編程的難度足以讓普通程序員望而生畏的話,開源的 Hadoop 的出現(xiàn)極大的降低了它的門檻,讀完本文,你會發(fā)現(xiàn)基于 Hadoop 編程非常簡單,無須任何并行開發(fā)經驗,你也可以輕松的開發(fā)出分布式的并行程序,并讓其令人難以置信地同時運行在數(shù)百臺機器上,然后在短時間內完成海量數(shù)據(jù) 的計算。你可能會覺得你不可能會擁有數(shù)百臺機器來運行你的并行程序,而事實上,隨著"云計算"的普及,任何人都可以輕松獲得這樣的海量計算能力。 例如現(xiàn)在 Amazon 公司的云計算平臺 Amazon EC2 已經提供了這種按需計算的租用服務,有興趣的讀者可以去了解一下,這篇系列文章的第三部分將有所介紹。
掌握一點分布式并行編程的知識對將來的程序員是必不可少的,Hadoop 是如此的簡便好用,何不嘗試一下呢?也許你已經急不可耐的想試一下基于 Hadoop 的編程是怎么回事了,但畢竟這種編程模型與傳統(tǒng)的順序程序大不相同,掌握一點基礎知識才能更好地理解基于 Hadoop 的分布式并行程序是如何編寫和運行的。因此本文會先介紹一下 MapReduce 的計算模型,Hadoop 中的分布式文件系統(tǒng) HDFS, Hadoop 是如何實現(xiàn)并行計算的,然后才介紹如何安裝和部署 Hadoop 框架,以及如何運行 Hadoop 程序。
MapReduce 計算模型
MapReduce 是 Google 公司的核心計算模型,它將復雜的運行于大規(guī)模集群上的并行計算過程高度的抽象到了兩個函數(shù),Map 和 Reduce, 這是一個令人驚訝的簡單卻又威力巨大的模型。適合用 MapReduce 來處理的數(shù)據(jù)集(或任務)有一個基本要求: 待處理的數(shù)據(jù)集可以分解成許多小的數(shù)據(jù)集,而且每一個小數(shù)據(jù)集都可以完全并行地進行處理。
圖 1. MapReduce 計算流程
圖一說明了用 MapReduce 來處理大數(shù)據(jù)集的過程, 這個 MapReduce 的計算過程簡而言之,就是將大數(shù)據(jù)集分解為成百上千的小數(shù)據(jù)集,每個(或若干個)數(shù)據(jù)集分別由集群中的一個結點(一般就是一臺普通的計算機)進行處理并生 成中間結果,然后這些中間結果又由大量的結點進行合并, 形成最終結果。
計算模型的核心是 Map 和 Reduce 兩個函數(shù),這兩個函數(shù)由用戶負責實現(xiàn),功能是按一定的映射規(guī)則將輸入的 <key, value> 對轉換成另一個或一批 <key, value> 對輸出。
表一 Map 和 Reduce 函數(shù)
函數(shù)
|
輸入
|
輸出
|
說明
|
Map
|
<k1, v1>
|
List(<k2,v2>)
|
1. 將小數(shù)據(jù)集進一步解析成一批 <key,value> 對,輸入 Map 函數(shù)中進行處理。 2. 每一個輸入的 <k1,v1> 會輸出一批 <k2,v2>。 <k2,v2> 是計算的中間結果。
|
Reduce
|
<k2,List(v2)>
|
<k3,v3>
|
輸入的中間結果 <k2,List(v2)> 中的 List(v2) 表示是一批屬于同一個 k2 的 value
|
以一個計算文本文件中每個單詞出現(xiàn)的次數(shù)的程序為例,<k1,v1> 可以是 <行在文件中的偏移位置, 文件中的一行>,經 Map 函數(shù)映射之后,形成一批中間結果 <單詞,出現(xiàn)次數(shù)>, 而 Reduce 函數(shù)則可以對中間結果進行處理,將相同單詞的出現(xiàn)次數(shù)進行累加,得到每個單詞的總的出現(xiàn)次數(shù)。
基于 MapReduce 計算模型編寫分布式并行程序非常簡單,程序員的主要編碼工作就是實現(xiàn) Map 和 Reduce 函數(shù),其它的并行編程中的種種復雜問題,如分布式存儲,工作調度,負載平衡,容錯處理,網絡通信等,均由 MapReduce 框架(比如 Hadoop )負責處理,程序員完全不用操心。
四 集群上的并行計算
MapReduce 計算模型非常適合在大量計算機組成的大規(guī)模集群上并行運行。圖一中的每一個 Map 任務和每一個 Reduce 任務均可以同時運行于一個單獨的計算結點上,可想而知其運算效率是很高的,那么這樣的并行計算是如何做到的呢?
數(shù)據(jù)分布存儲
Hadoop 中的分布式文件系統(tǒng) HDFS 由一個管理結點 ( NameNode )和N個數(shù)據(jù)結點 ( DataNode )組成,每個結點均是一臺普通的計算機。在使用上同我們熟悉的單機上的文件系統(tǒng)非常類似,一樣可以建目錄,創(chuàng)建,復制,刪除文件,查看文件內容等。但其底 層實現(xiàn)上是把文件切割成 Block,然后這些 Block 分散地存儲于不同的 DataNode 上,每個 Block 還可以復制數(shù)份存儲于不同的 DataNode 上,達到容錯容災之目的。NameNode 則是整個 HDFS 的核心,它通過維護一些數(shù)據(jù)結構,記錄了每一個文件被切割成了多少個 Block,這些 Block 可以從哪些 DataNode 中獲得,各個 DataNode 的狀態(tài)等重要信息。如果你想了解更多的關于 HDFS 的信息,可進一步閱讀參考資料: The Hadoop Distributed File System:Architecture and Design
分布式并行計算
Hadoop 中有一個作為主控的 JobTracker,用于調度和管理其它的 TaskTracker, JobTracker 可以運行于集群中任一臺計算機上。TaskTracker 負責執(zhí)行任務,必須運行于 DataNode 上,即 DataNode 既是數(shù)據(jù)存儲結點,也是計算結點。 JobTracker 將 Map 任務和 Reduce 任務分發(fā)給空閑的 TaskTracker, 讓這些任務并行運行,并負責監(jiān)控任務的運行情況。如果某一個 TaskTracker 出故障了,JobTracker 會將其負責的任務轉交給另一個空閑的 TaskTracker 重新運行。
本地計算
數(shù)據(jù)存儲在哪一臺計算機上,就由這臺計算機進行這部分數(shù)據(jù)的計算,這樣可以減少數(shù)據(jù)在網絡上的傳輸,降低對網絡帶寬的需求。在 Hadoop 這樣的基于集群的分布式并行系統(tǒng)中,計算結點可以很方便地擴充,而因它所能夠提供的計算能力近乎是無限的,但是由是數(shù)據(jù)需要在不同的計算機之間流動,故網 絡帶寬變成了瓶頸,是非常寶貴的,“本地計算”是最有效的一種節(jié)約網絡帶寬的手段,業(yè)界把這形容為“移動計算比移動數(shù)據(jù)更經濟”。
圖 2. 分布存儲與并行計算
任務粒度
把原始大數(shù)據(jù)集切割成小數(shù)據(jù)集時,通常讓小數(shù)據(jù)集小于或等于 HDFS 中一個 Block 的大小(缺省是 64M),這樣能夠保證一個小數(shù)據(jù)集位于一臺計算機上,便于本地計算。有 M 個小數(shù)據(jù)集待處理,就啟動 M 個 Map 任務,注意這 M 個 Map 任務分布于 N 臺計算機上并行運行,Reduce 任務的數(shù)量 R 則可由用戶指定。
Partition
把Map 任務輸出的中間結果按 key 的范圍劃分成 R 份( R 是預先定義的 Reduce 任務的個數(shù)),劃分時通常使用 hash 函數(shù)如: hash(key) mod R,這樣可以保證某一段范圍內的 key,一定是由一個 Reduce 任務來處理,可以簡化 Reduce 的過程。
Combine
在partition 之前,還可以對中間結果先做 combine,即將中間結果中有相同 key的 <key, value> 對合并成一對。combine 的過程與 Reduce 的過程類似,很多情況下就可以直接使用 Reduce 函數(shù),但 combine 是作為 Map 任務的一部分,在執(zhí)行完 Map 函數(shù)后緊接著執(zhí)行的。Combine 能夠減少中間結果中 <key, value> 對的數(shù)目,從而減少網絡流量。
Reduce 任務從 Map 任務結點取中間結果
Map任務的中間結果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盤。中間結果文件的位置會通知主控 JobTracker, JobTracker 再通知 Reduce 任務到哪一個 DataNode 上去取中間結果。注意所有的 Map 任務產生中間結果均按其 Key 用同一個 Hash 函數(shù)劃分成了 R 份,R 個 Reduce 任務各自負責一段 Key 區(qū)間。每個 Reduce 需要向許多個 Map 任務結點取得落在其負責的 Key 區(qū)間內的中間結果,然后執(zhí)行 Reduce 函數(shù),形成一個最終的結果文件。
任務管道
有 R 個 Reduce 任務,就會有 R 個最終結果,很多情況下這 R 個最終結果并不需要合并成一個最終結果。因為這 R 個最終結果又可以做為另一個計算任務的輸入,開始另一個并行計算任務。
五 Hadoop 初體驗
Hadoop 支持 Linux 及 Windows 操作系統(tǒng), 但其官方網站聲明 Hadoop 的分布式操作在 Windows 上未做嚴格測試,建議只把 Windows 作為 Hadoop 的開發(fā)平臺。在 Windows 環(huán)境上的安裝步驟如下( Linux 平臺類似,且更簡單一些):
(1)在 Windows 下,需要先安裝 Cgywin, 安裝 Cgywin 時注意一定要選擇安裝 openssh (在 Net category )。安裝完成之后,把 Cgywin 的安裝目錄如 c:"cygwin"bin 加到系統(tǒng)環(huán)境變量 PATH 中,這是因為運行 Hadoop 要執(zhí)行一些 linux 環(huán)境下的腳本和命令。
(2)安裝 Java 1.5.x,并將 JAVA_HOME 環(huán)境變量設置為 Java 的安裝根目錄如 C:"Program Files"Java"jdk1.5.0_01。
(3)到 Hadoop 官方網站 http://hadoop.apache.org/下載Hadoop Core, 最新的穩(wěn)定版本是 0.16.0. 將下載后的安裝包解壓到一個目錄,本文假定解壓到 c:"hadoop-0.16.0。
4)修改 conf/hadoop-env.sh 文件,在其中設置 JAVA_HOME 環(huán)境變量: export JAVA_HOME="C:"Program Files"Java"jdk1.5.0_01” (因為路徑中 Program Files 中間有空格,一定要用雙引號將路徑引起來)
至此,一切就緒,可以運行 Hadoop 了。以下的運行過程,需要啟動 cygwin, 進入模擬 Linux 環(huán)境。在下載的 Hadoop Core 包中,帶有幾個示例程序并且已經打包成了 hadoop-0.16.0-examples.jar。其中有一個 WordCount 程序,功能是統(tǒng)計一批文本文件中各個單詞出現(xiàn)的次數(shù),我們先來看看怎么運行這個程序。Hadoop 共有三種運行模式: 單機(非分布式)模式,偽分布式運行模式,分布式運行模式,其中前兩種運行模式體現(xiàn)不了 Hadoop 分布式計算的優(yōu)勢,并沒有什么實際意義,但對程序的測試及調試很有幫助,我們先從這兩種模式入手,了解基于 Hadoop 的分布式并行程序是如何編寫和運行的。
單機(非分布式)模式
這種模式在一臺單機上運行,沒有分布式文件系統(tǒng),而是直接讀寫本地操作系統(tǒng)的文件系統(tǒng)。
代碼清單1
$ cd /cygdrive/c/hadoop-0.16.0
$ mkdir test-in
$ cd test-in
#在 test-in 目錄下創(chuàng)建兩個文本文件, WordCount 程序將統(tǒng)計其中各個單詞出現(xiàn)次數(shù)
$ echo "hello world bye world" >file1.txt
$ echo "hello hadoop goodbye hadoop" >file2.txt
$ cd ..
$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out
#執(zhí)行完畢,下面查看執(zhí)行結果:
$ cd test-out
$ cat part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
注意事項:運行 bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out 時,務必注意第一個參數(shù)是 jar, 不是 -jar, 當你用 -jar 時,不會告訴你是參數(shù)錯了,報告出來的錯誤信息是:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/util/ProgramDriver, 筆者當時以為是 classpath 的設置問題,浪費了不少時間。通過分析 bin/hadoop 腳本可知,-jar 并不是 bin/hadoop 腳本定義的參數(shù),此腳本會把 -jar 作為 Java 的參數(shù),Java 的-jar 參數(shù)表示執(zhí)行一個 Jar 文件(這個 Jar 文件必須是一個可執(zhí)行的 Jar,即在 MANIFEST 中定義了主類), 此時外部定義的 classpath 是不起作用的,因而會拋出 java.lang.NoClassDefFoundError 異常。而 jar 是 bin/hadoop 腳本定義的參數(shù),會調用 Hadoop 自己的一個工具類 RunJar,這個工具類也能夠執(zhí)行一個 Jar 文件,并且外部定義的 classpath 有效。
偽分布式運行模式
這種模式也是在一臺單機上運行,但用不同的 Java 進程模仿分布式運行中的各類結點 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),請注意分布式運行中的這幾個結點的區(qū)別:
從分布式存儲的角度來說,集群中的結點由一個 NameNode 和若干個 DataNode 組成, 另有一個 Secondary NameNode 作為 NameNode 的備份。 從分布式應用的角度來說,集群中的結點由一個 JobTracker 和若干個 TaskTracker 組成,JobTracker 負責任務的調度,TaskTracker 負責并行執(zhí)行任務。TaskTracker 必須運行在 DataNode 上,這樣便于數(shù)據(jù)的本地計算。JobTracker 和 NameNode 則無須在同一臺機器上。
(1) 按代碼清單2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的參數(shù),你可以通過讀此文件了解 Hadoop 中有哪些參數(shù)可供配置,但不要修改此文件。可通過修改 conf/hadoop-site.xml 改變缺省參數(shù)值,此文件中設置的參數(shù)值會覆蓋 conf/hadoop-default.xml 的同名參數(shù)。
代碼清單 2
<configuration>
<property>
<name>fs.default.name</name>
<value>localhost:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
參數(shù) fs.default.name 指定 NameNode 的 IP 地址和端口號。缺省值是 file:///, 表示使用本地文件系統(tǒng), 用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 NameNode。
參數(shù) mapred.job.tracker 指定 JobTracker 的 IP 地址和端口號。缺省值是 local, 表示在本地同一 Java 進程內執(zhí)行 JobTracker 和 TaskTracker, 用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 JobTracker ( 用一個單獨的 Java 進程做 JobTracker )。
參數(shù) dfs.replication 指定 HDFS 中每個 Block 被復制的次數(shù),起數(shù)據(jù)冗余備份的作用。 在典型的生產系統(tǒng)中,這個數(shù)常常設置為3。
(2)配置 SSH,如代碼清單3所示:
代碼清單 3
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
配置完后,執(zhí)行一下 ssh localhost, 確認你的機器可以用 SSH 連接,并且連接時不需要手工輸入密碼。
(3)格式化一個新的分布式文件系統(tǒng), 如代碼清單4所示:
$ cd /cygdrive/c/hadoop-0.16.0
$ bin/hadoop namenode –format
(4) 啟動 hadoop 進程, 如代碼清單5所示。控制臺上的輸出信息應該顯示啟動了 namenode, datanode, secondary namenode, jobtracker, tasktracker。啟動完成之后,通過 ps –ef 應該可以看到啟動了5個新的 java 進程。
代碼清單 5
$ bin/start-all.sh
$ ps –ef
(5) 運行 wordcount 應用, 如代碼清單6所示:
$ bin/hadoop dfs -put ./test-in input
#將本地文件系統(tǒng)上的 ./test-in 目錄拷到 HDFS 的根目錄上,目錄名改為 input
#執(zhí)行 bin/hadoop dfs –help 可以學習各種 HDFS 命令的使用。
$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount input output
#查看執(zhí)行結果:
#將文件從 HDFS 拷到本地文件系統(tǒng)中再查看:
$ bin/hadoop dfs -get output output
$ cat output/*
#也可以直接查看
$ bin/hadoop dfs -cat output/*
$ bin/stop-all.sh #停止 hadoop 進程
故障診斷
(1) 執(zhí)行 $ bin/start-all.sh 啟動 Hadoop 進程后,會啟動5個 java 進程, 同時會在 /tmp 目錄下創(chuàng)建五個 pid 文件記錄這些進程 ID 號。通過這五個文件,可以得知 namenode, datanode, secondary namenode, jobtracker, tasktracker 分別對應于哪一個 Java 進程。當你覺得 Hadoop 工作不正常時,可以首先查看這5個 java 進程是否在正常運行。
(2) 使用 web 接口。訪問 http://localhost:50030 可以查看 JobTracker 的運行狀態(tài)。訪問 http://localhost:50060 可以查看 TaskTracker 的運行狀態(tài)。訪問 http://localhost:50070 可以查看 NameNode 以及整個分布式文件系統(tǒng)的狀態(tài),瀏覽分布式文件系統(tǒng)中的文件以及 log 等。
(3) 查看 ${HADOOP_HOME}/logs 目錄下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一個對應的 log 文件,每一次運行的計算任務也有對應用 log 文件。分析這些 log 文件有助于找到故障原因。
結束語
現(xiàn)在,你已經了解了 MapReduce 計算模型,分布式文件系統(tǒng) HDFS,分布式并行計算等的基本原理, 并且有了一個可以運行的 Hadoop 環(huán)境,運行了一個基于 Hadoop 的并行程序。在下一篇文章中,你將了解到如何針對一個具體的計算任務,基于 Hadoop 編寫自己的分布式并行程序并將其部署運行等內容。
參考資料
學習
posted on 2010-01-28 13:50
小王 閱讀(2621)
評論(1) 編輯 收藏 引用 所屬分類:
分布式系統(tǒng)