• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            The Way of C++

              C++博客 :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
              55 Posts :: 0 Stories :: 19 Comments :: 0 Trackbacks

            公告

            The first time i use this blog, i will write something that i learn which i think is worth write down.

            常用鏈接

            留言簿(3)

            我參與的團隊

            搜索

            •  

            最新評論

            閱讀排行榜

            評論排行榜

            目的
            這篇教程從用戶的角度出發(fā),全面地介紹了Hadoop Map/Reduce框架的各個方面。

            先決條件
            請先確認Hadoop被正確安裝、配置和正常運行中。更多信息見:

            Hadoop快速入門對初次使用者。
            Hadoop集群搭建對大規(guī)模分布式集群。
            概述
            Hadoop Map/Reduce是一個使用簡易的軟件框架,基于它寫出來的應(yīng)用程序能夠運行在由上千個商用機器組成的大型集群上,并以一種可靠容錯的方式并行處理上T級別的數(shù)據(jù)集。

            一個Map/Reduce 作業(yè)(job) 通常會把輸入的數(shù)據(jù)集切分為若干獨立的數(shù)據(jù)塊,由 map任務(wù)(task)以完全并行的方式處理它們。框架會對map的輸出先進行排序, 然后把結(jié)果輸入給reduce任務(wù)。通常作業(yè)的輸入和輸出都會被存儲在文件系統(tǒng)中。 整個框架負責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí)行已經(jīng)失敗的任務(wù)。

            通常,Map/Reduce框架和分布式文件系統(tǒng)是運行在一組相同的節(jié)點上的,也就是說,計算節(jié)點和存儲節(jié)點通常在一起。這種配置允許框架在那些已經(jīng)存好數(shù)據(jù)的節(jié)點上高效地調(diào)度任務(wù),這可以使整個集群的網(wǎng)絡(luò)帶寬被非常高效地利用。

            Map/Reduce框架由一個單獨的master JobTracker 和每個集群節(jié)點一個slave TaskTracker共同組成。master負責(zé)調(diào)度構(gòu)成一個作業(yè)的所有任務(wù),這些任務(wù)分布在不同的slave上,master監(jiān)控它們的執(zhí)行,重新執(zhí)行已經(jīng)失敗的任務(wù)。而slave僅負責(zé)執(zhí)行由master指派的任務(wù)。

            應(yīng)用程序至少應(yīng)該指明輸入/輸出的位置(路徑),并通過實現(xiàn)合適的接口或抽象類提供map和reduce函數(shù)。再加上其他作業(yè)的參數(shù),就構(gòu)成了作業(yè)配置(job configuration)。然后,Hadoop的 job client提交作業(yè)(jar包/可執(zhí)行程序等)和配置信息給JobTracker,后者負責(zé)分發(fā)這些軟件和配置信息給slave、調(diào)度任務(wù)并監(jiān)控它們的執(zhí)行,同時提供狀態(tài)和診斷信息給job-client。

            雖然Hadoop框架是用JavaTM實現(xiàn)的,但Map/Reduce應(yīng)用程序則不一定要用 Java來寫 。

            Hadoop Streaming是一種運行作業(yè)的實用工具,它允許用戶創(chuàng)建和運行任何可執(zhí)行程序 (例如:Shell工具)來做為mapper和reducer。
            Hadoop Pipes是一個與SWIG兼容的C++ API (沒有基于JNITM技術(shù)),它也可用于實現(xiàn)Map/Reduce應(yīng)用程序。
            輸入與輸出
            Map/Reduce框架運轉(zhuǎn)在<key, value> 鍵值對上,也就是說, 框架把作業(yè)的輸入看為是一組<key, value> 鍵值對,同樣也產(chǎn)出一組 <key, value> 鍵值對做為作業(yè)的輸出,這兩組鍵值對的類型可能不同。

            框架需要對key和value的類(classes)進行序列化操作, 因此,這些類需要實現(xiàn) Writable接口。 另外,為了方便框架執(zhí)行排序操作,key類必須實現(xiàn) WritableComparable接口。

            一個Map/Reduce 作業(yè)的輸入和輸出類型如下所示:

            (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

            例子:WordCount v1.0
            在深入細節(jié)之前,讓我們先看一個Map/Reduce的應(yīng)用示例,以便對它們的工作方式有一個初步的認識。

            WordCount是一個簡單的應(yīng)用,它可以計算出指定數(shù)據(jù)集中每一個單詞出現(xiàn)的次數(shù)。

            這個應(yīng)用適用于 單機模式, 偽分布式模式 或 完全分布式模式 三種Hadoop安裝方式。

            源代碼
              WordCount.java
            1. package org.myorg;
            2.  
            3. import java.io.IOException;
            4. import java.util.*;
            5.  
            6. import org.apache.hadoop.fs.Path;
            7. import org.apache.hadoop.conf.*;
            8. import org.apache.hadoop.io.*;
            9. import org.apache.hadoop.mapred.*;
            10. import org.apache.hadoop.util.*;
            11.  
            12. public class WordCount {
            13.  
            14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
            15.      private final static IntWritable one = new IntWritable(1);
            16.      private Text word = new Text();
            17.  
            18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            19.        String line = value.toString();
            20.        StringTokenizer tokenizer = new StringTokenizer(line);
            21.        while (tokenizer.hasMoreTokens()) {
            22.          word.set(tokenizer.nextToken());
            23.          output.collect(word, one);
            24.        }
            25.      }
            26.    }
            27.  
            28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            30.        int sum = 0;
            31.        while (values.hasNext()) {
            32.          sum += values.next().get();
            33.        }
            34.        output.collect(key, new IntWritable(sum));
            35.      }
            36.    }
            37.  
            38.    public static void main(String[] args) throws Exception {
            39.      JobConf conf = new JobConf(WordCount.class);
            40.      conf.setJobName("wordcount");
            41.  
            42.      conf.setOutputKeyClass(Text.class);
            43.      conf.setOutputValueClass(IntWritable.class);
            44.  
            45.      conf.setMapperClass(Map.class);
            46.      conf.setCombinerClass(Reduce.class);
            47.      conf.setReducerClass(Reduce.class);
            48.  
            49.      conf.setInputFormat(TextInputFormat.class);
            50.      conf.setOutputFormat(TextOutputFormat.class);
            51.  
            52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
            53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            54.  
            55.      JobClient.runJob(conf);
            57.    }
            58. }
            59.  

            用法
            假設(shè)環(huán)境變量HADOOP_HOME對應(yīng)安裝時的根目錄,HADOOP_VERSION對應(yīng)Hadoop的當(dāng)前安裝版本,編譯WordCount.java來創(chuàng)建jar包,可如下操作:

            $ mkdir wordcount_classes
            $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
            $ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

            假設(shè):

            /usr/joe/wordcount/input - 是HDFS中的輸入路徑
            /usr/joe/wordcount/output - 是HDFS中的輸出路徑
            用示例文本文件做為輸入:

            $ bin/hadoop dfs -ls /usr/joe/wordcount/input/
            /usr/joe/wordcount/input/file01
            /usr/joe/wordcount/input/file02

            $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
            Hello World Bye World

            $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
            Hello Hadoop Goodbye Hadoop

            運行應(yīng)用程序:

            $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

            輸出是:

            $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
            Bye 1
            Goodbye 1
            Hadoop 2
            Hello 2
            World 2

            應(yīng)用程序能夠使用-files選項來指定一個由逗號分隔的路徑列表,這些路徑是task的當(dāng)前工作目錄。使用選項-libjars可以向map和reduce的classpath中添加jar包。使用-archives選項程序可以傳遞檔案文件做為參數(shù),這些檔案文件會被解壓并且在task的當(dāng)前工作目錄下會創(chuàng)建一個指向解壓生成的目錄的符號鏈接(以壓縮包的名字命名)。 有關(guān)命令行選項的更多細節(jié)請參考 Commands manual。

            使用-libjars和-files運行wordcount例子:
            hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

            解釋
            WordCount應(yīng)用程序非常直截了當(dāng)。

            Mapper(14-26行)中的map方法(18-25行)通過指定的 TextInputFormat(49行)一次處理一行。然后,它通過StringTokenizer 以空格為分隔符將一行切分為若干tokens,之后,輸出< <word>, 1> 形式的鍵值對。

            對于示例中的第一個輸入,map輸出是:
            < Hello, 1>
            < World, 1>
            < Bye, 1>
            < World, 1>

            第二個輸入,map輸出是:
            < Hello, 1>
            < Hadoop, 1>
            < Goodbye, 1>
            < Hadoop, 1>

            關(guān)于組成一個指定作業(yè)的map數(shù)目的確定,以及如何以更精細的方式去控制這些map,我們將在教程的后續(xù)部分學(xué)習(xí)到更多的內(nèi)容。

            WordCount還指定了一個combiner (46行)。因此,每次map運行之后,會對輸出按照key進行排序,然后把輸出傳遞給本地的combiner(按照作業(yè)的配置與Reducer一樣),進行本地聚合。

            第一個map的輸出是:
            < Bye, 1>
            < Hello, 1>
            < World, 2>

            第二個map的輸出是:
            < Goodbye, 1>
            < Hadoop, 2>
            < Hello, 1>

            Reducer(28-36行)中的reduce方法(29-35行) 僅是將每個key(本例中就是單詞)出現(xiàn)的次數(shù)求和。

            因此這個作業(yè)的輸出就是:
            < Bye, 1>
            < Goodbye, 1>
            < Hadoop, 2>
            < Hello, 2>
            < World, 2>

            代碼中的run方法中指定了作業(yè)的幾個方面, 例如:通過命令行傳遞過來的輸入/輸出路徑、key/value的類型、輸入/輸出的格式等等JobConf中的配置信息。隨后程序調(diào)用了JobClient.runJob(55行)來提交作業(yè)并且監(jiān)控它的執(zhí)行。

            我們將在本教程的后續(xù)部分學(xué)習(xí)更多的關(guān)于JobConf, JobClient, Tool和其他接口及類(class)。

            Map/Reduce - 用戶界面
            這部分文檔為用戶將會面臨的Map/Reduce框架中的各個環(huán)節(jié)提供了適當(dāng)?shù)募毠?jié)。這應(yīng)該會幫助用戶更細粒度地去實現(xiàn)、配置和調(diào)優(yōu)作業(yè)。然而,請注意每個類/接口的javadoc文檔提供最全面的文檔;本文只是想起到指南的作用。

            我們會先看看Mapper和Reducer接口。應(yīng)用程序通常會通過提供map和reduce方法來實現(xiàn)它們。

            然后,我們會討論其他的核心接口,其中包括: JobConf,JobClient,Partitioner, OutputCollector,Reporter, InputFormat,OutputFormat等等。

            最后,我們將通過討論框架中一些有用的功能點(例如:DistributedCache, IsolationRunner等等)來收尾。

            核心功能描述
            應(yīng)用程序通常會通過提供map和reduce來實現(xiàn) Mapper和Reducer接口,它們組成作業(yè)的核心。

            Mapper
            Mapper將輸入鍵值對(key/value pair)映射到一組中間格式的鍵值對集合。

            Map是一類將輸入記錄集轉(zhuǎn)換為中間格式記錄集的獨立任務(wù)。 這種轉(zhuǎn)換的中間格式記錄集不需要與輸入記錄集的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。

            Hadoop Map/Reduce框架為每一個InputSplit產(chǎn)生一個map任務(wù),而每個InputSplit是由該作業(yè)的InputFormat產(chǎn)生的。

            概括地說,對Mapper的實現(xiàn)者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數(shù),目的是完成Mapper的初始化工作。然后,框架為這個任務(wù)的InputSplit中每個鍵值對調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。應(yīng)用程序可以通過重寫Closeable.close()方法來執(zhí)行相應(yīng)的清理工作。

            輸出鍵值對不需要與輸入鍵值對的類型一致。一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對。通過調(diào)用 OutputCollector.collect(WritableComparable,Writable)可以收集輸出的鍵值對。

            應(yīng)用程序可以使用Reporter報告進度,設(shè)定應(yīng)用級別的狀態(tài)消息,更新Counters(計數(shù)器),或者僅是表明自己運行正常。

            框架隨后會把與一個特定key關(guān)聯(lián)的所有中間過程的值(value)分成組,然后把它們傳給Reducer以產(chǎn)出最終的結(jié)果。用戶可以通過 JobConf.setOutputKeyComparatorClass(Class)來指定具體負責(zé)分組的 Comparator。

            Mapper的輸出被排序后,就被劃分給每個Reducer。分塊的總數(shù)目和一個作業(yè)的reduce任務(wù)的數(shù)目是一樣的。用戶可以通過實現(xiàn)自定義的 Partitioner來控制哪個key被分配給哪個 Reducer。

            用戶可選擇通過 JobConf.setCombinerClass(Class)指定一個combiner,它負責(zé)對中間過程的輸出進行本地的聚集,這會有助于降低從Mapper到 Reducer數(shù)據(jù)傳輸量。

            這些被排好序的中間過程的輸出結(jié)果保存的格式是(key-len, key, value-len, value),應(yīng)用程序可以通過JobConf控制對這些中間結(jié)果是否進行壓縮以及怎么壓縮,使用哪種 CompressionCodec。

            需要多少個Map?
            Map的數(shù)目通常是由輸入數(shù)據(jù)的大小決定的,一般就是所有輸入文件的總塊(block)數(shù)。

            Map正常的并行規(guī)模大致是每個節(jié)點(node)大約10到100個map,對于CPU 消耗較小的map任務(wù)可以設(shè)到300個左右。由于每個任務(wù)初始化需要一定的時間,因此,比較合理的情況是map執(zhí)行的時間至少超過1分鐘。

            這樣,如果你輸入10TB的數(shù)據(jù),每個塊(block)的大小是128MB,你將需要大約82,000個map來完成任務(wù),除非使用 setNumMapTasks(int)(注意:這里僅僅是對框架進行了一個提示(hint),實際決定因素見這里)將這個數(shù)值設(shè)置得更高。

            Reducer
            Reducer將與一個key關(guān)聯(lián)的一組中間數(shù)值集歸約(reduce)為一個更小的數(shù)值集。

            用戶可以通過 JobConf.setNumReduceTasks(int)設(shè)定一個作業(yè)中reduce任務(wù)的數(shù)目。

            概括地說,對Reducer的實現(xiàn)者需要重寫 JobConfigurable.configure(JobConf)方法,這個方法需要傳遞一個JobConf參數(shù),目的是完成Reducer的初始化工作。然后,框架為成組的輸入數(shù)據(jù)中的每個<key, (list of values)>對調(diào)用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,應(yīng)用程序可以通過重寫Closeable.close()來執(zhí)行相應(yīng)的清理工作。

            Reducer有3個主要階段:shuffle、sort和reduce。

            Shuffle
            Reducer的輸入就是Mapper已經(jīng)排好序的輸出。在這個階段,框架通過HTTP為每個Reducer獲得所有Mapper輸出中與之相關(guān)的分塊。

            Sort
            這個階段,框架將按照key的值對Reducer的輸入進行分組 (因為不同mapper的輸出中可能會有相同的key)。

            Shuffle和Sort兩個階段是同時進行的;map的輸出也是一邊被取回一邊被合并的。

            Secondary Sort
            如果需要中間過程對key的分組規(guī)則和reduce前對key的分組規(guī)則不同,那么可以通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中間過程的key如何被分組,所以結(jié)合兩者可以實現(xiàn)按值的二次排序。

            Reduce
            在這個階段,框架為已分組的輸入數(shù)據(jù)中的每個 <key, (list of values)>對調(diào)用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。

            Reduce任務(wù)的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統(tǒng)的。

            應(yīng)用程序可以使用Reporter報告進度,設(shè)定應(yīng)用程序級別的狀態(tài)消息,更新Counters(計數(shù)器),或者僅是表明自己運行正常。

            Reducer的輸出是沒有排序的。

            需要多少個Reduce?
            Reduce的數(shù)目建議是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。

            用0.95,所有reduce可以在maps一完成時就立刻啟動,開始傳輸map的輸出結(jié)果。用1.75,速度快的節(jié)點可以在完成第一輪reduce任務(wù)后,可以開始第二輪,這樣可以得到比較好的負載均衡的效果。

            增加reduce的數(shù)目會增加整個框架的開銷,但可以改善負載均衡,降低由于執(zhí)行失敗帶來的負面影響。

            上述比例因子比整體數(shù)目稍小一些是為了給框架中的推測性任務(wù)(speculative-tasks) 或失敗的任務(wù)預(yù)留一些reduce的資源。

            無Reducer
            如果沒有歸約要進行,那么設(shè)置reduce任務(wù)的數(shù)目為零是合法的。

            這種情況下,map任務(wù)的輸出會直接被寫入由 setOutputPath(Path)指定的輸出路徑。框架在把它們寫入FileSystem之前沒有對它們進行排序。

            Partitioner
            Partitioner用于劃分鍵值空間(key space)。

            Partitioner負責(zé)控制map輸出結(jié)果key的分割。Key(或者一個key子集)被用于產(chǎn)生分區(qū),通常使用的是Hash函數(shù)。分區(qū)的數(shù)目與一個作業(yè)的reduce任務(wù)的數(shù)目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應(yīng)該發(fā)送給m個reduce任務(wù)中的哪一個來進行reduce操作。

            HashPartitioner是默認的 Partitioner。

            Reporter
            Reporter是用于Map/Reduce應(yīng)用程序報告進度,設(shè)定應(yīng)用級別的狀態(tài)消息, 更新Counters(計數(shù)器)的機制。

            Mapper和Reducer的實現(xiàn)可以利用Reporter 來報告進度,或者僅是表明自己運行正常。在那種應(yīng)用程序需要花很長時間處理個別鍵值對的場景中,這種機制是很關(guān)鍵的,因為框架可能會以為這個任務(wù)超時了,從而將它強行殺死。另一個避免這種情況發(fā)生的方式是,將配置參數(shù)mapred.task.timeout設(shè)置為一個足夠高的值(或者干脆設(shè)置為零,則沒有超時限制了)。

            應(yīng)用程序可以用Reporter來更新Counter(計數(shù)器)。

            OutputCollector
            OutputCollector是一個Map/Reduce框架提供的用于收集 Mapper或Reducer輸出數(shù)據(jù)的通用機制 (包括中間輸出結(jié)果和作業(yè)的輸出結(jié)果)。

            Hadoop Map/Reduce框架附帶了一個包含許多實用型的mapper、reducer和partitioner 的類庫。

            作業(yè)配置
            JobConf代表一個Map/Reduce作業(yè)的配置。

            JobConf是用戶向Hadoop框架描述一個Map/Reduce作業(yè)如何執(zhí)行的主要接口。框架會按照JobConf描述的信息忠實地去嘗試完成這個作業(yè),然而:

            一些參數(shù)可能會被管理者標記為 final,這意味它們不能被更改。
            一些作業(yè)的參數(shù)可以被直截了當(dāng)?shù)剡M行設(shè)置(例如: setNumReduceTasks(int)),而另一些參數(shù)則與框架或者作業(yè)的其他參數(shù)之間微妙地相互影響,并且設(shè)置起來比較復(fù)雜(例如: setNumMapTasks(int))。
            通常,JobConf會指明Mapper、Combiner(如果有的話)、 Partitioner、Reducer、InputFormat和 OutputFormat的具體實現(xiàn)。JobConf還能指定一組輸入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及輸出文件應(yīng)該寫在哪兒 (setOutputPath(Path))。

            JobConf可選擇地對作業(yè)設(shè)置一些高級選項,例如:設(shè)置Comparator; 放到DistributedCache上的文件;中間結(jié)果或者作業(yè)輸出結(jié)果是否需要壓縮以及怎么壓縮; 利用用戶提供的腳本(setMapDebugScript(String)/setReduceDebugScript(String)) 進行調(diào)試;作業(yè)是否允許預(yù)防性(speculative)任務(wù)的執(zhí)行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每個任務(wù)最大的嘗試次數(shù) (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一個作業(yè)能容忍的任務(wù)失敗的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。

            當(dāng)然,用戶能使用 set(String, String)/get(String, String) 來設(shè)置或者取得應(yīng)用程序需要的任意參數(shù)。然而,DistributedCache的使用是面向大規(guī)模只讀數(shù)據(jù)的。

            任務(wù)的執(zhí)行和環(huán)境
            TaskTracker是在一個單獨的jvm上以子進程的形式執(zhí)行 Mapper/Reducer任務(wù)(Task)的。

            子任務(wù)會繼承父TaskTracker的環(huán)境。用戶可以通過JobConf中的 mapred.child.java.opts配置參數(shù)來設(shè)定子jvm上的附加選項,例如: 通過-Djava.library.path=<> 將一個非標準路徑設(shè)為運行時的鏈接用以搜索共享庫,等等。如果mapred.child.java.opts包含一個符號@taskid@, 它會被替換成map/reduce的taskid的值。

            下面是一個包含多個參數(shù)和替換的例子,其中包括:記錄jvm GC日志; JVM JMX代理程序以無密碼的方式啟動,這樣它就能連接到j(luò)console上,從而可以查看子進程的內(nèi)存和線程,得到線程的dump;還把子jvm的最大堆尺寸設(shè)置為512MB, 并為子jvm的java.library.path添加了一個附加路徑。

            <property>
              <name>mapred.child.java.opts</name>
              <value>
                 -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
                 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
              </value>
            </property>

            用戶或管理員也可以使用mapred.child.ulimit設(shè)定運行的子任務(wù)的最大虛擬內(nèi)存。mapred.child.ulimit的值以(KB)為單位,并且必須大于或等于-Xmx參數(shù)傳給JavaVM的值,否則VM會無法啟動。

            注意:mapred.child.java.opts只用于設(shè)置task tracker啟動的子任務(wù)。為守護進程設(shè)置內(nèi)存選項請查看 cluster_setup.html

            ${mapred.local.dir}/taskTracker/是task tracker的本地目錄, 用于創(chuàng)建本地緩存和job。它可以指定多個目錄(跨越多個磁盤),文件會半隨機的保存到本地路徑下的某個目錄。當(dāng)job啟動時,task tracker根據(jù)配置文檔創(chuàng)建本地job目錄,目錄結(jié)構(gòu)如以下所示:

            ${mapred.local.dir}/taskTracker/archive/ :分布式緩存。這個目錄保存本地的分布式緩存。因此本地分布式緩存是在所有task和job間共享的。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目錄。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目錄。各個任務(wù)可以使用這個空間做為暫存空間,用于它們之間共享文件。這個目錄通過job.local.dir 參數(shù)暴露給用戶。這個路徑可以通過API JobConf.getJobLocalDir()來訪問。它也可以被做為系統(tǒng)屬性獲得。因此,用戶(比如運行streaming)可以調(diào)用System.getProperty("job.local.dir")獲得該目錄。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路徑,用于存放作業(yè)的jar文件和展開的jar。job.jar是應(yīng)用程序的jar文件,它會被自動分發(fā)到各臺機器,在task啟動前會被自動展開。使用api JobConf.getJar() 函數(shù)可以得到j(luò)ob.jar的位置。使用JobConf.getJar().getParent()可以訪問存放展開的jar包的目錄。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一個job.xml文件,本地的通用的作業(yè)配置文件。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每個任務(wù)有一個目錄task-id,它里面有如下的目錄結(jié)構(gòu):
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一個job.xml文件,本地化的任務(wù)作業(yè)配置文件。任務(wù)本地化是指為該task設(shè)定特定的屬性值。這些值會在下面具體說明。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一個存放中間過程的輸出文件的目錄。它保存了由framwork產(chǎn)生的臨時map reduce數(shù)據(jù),比如map的輸出文件等。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的當(dāng)前工作目錄。
            ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的臨時目錄。(用戶可以設(shè)定屬性mapred.child.tmp 來為map和reduce task設(shè)定臨時目錄。缺省值是./tmp。如果這個值不是絕對路徑, 它會把task的工作路徑加到該路徑前面作為task的臨時文件路徑。如果這個值是絕對路徑則直接使用這個值。 如果指定的目錄不存在,會自動創(chuàng)建該目錄。之后,按照選項 -Djava.io.tmpdir='臨時文件的絕對路徑'執(zhí)行java子任務(wù)。 pipes和streaming的臨時文件路徑是通過環(huán)境變量TMPDIR='the absolute path of the tmp dir'設(shè)定的)。 如果mapred.child.tmp有./tmp值,這個目錄會被創(chuàng)建。
            下面的屬性是為每個task執(zhí)行時使用的本地參數(shù),它們保存在本地化的任務(wù)作業(yè)配置文件里:

            名稱 類型 描述
            mapred.job.id String job id
            mapred.jar String job目錄下job.jar的位置
            job.local.dir String job指定的共享存儲空間
            mapred.tip.id String task id
            mapred.task.id String task嘗試id
            mapred.task.is.map boolean 是否是map task
            mapred.task.partition int task在job中的id
            map.input.file String map讀取的文件名
            map.input.start long map輸入的數(shù)據(jù)塊的起始位置偏移
            map.input.length long map輸入的數(shù)據(jù)塊的字節(jié)數(shù)
            mapred.work.output.dir String task臨時輸出目錄

            task的標準輸出和錯誤輸出流會被讀到TaskTracker中,并且記錄到 ${HADOOP_LOG_DIR}/userlogs

            DistributedCache 可用于map或reduce task中分發(fā)jar包和本地庫。子jvm總是把 當(dāng)前工作目錄 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通過 System.loadLibrary或 System.load裝載緩存的庫。有關(guān)使用分布式緩存加載共享庫的細節(jié)請參考 native_libraries.html

            作業(yè)的提交與監(jiān)控
            JobClient是用戶提交的作業(yè)與JobTracker交互的主要接口。

            JobClient 提供提交作業(yè),追蹤進程,訪問子任務(wù)的日志記錄,獲得Map/Reduce集群狀態(tài)信息等功能。

            作業(yè)提交過程包括:

            檢查作業(yè)輸入輸出樣式細節(jié)
            為作業(yè)計算InputSplit值。
            如果需要的話,為作業(yè)的DistributedCache建立必須的統(tǒng)計信息。
            拷貝作業(yè)的jar包和配置文件到FileSystem上的Map/Reduce系統(tǒng)目錄下。
            提交作業(yè)到JobTracker并且監(jiān)控它的狀態(tài)。
            作業(yè)的歷史文件記錄到指定目錄的"_logs/history/"子目錄下。這個指定目錄由hadoop.job.history.user.location設(shè)定,默認是作業(yè)輸出的目錄。因此默認情況下,文件會存放在mapred.output.dir/_logs/history目錄下。用戶可以設(shè)置hadoop.job.history.user.location為none來停止日志記錄。

            用戶使用下面的命令可以看到在指定目錄下的歷史日志記錄的摘要。
            $ bin/hadoop job -history output-dir
            這個命令會打印出作業(yè)的細節(jié),以及失敗的和被殺死的任務(wù)細節(jié)。
            要查看有關(guān)作業(yè)的更多細節(jié)例如成功的任務(wù)、每個任務(wù)嘗試的次數(shù)(task attempt)等,可以使用下面的命令
            $ bin/hadoop job -history all output-dir

            用戶可以使用 OutputLogFilter 從輸出目錄列表中篩選日志文件。

            一般情況,用戶利用JobConf創(chuàng)建應(yīng)用程序并配置作業(yè)屬性, 然后用 JobClient 提交作業(yè)并監(jiān)視它的進程。

            作業(yè)的控制
            有時候,用一個單獨的Map/Reduce作業(yè)并不能完成一個復(fù)雜的任務(wù),用戶也許要鏈接多個Map/Reduce作業(yè)才行。這是容易實現(xiàn)的,因為作業(yè)通常輸出到分布式文件系統(tǒng)上的,所以可以把這個作業(yè)的輸出作為下一個作業(yè)的輸入實現(xiàn)串聯(lián)。

            然而,這也意味著,確保每一作業(yè)完成(成功或失敗)的責(zé)任就直接落在了客戶身上。在這種情況下,可以用的控制作業(yè)的選項有:

            runJob(JobConf):提交作業(yè),僅當(dāng)作業(yè)完成時返回。
            submitJob(JobConf):只提交作業(yè),之后需要你輪詢它返回的 RunningJob句柄的狀態(tài),并根據(jù)情況調(diào)度。
            JobConf.setJobEndNotificationURI(String):設(shè)置一個作業(yè)完成通知,可避免輪詢。
            作業(yè)的輸入
            InputFormat 為Map/Reduce作業(yè)描述輸入的細節(jié)規(guī)范。

            Map/Reduce框架根據(jù)作業(yè)的InputFormat來:

            檢查作業(yè)輸入的有效性。
            把輸入文件切分成多個邏輯InputSplit實例, 并把每一實例分別分發(fā)給一個 Mapper。
            提供RecordReader的實現(xiàn),這個RecordReader從邏輯InputSplit中獲得輸入記錄, 這些記錄將由Mapper處理。
            基于文件的InputFormat實現(xiàn)(通常是 FileInputFormat的子類) 默認行為是按照輸入文件的字節(jié)大小,把輸入數(shù)據(jù)切分成邏輯分塊(logical InputSplit )。 其中輸入文件所在的FileSystem的數(shù)據(jù)塊尺寸是分塊大小的上限。下限可以設(shè)置mapred.min.split.size 的值。

            考慮到邊界情況,對于很多應(yīng)用程序來說,很明顯按照文件大小進行邏輯分割是不能滿足需求的。 在這種情況下,應(yīng)用程序需要實現(xiàn)一個RecordReader來處理記錄的邊界并為每個任務(wù)提供一個邏輯分塊的面向記錄的視圖。

            TextInputFormat 是默認的InputFormat。

            如果一個作業(yè)的Inputformat是TextInputFormat, 并且框架檢測到輸入文件的后綴是.gz和.lzo,就會使用對應(yīng)的CompressionCodec自動解壓縮這些文件。 但是需要注意,上述帶后綴的壓縮文件不會被切分,并且整個壓縮文件會分給一個mapper來處理。

            InputSplit
            InputSplit 是一個單獨的Mapper要處理的數(shù)據(jù)塊。

            一般的InputSplit 是字節(jié)樣式輸入,然后由RecordReader處理并轉(zhuǎn)化成記錄樣式。

            FileSplit 是默認的InputSplit。 它把 map.input.file 設(shè)定為輸入文件的路徑,輸入文件是邏輯分塊文件。

            RecordReader
            RecordReader 從InputSlit讀入<key, value>對。

            一般的,RecordReader 把由InputSplit 提供的字節(jié)樣式的輸入文件,轉(zhuǎn)化成由Mapper處理的記錄樣式的文件。 因此RecordReader負責(zé)處理記錄的邊界情況和把數(shù)據(jù)表示成keys/values對形式。

            作業(yè)的輸出
            OutputFormat 描述Map/Reduce作業(yè)的輸出樣式。

            Map/Reduce框架根據(jù)作業(yè)的OutputFormat來:

            檢驗作業(yè)的輸出,例如檢查輸出路徑是否已經(jīng)存在。
            提供一個RecordWriter的實現(xiàn),用來輸出作業(yè)結(jié)果。 輸出文件保存在FileSystem上。
            TextOutputFormat是默認的 OutputFormat。

            任務(wù)的Side-Effect File
            在一些應(yīng)用程序中,子任務(wù)需要產(chǎn)生一些side-file,這些文件與作業(yè)實際輸出結(jié)果的文件不同。

            在這種情況下,同一個Mapper或者Reducer的兩個實例(比如預(yù)防性任務(wù))同時打開或者寫 FileSystem上的同一文件就會產(chǎn)生沖突。因此應(yīng)用程序在寫文件的時候需要為每次任務(wù)嘗試(不僅僅是每次任務(wù),每個任務(wù)可以嘗試執(zhí)行很多次)選取一個獨一無二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。

            為了避免沖突,Map/Reduce框架為每次嘗試執(zhí)行任務(wù)都建立和維護一個特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目錄,這個目錄位于本次嘗試執(zhí)行任務(wù)輸出結(jié)果所在的FileSystem上,可以通過 ${mapred.work.output.dir}來訪問這個子目錄。 對于成功完成的任務(wù)嘗試,只有${mapred.output.dir}/_temporary/_${taskid}下的文件會移動到${mapred.output.dir}。當(dāng)然,框架會丟棄那些失敗的任務(wù)嘗試的子目錄。這種處理過程對于應(yīng)用程序來說是完全透明的。

            在任務(wù)執(zhí)行期間,應(yīng)用程序在寫文件時可以利用這個特性,比如 通過 FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}目錄, 并在其下創(chuàng)建任意任務(wù)執(zhí)行時所需的side-file,框架在任務(wù)嘗試成功時會馬上移動這些文件,因此不需要在程序內(nèi)為每次任務(wù)嘗試選取一個獨一無二的名字。

            注意:在每次任務(wù)嘗試執(zhí)行期間,${mapred.work.output.dir} 的值實際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個值是Map/Reduce框架創(chuàng)建的。 所以使用這個特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路徑下創(chuàng)建side-file即可。

            對于只使用map不使用reduce的作業(yè),這個結(jié)論也成立。這種情況下,map的輸出結(jié)果直接生成到HDFS上。

            RecordWriter
            RecordWriter 生成<key, value> 對到輸出文件。

            RecordWriter的實現(xiàn)把作業(yè)的輸出結(jié)果寫到 FileSystem。

            其他有用的特性
            Counters
            Counters 是多個由Map/Reduce框架或者應(yīng)用程序定義的全局計數(shù)器。 每一個Counter可以是任何一種 Enum類型。同一特定Enum類型的Counter可以匯集到一個組,其類型為Counters.Group。

            應(yīng)用程序可以定義任意(Enum類型)的Counters并且可以通過 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架會匯總這些全局counters。

            DistributedCache
            DistributedCache 可將具體應(yīng)用相關(guān)的、大尺寸的、只讀的文件有效地分布放置。

            DistributedCache 是Map/Reduce框架提供的功能,能夠緩存應(yīng)用程序所需的文件 (包括文本,檔案文件,jar文件等)。

            應(yīng)用程序在JobConf中通過url(hdfs://)指定需要被緩存的文件。 DistributedCache假定由hdfs://格式url指定的文件已經(jīng)在 FileSystem上了。

            Map-Redcue框架在作業(yè)所有任務(wù)執(zhí)行之前會把必要的文件拷貝到slave節(jié)點上。 它運行高效是因為每個作業(yè)的文件只拷貝一次并且為那些沒有文檔的slave節(jié)點緩存文檔。

            DistributedCache 根據(jù)緩存文檔修改的時間戳進行追蹤。 在作業(yè)執(zhí)行期間,當(dāng)前應(yīng)用程序或者外部程序不能修改緩存文件。

            distributedCache可以分發(fā)簡單的只讀數(shù)據(jù)或文本文件,也可以分發(fā)復(fù)雜類型的文件例如歸檔文件和jar文件。歸檔文件(zip,tar,tgz和tar.gz文件)在slave節(jié)點上會被解檔(un-archived)。 這些文件可以設(shè)置執(zhí)行權(quán)限。

            用戶可以通過設(shè)置mapred.cache.{files|archives}來分發(fā)文件。 如果要分發(fā)多個文件,可以使用逗號分隔文件所在路徑。也可以利用API來設(shè)置該屬性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通過命令行選項 -cacheFile/-cacheArchive 分發(fā)文件。

            用戶可以通過 DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當(dāng)前工作目錄下創(chuàng)建到緩存文件的符號鏈接。 或者通過設(shè)置配置文件屬性mapred.create.symlink為yes。 分布式緩存會截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當(dāng)前工作目錄會有名為lib.so的鏈接, 它會鏈接分布式緩存中的lib.so.1。

            DistributedCache可在map/reduce任務(wù)中作為 一種基礎(chǔ)軟件分發(fā)機制使用。它可以被用于分發(fā)jar包和本地庫(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用于 緩存文件和jar包,并把它們加入子jvm的classpath。也可以通過設(shè)置配置文檔里的屬性 mapred.job.classpath.{files|archives}達到相同的效果。緩存文件可用于分發(fā)和裝載本地庫。

            Tool
            Tool 接口支持處理常用的Hadoop命令行選項。

            Tool 是Map/Reduce工具或應(yīng)用的標準。應(yīng)用程序應(yīng)只處理其定制參數(shù), 要把標準命令行選項通過 ToolRunner.run(Tool, String[]) 委托給 GenericOptionsParser處理。

            Hadoop命令行的常用選項有:
            -conf <configuration file>
            -D <property=value>
            -fs <local|namenode:port>
            -jt <local|jobtracker:port>

            IsolationRunner
            IsolationRunner 是幫助調(diào)試Map/Reduce程序的工具。

            使用IsolationRunner的方法是,首先設(shè)置 keep.failed.tasks.files屬性為true (同時參考keep.tasks.files.pattern)。

            然后,登錄到任務(wù)運行失敗的節(jié)點上,進入 TaskTracker的本地路徑運行 IsolationRunner:
            $ cd <local path>/taskTracker/${taskid}/work
            $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

            IsolationRunner會把失敗的任務(wù)放在單獨的一個能夠調(diào)試的jvm上運行,并且采用和之前完全一樣的輸入數(shù)據(jù)。

            Profiling
            Profiling是一個工具,它使用內(nèi)置的java profiler工具進行分析獲得(2-3個)map或reduce樣例運行分析報告。

            用戶可以通過設(shè)置屬性mapred.task.profile指定系統(tǒng)是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改屬性值。如果設(shè)為true, 則開啟profiling功能。profiler信息保存在用戶日志目錄下。缺省情況,profiling功能是關(guān)閉的。

            如果用戶設(shè)定使用profiling功能,可以使用配置文檔里的屬性 mapred.task.profile.{maps|reduces} 設(shè)置要profile map/reduce task的范圍。設(shè)置該屬性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范圍的缺省值是0-2。

            用戶可以通過設(shè)定配置文檔里的屬性mapred.task.profile.params 來指定profiler配置參數(shù)。修改屬性要使用api JobConf.setProfileParams(String)。當(dāng)運行task時,如果字符串包含%s。 它會被替換成profileing的輸出文件名。這些參數(shù)會在命令行里傳遞到子JVM中。缺省的profiling 參數(shù)是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。

            調(diào)試
            Map/Reduce框架能夠運行用戶提供的用于調(diào)試的腳本程序。 當(dāng)map/reduce任務(wù)失敗時,用戶可以通過運行腳本在任務(wù)日志(例如任務(wù)的標準輸出、標準錯誤、系統(tǒng)日志以及作業(yè)配置文件)上做后續(xù)處理工作。用戶提供的調(diào)試腳本程序的標準輸出和標準錯誤會輸出為診斷文件。如果需要的話這些輸出結(jié)果也可以打印在用戶界面上。

            在接下來的章節(jié),我們討論如何與作業(yè)一起提交調(diào)試腳本。為了提交調(diào)試腳本, 首先要把這個腳本分發(fā)出去,而且還要在配置文件里設(shè)置。

            如何分發(fā)腳本文件:
            用戶要用 DistributedCache 機制來分發(fā)和鏈接腳本文件

            如何提交腳本:
            一個快速提交調(diào)試腳本的方法是分別為需要調(diào)試的map任務(wù)和reduce任務(wù)設(shè)置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 屬性的值。這些屬性也可以通過 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API來設(shè)置。對于streaming, 可以分別為需要調(diào)試的map任務(wù)和reduce任務(wù)使用命令行選項-mapdebug 和 -reducedegug來提交調(diào)試腳本。

            腳本的參數(shù)是任務(wù)的標準輸出、標準錯誤、系統(tǒng)日志以及作業(yè)配置文件。在運行map/reduce失敗的節(jié)點上運行調(diào)試命令是:
            $script $stdout $stderr $syslog $jobconf

            Pipes 程序根據(jù)第五個參數(shù)獲得c++程序名。 因此調(diào)試pipes程序的命令是
            $script $stdout $stderr $syslog $jobconf $program

            默認行為
            對于pipes,默認的腳本會用gdb處理core dump, 打印 stack trace并且給出正在運行線程的信息。

            JobControl
            JobControl是一個工具,它封裝了一組Map/Reduce作業(yè)以及他們之間的依賴關(guān)系。

            數(shù)據(jù)壓縮
            Hadoop Map/Reduce框架為應(yīng)用程序的寫入文件操作提供壓縮工具,這些工具可以為map輸出的中間數(shù)據(jù)和作業(yè)最終輸出數(shù)據(jù)(例如reduce的輸出)提供支持。它還附帶了一些 CompressionCodec的實現(xiàn),比如實現(xiàn)了 zlib和lzo壓縮算法。 Hadoop同樣支持gzip文件格式。

            考慮到性能問題(zlib)以及Java類庫的缺失(lzo)等因素,Hadoop也為上述壓縮解壓算法提供本地庫的實現(xiàn)。更多的細節(jié)請參考 這里。

            中間輸出
            應(yīng)用程序可以通過 JobConf.setCompressMapOutput(boolean)api控制map輸出的中間結(jié)果,并且可以通過 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。

            作業(yè)輸出
            應(yīng)用程序可以通過 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制輸出是否需要壓縮并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。

            如果作業(yè)輸出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,來設(shè)定 SequenceFile.CompressionType (i.e.RECORD / BLOCK - 默認是RECORD)。

            例子:WordCount v2.0
            這里是一個更全面的WordCount例子,它使用了我們已經(jīng)討論過的很多Map/Reduce框架提供的功能。

            運行這個例子需要HDFS的某些功能,特別是 DistributedCache相關(guān)功能。因此這個例子只能運行在 偽分布式 或者 完全分布式模式的 Hadoop上。

            源代碼
              WordCount.java
            1. package org.myorg;
            2.  
            3. import java.io.*;
            4. import java.util.*;
            5.  
            6. import org.apache.hadoop.fs.Path;
            7. import org.apache.hadoop.filecache.DistributedCache;
            8. import org.apache.hadoop.conf.*;
            9. import org.apache.hadoop.io.*;
            10. import org.apache.hadoop.mapred.*;
            11. import org.apache.hadoop.util.*;
            12.  
            13. public class WordCount extends Configured implements Tool {
            14.  
            15.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
            16.  
            17.      static enum Counters { INPUT_WORDS }
            18.  
            19.      private final static IntWritable one = new IntWritable(1);
            20.      private Text word = new Text();
            21.  
            22.      private boolean caseSensitive = true;
            23.      private Set<String> patternsToSkip = new HashSet<String>();
            24.  
            25.      private long numRecords = 0;
            26.      private String inputFile;
            27.  
            28.      public void configure(JobConf job) {
            29.        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
            30.        inputFile = job.get("map.input.file");
            31.  
            32.        if (job.getBoolean("wordcount.skip.patterns", false)) {
            33.          Path[] patternsFiles = new Path[0];
            34.          try {
            35.            patternsFiles = DistributedCache.getLocalCacheFiles(job);
            36.          } catch (IOException ioe) {
            37.            System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
            38.          }
            39.          for (Path patternsFile : patternsFiles) {
            40.            parseSkipFile(patternsFile);
            41.          }
            42.        }
            43.      }
            44.  
            45.      private void parseSkipFile(Path patternsFile) {
            46.        try {
            47.          BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
            48.          String pattern = null;
            49.          while ((pattern = fis.readLine()) != null) {
            50.            patternsToSkip.add(pattern);
            51.          }
            52.        } catch (IOException ioe) {
            53.          System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
            54.        }
            55.      }
            56.  
            57.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            58.        String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
            59.  
            60.        for (String pattern : patternsToSkip) {
            61.          line = line.replaceAll(pattern, "");
            62.        }
            63.  
            64.        StringTokenizer tokenizer = new StringTokenizer(line);
            65.        while (tokenizer.hasMoreTokens()) {
            66.          word.set(tokenizer.nextToken());
            67.          output.collect(word, one);
            68.          reporter.incrCounter(Counters.INPUT_WORDS, 1);
            69.        }
            70.  
            71.        if ((++numRecords % 100) == 0) {
            72.          reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
            73.        }
            74.      }
            75.    }
            76.  
            77.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            78.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            79.        int sum = 0;
            80.        while (values.hasNext()) {
            81.          sum += values.next().get();
            82.        }
            83.        output.collect(key, new IntWritable(sum));
            84.      }
            85.    }
            86.  
            87.    public int run(String[] args) throws Exception {
            88.      JobConf conf = new JobConf(getConf(), WordCount.class);
            89.      conf.setJobName("wordcount");
            90.  
            91.      conf.setOutputKeyClass(Text.class);
            92.      conf.setOutputValueClass(IntWritable.class);
            93.  
            94.      conf.setMapperClass(Map.class);
            95.      conf.setCombinerClass(Reduce.class);
            96.      conf.setReducerClass(Reduce.class);
            97.  
            98.      conf.setInputFormat(TextInputFormat.class);
            99.      conf.setOutputFormat(TextOutputFormat.class);
            100.  
            101.      List<String> other_args = new ArrayList<String>();
            102.      for (int i=0; i < args.length; ++i) {
            103.        if ("-skip".equals(args[i])) {
            104.          DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
            105.          conf.setBoolean("wordcount.skip.patterns", true);
            106.        } else {
            107.          other_args.add(args[i]);
            108.        }
            109.      }
            110.  
            111.      FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
            112.      FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
            113.  
            114.      JobClient.runJob(conf);
            115.      return 0;
            116.    }
            117.  
            118.    public static void main(String[] args) throws Exception {
            119.      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
            120.      System.exit(res);
            121.    }
            122. }
            123.  

            運行樣例
            輸入樣例:

            $ bin/hadoop dfs -ls /usr/joe/wordcount/input/
            /usr/joe/wordcount/input/file01
            /usr/joe/wordcount/input/file02

            $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
            Hello World, Bye World!

            $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
            Hello Hadoop, Goodbye to hadoop.

            運行程序:

            $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

            輸出:

            $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
            Bye 1
            Goodbye 1
            Hadoop, 1
            Hello 2
            World! 1
            World, 1
            hadoop. 1
            to 1

            注意此時的輸入與第一個版本的不同,輸出的結(jié)果也有不同。

            現(xiàn)在通過DistributedCache插入一個模式文件,文件中保存了要被忽略的單詞模式。

            $ hadoop dfs -cat /user/joe/wordcount/patterns.txt
            \.
            \,
            \!
            to

            再運行一次,這次使用更多的選項:

            $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

            應(yīng)該得到這樣的輸出:

            $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
            Bye 1
            Goodbye 1
            Hadoop 1
            Hello 2
            World 2
            hadoop 1

            再運行一次,這一次關(guān)閉大小寫敏感性(case-sensitivity):

            $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

            輸出:

            $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
            bye 1
            goodbye 1
            hadoop 2
            hello 2
            world 2

            程序要點
            通過使用一些Map/Reduce框架提供的功能,WordCount的第二個版本在原始版本基礎(chǔ)上有了如下的改進:

            展示了應(yīng)用程序如何在Mapper (和Reducer)中通過configure方法 修改配置參數(shù)(28-43行)。
            展示了作業(yè)如何使用DistributedCache 來分發(fā)只讀數(shù)據(jù)。 這里允許用戶指定單詞的模式,在計數(shù)時忽略那些符合模式的單詞(104行)。
            展示Tool接口和GenericOptionsParser處理Hadoop命令行選項的功能 (87-116, 119行)。
            展示了應(yīng)用程序如何使用Counters(68行),如何通過傳遞給map(和reduce) 方法的Reporter實例來設(shè)置應(yīng)用程序的狀態(tài)信息(72行)。
            Java和JNI是Sun Microsystems, Inc.在美國和其它國家的注冊商標。

             

            本文來自CSDN博客,轉(zhuǎn)載請標明出處:http://blog.csdn.net/superxgl/archive/2010/01/11/5171929.aspx

            posted on 2010-07-19 14:40 koson 閱讀(3660) 評論(0)  編輯 收藏 引用 所屬分類: Hadoop
            久久精品成人欧美大片| 香蕉久久夜色精品国产尤物| 欧美黑人又粗又大久久久| 国产亚洲综合久久系列| 性做久久久久久久久久久| 精品国际久久久久999波多野| 伊人色综合久久天天人手人婷 | 久久亚洲精品无码AV红樱桃| 99久久婷婷国产综合精品草原| 99re久久精品国产首页2020| 99久久亚洲综合精品成人| 久久91亚洲人成电影网站| 久久综合精品国产一区二区三区 | 亚洲国产精品综合久久网络 | 久久精品无码一区二区app| 日本精品一区二区久久久| 2021最新久久久视精品爱| 国产精品久久网| 一本久久知道综合久久| 久久综合综合久久狠狠狠97色88| 久久久久人妻精品一区二区三区| 国产99精品久久| 精品国产99久久久久久麻豆| 国产一区二区精品久久凹凸| 蜜臀av性久久久久蜜臀aⅴ麻豆| 久久av免费天堂小草播放| 国产亚洲色婷婷久久99精品| 麻豆精品久久久久久久99蜜桃| 久久亚洲国产午夜精品理论片| 精品国产日韩久久亚洲| 久久国产成人亚洲精品影院| 久久AAAA片一区二区| 久久66热人妻偷产精品9| 午夜肉伦伦影院久久精品免费看国产一区二区三区 | 久久丫忘忧草产品| 久久久久噜噜噜亚洲熟女综合| 久久人人添人人爽添人人片牛牛| 欧美va久久久噜噜噜久久| 久久无码一区二区三区少妇| 亚洲精品乱码久久久久久蜜桃不卡 | 久久精品国产亚洲AV不卡|