青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

Javen-Studio 咖啡小屋

http://javenstudio.org - C++ Java 分布式 搜索引擎
Naven's Research Laboratory - Thinking of Life, Imagination of Future

  C++博客 :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
  24 隨筆 :: 57 文章 :: 170 評論 :: 4 Trackbacks
 

2          MapReduce框架結構

Map/Reduce是一個用于大規模數據處理的分布式計算模型,它最初是由Google工程師設計并實現的,Google已經將它完整的MapReduce論文公開發布了。其中對它的定義是,Map/Reduce是一個編程模型(programming model),是一個用于處理和生成大規模數據集(processing and generating large data sets)的相關的實現。用戶定義一個map函數來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數將所有這些中間的有著相同keyvalues合并起來。很多現實世界中的任務都可用這個模型來表達。

 

HadoopMap/Reduce框架也是基于這個原理實現的,下面簡要介紹一下Map/Reduce框架主要組成及相互的關系。

2.1       總體結構

2.1.1            MapperReducer

運行于HadoopMapReduce應用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創建JobConf的執行程序,在一些應用中還可以包括一個Combiner類,它實際也是Reducer的實現。

2.1.2            JobTrackerTaskTracker

它們都是由一個master服務JobTracker和多個運行于多個節點的slaver服務TaskTracker兩個類提供的服務調度的。master負責調度job的每一個子任務task運行于slave上,并監控它們,如果發現有失敗的task就重新運行它,slave則負責直接執行每一個taskTaskTracker都需要運行在HDFSDataNode上,而JobTracker則不需要,一般情況應該把JobTracker部署在單獨的機器上。

2.1.3            JobClient

每一個job都會在用戶端通過JobClient類將應用程序以及配置參數Configuration打包成jar文件存儲在HDFS,并把路徑提交到JobTrackermaster服務,然后由master創建每一個Task(即MapTaskReduceTask)將它們分發到各個TaskTracker服務中去執行。

2.1.4            JobInProgress

JobClient提交job后,JobTracker會創建一個JobInProgress來跟蹤和調度這個job,并把它添加到job隊列里。JobInProgress會根據提交的job jar中定義的輸入數據集(已分解成FileSplit)創建對應的一批TaskInProgress用于監控和調度MapTask,同時在創建指定數目的TaskInProgress用于監控和調度ReduceTask,缺省為1ReduceTask

2.1.5            TaskInProgress

JobTracker啟動任務時通過每一個TaskInProgresslaunchTask,這時會把Task對象(即MapTaskReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到后會創建對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,作用類似)用于監控和調度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設置好環境變量后啟動一個獨立的java child進程來執行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。

2.1.6            MapTaskReduceTask

一個完整的job會自動依次執行MapperCombiner(在JobConf指定了Combiner時執行)和Reducer,其中MapperCombiner是由MapTask調用執行,Reducer則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據job jar中定義的輸入數據集按<key1,value1>對讀入,處理完成生成臨時的<key2,value2>對,如果定義了CombinerMapTask會在Mapper完成調用該Combiner將相同key的值做合并處理,以減少輸出結果集。MapTask的任務全完成即交給ReduceTask進程調用Reducer處理,生成最終結果<key3,value3>對。這個過程在下一部分再詳細介紹。

 

下圖描述了Map/Reduce框架中主要組成和它們之間的關系:

 

2.2       Job創建過程

2.2.1            JobClient.runJob() 開始運行job并分解輸入數據集

一個MapReduceJob會通過JobClient類根據用戶在JobConf類中定義的InputFormat實現類來將輸入的數據集分解成一批小的數據集,每一個小數據集會對應創建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調用FileInputFormat.getSplits()方法生成小數據集,如果判斷數據文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統一打包到jobFilejar中并存儲在HDFS中,再將jobFile路徑提交給JobTracker去調度和執行。

2.2.2            JobClient.submitJob() 提交jobJobTracker

jobFile的提交過程是通過RPC模塊(有單獨一章來詳細介紹)來實現的。大致過程是,JobClient類中通過RPC實現的Proxy接口調用JobTrackersubmitJob()方法,而JobTracker必須實現JobSubmissionProtocol接口。JobTracker則根據獲得的jobFile路徑創建與job有關的一系列對象(即JobInProgressTaskInProgress等)來調度并執行job

 

JobTracker創建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態信息,如執行時間、MapReduce任務完成的比例等。JobClient會根據這個JobStatus對象創建一個NetworkedJobRunningJob對象,用于定時從JobTracker獲得執行過程的統計數據來監控并打印到用戶的控制臺。

 

與創建Job過程相關的類和方法如下圖所示


 

2.3       Job執行過程

上面已經提到,job是統一由JobTracker來調度的,具體的Task分發給各個TaskTracker節點來執行。下面通過源碼來詳細解析執行過程,首先先從JobTracker收到JobClient的提交請求開始。

2.3.1            JobTracker初始化JobTask隊列過程

2.3.1.1     JobTracker.submitJob() 收到請求

JobTracker接收到新的job請求(即submitJob()函數被調用)后,會創建一個JobInProgress對象并通過它來管理和調度任務。JobInProgress在創建的時候會初始化一系列與任務有關的參數,如job jar的位置(會把它從HDFS復制本地的文件系統中的臨時目錄里),MapReduce的數據,job的優先級別,以及記錄統計報告的對象等。

2.3.1.2     JobTracker.resortPriority() 加入隊列并按優先級排序

JobInProgress創建后,首先將它加入到jobs隊列里,分別用一個map成員變量jobs用來管理所有jobs對象,一個list成員變量jobsByPriority用來維護jobs的執行優先級別。之后JobTracker會調用resortPriority()函數,將jobs先按優先級別排序,再按提交時間排序,這樣保證最高優先并且先提交的job會先執行。

2.3.1.3     JobTracker.JobInitThread 通知初始化線程

然后JobTracker會把此job加入到一個管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。通過此成員變量調用notifyAll()函數,會喚起一個用于初始化job的線程JobInitThread來處理(JobTracker會有幾個內部的線程來維護jobs隊列,它們的實現都在JobTracker代碼里,稍候再詳細介紹)。JobInitThread收到信號后即取出最靠前的job,即優先級別最高的job,調用JobInProgressinitTasks()函數執行真正的初始化工作。

2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress

Task的初始化過程稍復雜些,首先步驟JobInProgress會創建Map的監控對象。在initTasks()函數里通過調用JobClientreadSplitFile()獲得已分解的輸入數據的RawSplit列表,然后根據這個列表創建對應數目的Map執行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應的所有在HDFS里的blocks所在的DataNode節點的host,這個會在RawSplit創建時通過FileSplitgetLocations()函數獲取,該函數會調用DistributedFileSystemgetFileCacheHints()獲得(這個細節會在HDFS模塊中講解)。當然如果是存儲在本地文件系統中,即使用LocalFileSystem時當然只有一個location即“localhost”了。

 

其次JobInProgress會創建Reduce的監控對象,這個比較簡單,根據JobConf里指定的Reduce數目創建,缺省只創建1Reduce任務。監控和調度Reduce任務的也是TaskInProgress類,不過構造方法有所不同,TaskInProgress會根據不同參數分別創建具體的MapTask或者ReduceTask

 

JobInProgress創建完TaskInProgress后,最后構造JobStatus并記錄job正在執行中,然后再調用JobHistory.JobInfo.logStarted()記錄job的執行日志。到這里JobTracker里初始化job的過程全部結束,執行則是通過另一異步的方式處理的,下面接著介紹它。

 

與初始化Job過程相關的類和方法如下圖所示

 

2.3.2            TaskTracker執行Task的過程

Task的執行實際是由TaskTracker發起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等。如果發現有自己需要執行的新任務也會在這時啟動,即是在TaskTracker調用JobTrackerheartbeat()方法時進行,此調用底層是通過IPC層調用Proxy接口(在IPC章節詳細介紹)實現。這個過程實際比較復雜,下面一一簡單介紹下每個步驟。

2.3.2.1     TaskTracker.run() 連接JobTracker

TaskTracker的啟動過程會初始化一系列參數和服務(另有單獨的一節介紹),然后嘗試連接JobTracker服務(即必須實現InterTrackerProtocol接口),如果連接斷開,則會循環嘗試連接JobTracker,并重新初始化所有成員和參數,此過程參見run()方法。

2.3.2.2     TaskTracker.offerService() 主循環

如果連接JobTracker服務成功,TaskTracker就會調用offerService()函數進入主執行循環中。這個循環會每隔10秒與JobTracker通訊一次,調用transmitHeartBeat()獲得HeartbeatResponse信息。然后調用HeartbeatResponsegetActions()函數獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數組。再遍歷這個數組,如果是一個新任務指令即LaunchTaskAction則調用startNewTask()函數執行新任務,否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執行KillJobAction或者KillTaskAction等。

2.3.2.3     TaskTracker.transmitHeartBeat() 獲取JobTracker指令

transmitHeartBeat()函數處理中,TaskTracker會創建一個新的TaskTrackerStatus對象記錄目前任務的執行狀況,然后通過IPC接口調用JobTrackerheartbeat()方法發送過去,并接受新的指令,即返回值TaskTrackerAction數組。在這個調用之前,TaskTracker會先檢查目前執行的Task數目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設置heartbeat()askForNewTask參數為true。操作成功后再更新相關的統計信息等。

2.3.2.4     TaskTracker.startNewTask() 啟動新任務

此函數的主要任務就是創建TaskTracker$TaskInProgress對象來調度和監控任務,并把它加入到runningTasks隊列中。完成后則調用localizeJob()真正初始化Task并開始執行。

2.3.2.5     TaskTracker.localizeJob() 初始化job目錄等

此函數主要任務是初始化工作目錄workDir,再將job jar包從HDFS復制到本地文件系統中,調用RunJar.unJar()將包解壓到工作目錄。然后創建一個RunningJob并調用addTaskToJob()函數將它添加到runningJobs監控隊列中。完成后即調用launchTaskForJob()開始執行Task

2.3.2.6     TaskTracker.launchTaskForJob() 執行任務

啟動Task的工作實際是調用TaskTracker$TaskInProgresslaunchTask()函數來執行的。

2.3.2.7     TaskTracker$TaskInProgress.launchTask() 執行任務

執行任務前先調用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調用TaskcreateRunner()方法創建TaskRunner對象并調用其start()方法最后啟動Task獨立的java執行子進程。

2.3.2.8     Task.createRunner() 創建啟動Runner對象

Task有兩個實現版本,即MapTaskReduceTask,它們分別用于創建MapReduce任務。MapTask會創建MapTaskRunner來啟動Task子進程,而ReduceTask則創建ReduceTaskRunner來啟動。

2.3.2.9     TaskRunner.start() 啟動子進程真正執行Task

這里是真正啟動子進程并執行Task的地方。它會調用run()函數來處理。執行的過程比較復雜,主要的工作就是初始化啟動java子進程的一系列環境變量,包括設定工作目錄workDir,設置CLASSPATH環境變量等(需要將TaskTracker的環境變量以及job jar的路徑合并起來)。然后裝載job jar包,調用runChild()方法啟動子進程,即通過ProcessBuilder來創建,同時子進程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過TaskLog類來實現。這里有個小問題,Task子進程只能輸出INFO級別日志,而且該級別是在run()函數中直接指定,不過改進也不復雜。

 

Job執行過程相關的類和方法如下圖所示


 

2.4       JobTrackerTaskTracker

如上面所述,JobTrackerTaskTrackerMapReduce框架最基本的兩個服務,其他所有處理均由它們調度執行,下面簡單介紹它們內部提供的服務及創建的線程,詳細過程下回分解J

2.4.1            JobTracker的服務和線程

JobTrackerMapReduce框架中最主要的類之一,所有job的執行都由它來調度,而且Hadoop系統中只配置一個JobTracker應用。啟動JobTracker后它會初始化若干個服務以及若干個內部線程用來維護job的執行過程和結果。下面簡單介紹一下它們。

 

首先,JobTracker會啟動一個interTrackerServer,端口配置在Configuration中的"mapred.job.tracker"參數,缺省是綁定8012端口。它有兩個用途,一是用于接收和處理TaskTrackerheartbeat等請求,即必須實現InterTrackerProtocol接口及協議。二是用于接收和處理JobClient的請求,如submitJobkillJob等,即必須實現JobSubmissionProtocol接口及協議。

 

其次,它會啟動一個infoServer,運行StatusHttpServer,缺省監聽50030端口。是一個web服務,用于給用戶提供web界面查詢job執行狀況的服務。

 

JobTracker還會啟動多個線程,ExpireLaunchingTasks線程用于停止那些未在超時時間內報告進度的TasksExpireTrackers線程用于停止那些可能已經當掉的TaskTracker,即長時間未報告的TaskTracker將不會再分配新的TaskRetireJobs線程用于清除那些已經完成很長時間還存在隊列里的jobsJobInitThread線程用于初始化job,這在前面章節已經介紹。TaskCommitQueue線程用于調度Task的那些所有與FileSystem操作相關的處理,并記錄Task的狀態等信息。

 

2.4.2            TaskTracker的服務和線程

TaskTracker也是MapReduce框架中最主要的類之一,它運行于每一臺DataNode節點上,用于調度Task的實際運行工作。它內部也會啟動一些服務和線程。

 

TaskTracker也會啟動一個StatusHttpServer服務來提供web界面的查詢Task執行狀態的工具。

 

其次,它還會啟動一個taskReportServer服務,這個用于提供給它的子進程即TaskRunner啟動的MapTask或者ReduceTask向它報告狀況,子進程的啟動命令實現在TaskTracker$Child類中,由TaskRunner.run()通過命令行參數傳入該服務地址和端口,即調用TaskTrackergetTaskTrackerReportAddress(),這個地址會在taskReportServer服務創建時獲得。

 

TaskTracker也會啟動一個MapEventsFetcherThread線程用于獲取Map任務的輸出數據信息。

 

2.5       Job狀態監控

未完待續


作者:naven 2008-02-21
posted on 2008-02-22 01:01 Javen-Studio 閱讀(17340) 評論(6)  編輯 收藏 引用

評論

# re: Annotated Hadoop: 第二節 MapReduce框架結構 2009-03-12 13:56 hadoop中文
歡迎大家到http://cn.hadoop.org/
討論,國內研究這個的人實在太少  回復  更多評論
  

# re: Annotated Hadoop: 第二節 MapReduce框架結構 2009-04-16 17:58 shiquan
great!!!!  回復  更多評論
  

# re: Annotated Hadoop: 第二節 MapReduce框架結構 2009-04-16 21:20 shiquan
很好,很強大.  回復  更多評論
  

# re: Annotated Hadoop: 第二節 MapReduce框架結構 2011-07-01 11:50 xp
寫的不錯。仔細研究好好學習。受教。  回復  更多評論
  

# re: Annotated Hadoop: 第二節 MapReduce框架結構 2011-12-11 18:41 hadoophobby
Job狀態監控這一節可以在哪看到呢,很想看到這一節的內容,期盼博主的回復  回復  更多評論
  


只有注冊用戶登錄后才能發表評論。
網站導航: 博客園   IT新聞   BlogJava   博問   Chat2DB   管理


青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            亚洲第一黄网| 亚洲美女视频| 国产精品vvv| 欧美sm视频| 国产欧美一区二区白浆黑人| 欧美成人在线影院| 国产一区二区久久| 亚洲一区二区成人在线观看| 亚洲精品免费在线| 久久久久久久久久久久久9999| 亚洲私人影院| 欧美国产一区二区三区激情无套| 久久久久久久综合| 国产精品欧美一区二区三区奶水| 亚洲国产天堂久久国产91| 国内精品久久久久久影视8| 亚洲综合丁香| 香蕉久久夜色精品国产| 欧美三级欧美一级| 亚洲美女淫视频| 一本色道88久久加勒比精品| 欧美成人网在线| 亚洲国产天堂久久综合| 亚洲人成人一区二区三区| 久久久久综合网| 嫩模写真一区二区三区三州| 樱桃成人精品视频在线播放| 久久精品国产一区二区电影| 久久视频免费观看| 伊人久久大香线蕉av超碰演员| 久久av二区| 免费高清在线视频一区·| 永久久久久久| 欧美风情在线观看| 亚洲精品国产日韩| 亚洲一区二区在线播放| 欧美午夜欧美| 亚洲综合第一| 久久天天综合| 日韩视频免费| 欧美午夜视频| 欧美一区二区三区日韩| 葵司免费一区二区三区四区五区| 亚洲第一精品夜夜躁人人爽| 欧美aa在线视频| 中文欧美在线视频| 久久精品亚洲乱码伦伦中文| 伊人久久久大香线蕉综合直播| 欧美va亚洲va香蕉在线| 亚洲另类自拍| 欧美一级视频一区二区| 国内成人精品2018免费看 | 亚洲人成久久| 亚洲一区二区三区四区中文| 国产农村妇女精品一二区| 久久久亚洲国产天美传媒修理工| 欧美韩日亚洲| 香蕉免费一区二区三区在线观看| 国内成+人亚洲| 欧美成人一区二区三区| 亚洲午夜av电影| 欧美成人精品在线视频| 亚洲一区二区视频| 国产一区91| 欧美日本精品| 久久国内精品视频| 亚洲美女在线一区| 久久久在线视频| 亚洲视频视频在线| 伊人天天综合| 国产精品永久| 欧美日韩久久久久久| 久久国产天堂福利天堂| 99伊人成综合| 欧美激情aⅴ一区二区三区| 午夜欧美视频| 亚洲美女在线一区| 狠狠色综合网| 国产精品主播| 欧美日韩成人在线视频| 久久久久久久性| 午夜精品久久久久久久99热浪潮| 最新国产の精品合集bt伙计| 久久久国产视频91| 亚洲影院在线观看| 日韩视频中文| 亚洲欧洲日韩女同| 国内外成人免费激情在线视频| 欧美午夜电影在线| 欧美激情第9页| 女同性一区二区三区人了人一| 欧美一区国产一区| 亚洲影院高清在线| 一区二区三区国产在线| 亚洲精品一区在线观看香蕉| 久久一区中文字幕| 久久久精品一区| 午夜精品免费在线| 亚洲欧美日韩电影| 亚洲一区二区三区四区在线观看 | 亚洲精品乱码久久久久久日本蜜臀| 国产亚洲福利一区| 国产欧美日韩精品a在线观看| 欧美亚洲不卡| 欧美四级电影网站| 国产精品美女黄网| 国产精品久久久久久久久久久久久 | 欧美一区二区三区免费看| 亚洲午夜激情| 亚洲一区二区三区免费观看 | 先锋a资源在线看亚洲| 亚洲一区二三| 性久久久久久久久久久久| 亚洲欧美在线一区| 午夜精品久久久久久久99黑人| 亚洲午夜国产一区99re久久| 这里只有精品视频| 亚洲男人的天堂在线观看| 亚洲综合日本| 久久精品国产99国产精品| 久久av老司机精品网站导航| 久久超碰97中文字幕| 欧美一区二区三区视频在线 | 久久国产精品久久w女人spa| 欧美中文在线免费| 久久综合色婷婷| 欧美精品一区二区三区久久久竹菊 | 亚洲欧美日韩第一区| 欧美一区二区三区另类| 久久久久久久波多野高潮日日| 久久久国产午夜精品| 欧美电影资源| 日韩亚洲欧美在线观看| 亚洲欧美国产精品桃花| 久久中文字幕一区二区三区| 欧美精品乱人伦久久久久久 | 国产精品免费aⅴ片在线观看| 国产欧美日韩不卡| 亚洲国产精品一区二区三区| 亚洲最黄网站| 欧美在线视频一区| 欧美激情bt| 亚洲影院免费| 麻豆成人综合网| 国产精品免费一区二区三区在线观看 | 国产一区二区三区免费在线观看| 亚洲国产一区二区视频| 亚洲综合精品四区| 欧美大片在线观看一区二区| 一区二区三区黄色| 开心色5月久久精品| 国产精品多人| 亚洲人成小说网站色在线| 亚洲欧美综合| 91久久香蕉国产日韩欧美9色| 亚洲综合三区| 欧美精品在线观看| 在线观看日韩av| 亚洲欧美卡通另类91av| 亚洲电影免费观看高清完整版在线 | 欧美激情国产高清| 亚洲你懂的在线视频| 欧美成人国产| 黄色影院成人| 欧美一级一区| 日韩小视频在线观看| 老巨人导航500精品| 国产日韩一区| 亚洲欧美日韩国产一区二区| 亚洲国产精品激情在线观看| 久久aⅴ国产欧美74aaa| 国产精品久久久久久久久婷婷| 亚洲欧洲精品一区| 久久综合九九| 欧美一级播放| 国产精品一区二区女厕厕| 日韩小视频在线观看专区| 欧美本精品男人aⅴ天堂| 性欧美办公室18xxxxhd| 国产精品自在欧美一区| 亚洲天堂偷拍| 99ri日韩精品视频| 欧美另类变人与禽xxxxx| 亚洲国产视频a| 欧美成年网站| 久久香蕉国产线看观看av| 国产一区二区丝袜高跟鞋图片| 欧美影院久久久| 亚洲伊人久久综合| 国产精品一区二区久久久| 亚洲午夜精品国产| 亚洲精选成人| 欧美天天影院| 亚洲免费在线视频一区 二区| 夜夜爽99久久国产综合精品女不卡| 欧美精品一区二区三区蜜桃 | 亚洲天堂久久| 中文一区二区| 国产美女精品视频| 久久精品夜色噜噜亚洲aⅴ|