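- Inside NameNode, FSNamesystem is called; it reads dfs.namenode.name.dir and dfs.namenode.edits.dir and builds the FSDirectory.
- In the FSImage class, recoverTransitionRead checks the metadata, loads it, and merges it in memory, while saveNameSpace persists the metadata to disk.
- saveNameSpace writes the metadata to disk in three steps: first it renames the current directory to lastcheckpoint.tmp; then it creates a new current directory and saves the files there; finally it renames lastcheckpoint.tmp to previous.checkpoint.
- The checkpoint process: the Secondary NameNode tells the NameNode to start a new edit log file, edits.new, and all subsequent log operations are written to edits.new. The Secondary NameNode then downloads the fsimage and edits files from the NameNode and merges them into a new fsimage.ckpt, which it uploads back to the NameNode. Finally, the NameNode renames fsimage.ckpt to fsimage and edits.new to edits.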
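The three-step directory swap described for saveNameSpace above can be sketched roughly as follows. This is a minimal illustration using java.nio.file rather than the actual Hadoop code: the class CheckpointDirRotation, its method names, and the single fsimage file written into current are all hypothetical, and deleting an older previous.checkpoint before the final rename is an assumption made so that the sketch can run more than once.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration; this is not a Hadoop class.
class CheckpointDirRotation {

    // Delete a directory tree if it exists (children before parents).
    static void deleteTree(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Save a new image under root/current, keeping the old state as previous.checkpoint.
    static void saveWithRotation(Path root, byte[] image) throws IOException {
        Path current = root.resolve("current");
        Path tmp = root.resolve("lastcheckpoint.tmp");
        Path previous = root.resolve("previous.checkpoint");

        // Step 1: rename current to lastcheckpoint.tmp.
        Files.move(current, tmp);
        // Step 2: create a fresh current directory and save the file into it.
        Files.createDirectory(current);
        Files.write(current.resolve("fsimage"), image);
        // Step 3: rename lastcheckpoint.tmp to previous.checkpoint.
        // Assumption: an older previous.checkpoint is simply discarded first.
        deleteTree(previous);
        Files.move(tmp, previous);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("name-dir");
        Files.createDirectory(root.resolve("current"));
        Files.write(root.resolve("current").resolve("fsimage"),
                "old".getBytes(StandardCharsets.UTF_8));

        saveWithRotation(root, "new".getBytes(StandardCharsets.UTF_8));

        byte[] now = Files.readAllBytes(root.resolve("current").resolve("fsimage"));
        System.out.println("current/fsimage = " + new String(now, StandardCharsets.UTF_8));
    }
}
```

The intermediate name matters for recovery: if the process dies while lastcheckpoint.tmp still exists, the presence of that directory shows that the save did not finish, and the old state is still intact inside it.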
2. Metadata updates and edit log writes: a scenario analysis
Taking mkdir as an example:

Analysis of the logSync code:

Code:
    public void logSync() throws IOException {
      ArrayList<EditLogOutputStream> errorStreams = null;
      long syncStart = 0;
      // Fetch the transactionId of this thread.
      long mytxid = myTransactionId.get().txid;
      EditLogOutputStream streams[] = null;
      boolean sync = false;
      try {
        synchronized (this) {
          assert editStreams.size() > 0 : "no editlog streams";
          printStatistics(false);
          // if somebody is already syncing, then wait
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }
          //
          // If this transaction was already flushed, then nothing to do
          //
          if (mytxid <= synctxid) {
            numTransactionsBatchedInSync++;
            if (metrics != null) // Metrics is non-null only when used inside name node
              metrics.transactionsBatchedInSync.inc();
            return;
          }
          // now, this thread will do the sync
          syncStart = txid;
          isSyncRunning = true;
          sync = true;
          // swap buffers
          for (EditLogOutputStream eStream : editStreams) {
            eStream.setReadyToFlush();
          }
          streams =
            editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
        }
        // do the sync
        long start = FSNamesystem.now();
        for (int idx = 0; idx < streams.length; idx++) {
          EditLogOutputStream eStream = streams[idx];
          try {
            eStream.flush();
          } catch (IOException ie) {
            FSNamesystem.LOG.error("Unable to sync edit log.", ie);
            //
            // remember the streams that encountered an error.
            //
            if (errorStreams == null) {
              errorStreams = new ArrayList<EditLogOutputStream>(1);
            }
            errorStreams.add(eStream);
          }
        }
        long elapsed = FSNamesystem.now() - start;
        processIOError(errorStreams, true);
        if (metrics != null) // Metrics non-null only when used inside name node
          metrics.syncs.inc(elapsed);
      } finally {
        synchronized (this) {
          synctxid = syncStart;
          if (sync) {
            isSyncRunning = false;
          }
          this.notifyAll();
        }
      }
    }
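The "swap buffers" step is what allows other threads to keep appending edits while one thread performs the slow flush. A minimal sketch of that double-buffering idea, under simplifying assumptions, might look like the following. DoubleBufferedLog is a hypothetical stand-in for an edit log output stream, an in-memory sink replaces the on-disk file, and the isSyncRunning guard that lets only one thread flush at a time in logSync is omitted.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical simplification of an edit log stream; not the Hadoop class.
class DoubleBufferedLog {
    // Receives newly written edits.
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    // Holds the edits that the next flush() will write out.
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();
    // Stands in for the edits file on disk.
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    // Append one record to the in-memory buffer (cheap, done under the lock).
    synchronized void write(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        bufCurrent.write(bytes, 0, bytes.length);
    }

    // Swap buffers: edits written so far become "ready", new edits go to the other buffer.
    synchronized void setReadyToFlush() {
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // Write the ready buffer to the sink and empty it.
    // Not synchronized on purpose: writers are not blocked while this runs.
    // This sketch assumes a single flushing thread, as isSyncRunning enforces in logSync.
    void flush() {
        byte[] ready = bufReady.toByteArray();
        sink.write(ready, 0, ready.length);
        bufReady.reset();
    }

    String flushedContents() {
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        DoubleBufferedLog log = new DoubleBufferedLog();
        log.write("mkdir /a;");
        log.setReadyToFlush();      // "mkdir /a;" is now ready to flush
        log.write("mkdir /b;");     // arrives after the swap, waits for the next sync
        log.flush();
        System.out.println(log.flushedContents());
    }
}
```

In this sketch the edit that arrives after the swap is not part of the first flush. It is picked up by the next setReadyToFlush and flush pair, which corresponds to a later thread finding mytxid > synctxid and performing its own sync.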
3. Analysis of the Backup Node checkpoint process:

      /**
       * Create a new checkpoint
       */
      void doCheckpoint() throws IOException {
        long startTime = FSNamesystem.now();
        NamenodeCommand cmd =
          getNamenode().startCheckpoint(backupNode.getRegistration());
        CheckpointCommand cpCmd = null;
        switch (cmd.getAction()) {
          case NamenodeProtocol.ACT_SHUTDOWN:
            shutdown();
            throw new IOException("Name-node " + backupNode.nnRpcAddress
                                  + " requested shutdown.");
          case NamenodeProtocol.ACT_CHECKPOINT:
            cpCmd = (CheckpointCommand) cmd;
            break;
          default:
            throw new IOException("Unsupported NamenodeCommand: " + cmd.getAction());
        }
        CheckpointSignature sig = cpCmd.getSignature();
        assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
          "Signature should have current layout version. Expected: "
          + FSConstants.LAYOUT_VERSION + " actual " + sig.getLayoutVersion();
        assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
          cpCmd.isImageObsolete() : "checkpoint node should always download image.";
        backupNode.setCheckpointState(CheckpointStates.UPLOAD_START);
        if (cpCmd.isImageObsolete()) {
          // First reset storage on disk and memory state
          backupNode.resetNamespace();
          downloadCheckpoint(sig);
        }
        BackupStorage bnImage = getFSImage();
        bnImage.loadCheckpoint(sig);
        sig.validateStorageInfo(bnImage);
        bnImage.saveCheckpoint();
        if (cpCmd.needToReturnImage())
          uploadCheckpoint(sig);
        getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
        bnImage.convergeJournalSpool();
        backupNode.setRegistration(); // keep registration up to date
        if (backupNode.isRole(NamenodeRole.CHECKPOINT))
          getFSImage().getEditLog().close();
        LOG.info("Checkpoint completed in "
                 + (FSNamesystem.now() - startTime) / 1000 + " seconds."
                 + " New Image Size: " + bnImage.getFsImageName().length());
      }
    }
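4. Metadata reliability mechanisms
- Multiple backup paths are configured. When the NameNode updates the log or performs a checkpoint, it writes the metadata to several directories.
- An output stream is created for every metadata file that needs to be saved. A stream that throws an exception during access is handled by removing it from the set of active streams, and at a suitable later time the removed streams are checked again to see whether they have recovered. This handles failures of the backup output streams effectively.
- Several mechanisms are used to guarantee the reliability of the metadata. For example, the checkpoint process is divided into several phases, and different file names identify the phase currently in progress. This makes recovery possible after a failed save.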
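The second point above, writing to several streams and dropping the ones that fail, can be sketched as follows. This is a simplified illustration, not the Hadoop implementation: RedundantWriter and its methods are hypothetical names, plain java.io.OutputStream objects stand in for the edit log streams, and the later re-check of removed streams is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; this is not a Hadoop class.
class RedundantWriter {
    private final List<OutputStream> active = new ArrayList<>();
    private final List<OutputStream> removed = new ArrayList<>();

    RedundantWriter(List<OutputStream> streams) {
        active.addAll(streams);
    }

    // Write the same bytes to every active stream; a stream that fails is
    // moved to the removed list so that later writes skip it.
    void write(byte[] data) throws IOException {
        Iterator<OutputStream> it = active.iterator();
        while (it.hasNext()) {
            OutputStream stream = it.next();
            try {
                stream.write(data);
                stream.flush();
            } catch (IOException e) {
                it.remove();
                removed.add(stream);
            }
        }
        // Assumption: with no healthy copy left, the write as a whole must fail.
        if (active.isEmpty()) {
            throw new IOException("no healthy metadata streams left");
        }
    }

    int activeCount() {
        return active.size();
    }

    int removedCount() {
        return removed.size();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream good = new ByteArrayOutputStream();
        // A stream that always fails, standing in for an unavailable directory.
        OutputStream broken = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("simulated disk failure");
            }
        };
        RedundantWriter writer = new RedundantWriter(Arrays.asList(good, broken));
        writer.write("edit-1".getBytes("UTF-8"));
        System.out.println("active=" + writer.activeCount() + " removed=" + writer.removedCount());
    }
}
```

Keeping the failed streams in a separate list, rather than discarding them, is what makes a later recovery check possible.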
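5. Metadata consistency mechanisms
- First, when the NameNode starts, it checks every backup directory: whether it has been formatted, whether the metadata file names in it are correct, and so on. This ensures that the metadata files are consistent with one another. It then selects the most recent copy and loads it into memory, which ensures that the current HDFS state matches the state at the last shutdown.
- Second, handling failed output streams ensures that the data in the healthy output streams stays consistent.
- Synchronization is used to keep the output streams consistent.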
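The startup selection in the first point can be illustrated with a small sketch. The names here are hypothetical: NameDir is an invented record of what the startup check learns about one directory, and comparing a checkpointTime value is an assumption about how "most recent" is decided.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; this is not a Hadoop class.
class LatestImageSelector {

    // What the startup check is assumed to know about one backup directory.
    static final class NameDir {
        final String path;
        final boolean formatted;
        final long checkpointTime;

        NameDir(String path, boolean formatted, long checkpointTime) {
            this.path = path;
            this.formatted = formatted;
            this.checkpointTime = checkpointTime;
        }
    }

    // Skip unformatted directories and return the one with the newest checkpoint.
    static NameDir selectLatest(List<NameDir> dirs) {
        NameDir latest = null;
        for (NameDir dir : dirs) {
            if (!dir.formatted) {
                continue;
            }
            if (latest == null || dir.checkpointTime > latest.checkpointTime) {
                latest = dir;
            }
        }
        if (latest == null) {
            throw new IllegalStateException("no formatted name directory found");
        }
        return latest;
    }

    public static void main(String[] args) {
        List<NameDir> dirs = Arrays.asList(
                new NameDir("/data1/name", true, 100L),
                new NameDir("/data2/name", true, 250L),
                new NameDir("/data3/name", false, 900L));
        System.out.println("loading image from " + selectLatest(dirs).path);
    }
}
```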
Reposted from: http://blog.csdn.net/kntao/article/details/7770597