• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            面對現實,超越自己
            逆水行舟,不進則退
            posts - 269,comments - 32,trackbacks - 0
            1、NameNode啟動加載元數據情景分析
            • NameNode函數里調用FSNamesystemm讀取dfs.namenode.name.dir和dfs.namenode.edits.dir構建FSDirectory。
            • FSImage類recoverTransitionRead和saveNameSpace分別實現了元數據的檢查、加載、內存合并和元數據的持久化存儲。
            • saveNameSpace將元數據寫入到磁盤,具體操作步驟:首先將current目錄重命名為lastcheckpoint.tmp;然后在創建新的current目錄,并保存文件;最后將lastcheckpoint.tmp重命名為privios.checkpoint.
            • checkPoint的過程:Secondary NameNode會通知nameNode產生一個edit log文件edits.new,之后所有的日志操作寫入到edits.new文件中。接下來Secondary NameNode會從namenode下載fsimage和edits文件,進行合并產生新的fsimage.ckpt;然后Secondary會將fsimage.ckpt文件上傳到namenode。最后namenode會重命名fsimage.ckpt為fsimage,edtis.new為edits;
            2、元數據更新及日志寫入情景分析
            以mkdir為例:
            logSync代碼分析:
            代碼:
            1. public void logSync () throws IOException {  
            2. ArrayList<EditLogOutputStream > errorStreams = null ;  
            3. long syncStart = 0;  
            4.   
            5. // Fetch the transactionId of this thread.  
            6. long mytxid = myTransactionId .get (). txid;  
            7. EditLogOutputStream streams[] = null;  
            8. boolean sync = false;  
            9. try {  
            10. synchronized (this) {  
            11. assert editStreams. size() > 0 : "no editlog streams" ;  
            12. printStatistics (false);  
            13. // if somebody is already syncing, then wait  
            14. while (mytxid > synctxid && isSyncRunning) {  
            15. try {  
            16. wait (1000 );  
            17. catch (InterruptedException ie ) {  
            18. }  
            19. }  
            20. //  
            21. // If this transaction was already flushed, then nothing to do  
            22. //  
            23. if (mytxid <= synctxid ) {  
            24. numTransactionsBatchedInSync ++;  
            25. if (metrics != null// Metrics is non-null only when used inside name node  
            26. metrics .transactionsBatchedInSync .inc ();  
            27. return;  
            28. }  
            29. // now, this thread will do the sync  
            30. syncStart = txid ;  
            31. isSyncRunning = true;  
            32. sync = true;  
            33. // swap buffers  
            34. for( EditLogOutputStream eStream : editStreams ) {  
            35. eStream .setReadyToFlush ();  
            36. }  
            37. streams =  
            38. editStreams .toArray (new EditLogOutputStream[editStreams. size()]) ;  
            39. }  
            40. // do the sync  
            41. long start = FSNamesystem.now();  
            42. for (int idx = 0; idx < streams. length; idx++ ) {  
            43. EditLogOutputStream eStream = streams [idx ];  
            44. try {  
            45. eStream .flush ();  
            46. catch (IOException ie ) {  
            47. FSNamesystem .LOG .error ("Unable to sync edit log." , ie );  
            48. //  
            49. // remember the streams that encountered an error.  
            50. //  
            51. if (errorStreams == null) {  
            52. errorStreams = new ArrayList <EditLogOutputStream >( 1) ;  
            53. }  
            54. errorStreams .add (eStream );  
            55. }  
            56. }  
            57. long elapsed = FSNamesystem.now() - start ;  
            58. processIOError (errorStreams , true);  
            59. if (metrics != null// Metrics non-null only when used inside name node  
            60. metrics .syncs .inc (elapsed );  
            61. finally {  
            62. synchronized (this) {  
            63. synctxid = syncStart ;  
            64. if (sync ) {  
            65. isSyncRunning = false;  
            66. }  
            67. this.notifyAll ();  
            68. }  
            69. }  
            70. }  

            3、Backup Node 的checkpoint的過程分析:
            1. /** 
            2. * Create a new checkpoint 
            3. */  
            4. void doCheckpoint() throws IOException {  
            5. long startTime = FSNamesystem.now ();  
            6. NamenodeCommand cmd =  
            7. getNamenode().startCheckpoint( backupNode. getRegistration());  
            8. CheckpointCommand cpCmd = null;  
            9. switch( cmd. getAction()) {  
            10. case NamenodeProtocol .ACT_SHUTDOWN :  
            11. shutdown() ;  
            12. throw new IOException ("Name-node " + backupNode .nnRpcAddress  
            13. " requested shutdown.");  
            14. case NamenodeProtocol .ACT_CHECKPOINT :  
            15. cpCmd = (CheckpointCommand )cmd ;  
            16. break;  
            17. default:  
            18. throw new IOException ("Unsupported NamenodeCommand: "+cmd.getAction()) ;  
            19. }  
            20.   
            21. CheckpointSignature sig = cpCmd. getSignature();  
            22. assert FSConstants.LAYOUT_VERSION == sig .getLayoutVersion () :  
            23. "Signature should have current layout version. Expected: "  
            24. + FSConstants.LAYOUT_VERSION + " actual " + sig. getLayoutVersion();  
            25. assert !backupNode .isRole (NamenodeRole .CHECKPOINT ) ||  
            26. cpCmd. isImageObsolete() : "checkpoint node should always download image.";  
            27. backupNode. setCheckpointState(CheckpointStates .UPLOAD_START );  
            28. if( cpCmd. isImageObsolete()) {  
            29. // First reset storage on disk and memory state  
            30. backupNode. resetNamespace();  
            31. downloadCheckpoint(sig);  
            32. }  
            33.   
            34. BackupStorage bnImage = getFSImage() ;  
            35. bnImage. loadCheckpoint(sig);  
            36. sig.validateStorageInfo( bnImage) ;  
            37. bnImage. saveCheckpoint();  
            38.   
            39. if( cpCmd. needToReturnImage())  
            40. uploadCheckpoint(sig);  
            41.   
            42. getNamenode() .endCheckpoint (backupNode .getRegistration (), sig );  
            43.   
            44. bnImage. convergeJournalSpool();  
            45. backupNode. setRegistration(); // keep registration up to date  
            46. if( backupNode. isRole( NamenodeRole.CHECKPOINT ))  
            47. getFSImage() .getEditLog (). close() ;  
            48. LOG. info( "Checkpoint completed in "  
            49. + (FSNamesystem .now() - startTime )/ 1000 + " seconds."  
            50. " New Image Size: " + bnImage .getFsImageName (). length()) ;  
            51. }  
            52. }  

            4、元數據可靠性機制。
            • 配置多個備份路徑。NameNode在更新日志或進行Checkpoint的過程,會將元數據放在多個目錄下。
            • 對于沒一個需要保存的元數據文件,都創建一個輸出流,對訪問過程中出現的異常輸出流進行處理,將其移除。并再合適的時機再次檢查移除的數據量是否恢復正常。有效的保證了備份輸出流的異常問題。
            • 采用了多種機制來保證元數據的可靠性。例如在checkpoint的過程中,分為幾個階段,通過不同的文件名來標識當前所處的狀態。為存儲失敗后進行恢復提供了可能。
            5、元數據的一致性機制。
            • 首先從NameNode啟動時,對每個備份目錄是否格式化、目錄元數據文件名是否正確等進行檢查,確保元數據文件間的狀態一致性,然后選取最新的加載到內存,這樣可以確保HDFS當前狀態和最后一次關閉時的狀態一致性。
            • 其次,通過異常輸出流的處理,可以確保正常輸出流數據的一致性。
            • 運用同步機制,確保了輸出流一致性問題。

              本文轉自:
              http://blog.csdn.net/kntao/article/details/7770597
            posted on 2013-05-24 15:29 王海光 閱讀(470) 評論(0)  編輯 收藏 引用 所屬分類: Linux
            成人国内精品久久久久影院VR| 欧美午夜精品久久久久免费视| 久久香蕉一级毛片| 国内精品免费久久影院| 国产精品久久新婚兰兰| 7777久久亚洲中文字幕| 久久91这里精品国产2020| 亚洲中文字幕无码一久久区| 青青草原综合久久大伊人精品| 武侠古典久久婷婷狼人伊人| 久久久久综合网久久| 久久久久久精品无码人妻| 欧美综合天天夜夜久久| 少妇内射兰兰久久| 青春久久| 久久国产精品二国产精品| 久久精品欧美日韩精品| 久久精品人妻中文系列| 日本精品一区二区久久久| 久久精品一区二区国产| 无码国内精品久久人妻| 2020久久精品亚洲热综合一本| 精品久久综合1区2区3区激情| 久久精品国产亚洲AV无码偷窥| 久久久这里有精品| 中文精品99久久国产| 无码人妻久久一区二区三区蜜桃| 精品久久久久久中文字幕| 97精品久久天干天天天按摩| 人妻精品久久久久中文字幕69 | 老司机国内精品久久久久| 狼狼综合久久久久综合网| 久久狠狠爱亚洲综合影院| 香蕉久久夜色精品国产尤物| 人妻无码精品久久亚瑟影视| 亚洲午夜无码久久久久小说| 久久精品极品盛宴观看| 久久青青色综合| 久久99精品久久只有精品| AV狠狠色丁香婷婷综合久久| 久久91亚洲人成电影网站|