Hadoop源碼分析六啟動文件namenode原理詳解

1、 namenode啟動

在本系列文章三中分析瞭hadoop的啟動文件,其中提到瞭namenode啟動的時候調用的類為

org.apache.hadoop.hdfs.server.namenode.NameNode

其main方法的內容如下:

 public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

這段代碼的重點在第8行,這裡createNameNode方法創建瞭一個namenode對象,然後調用其join方法阻塞等待請求。

createNameNode方法的內容如下:

 public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    // Parse out some generic args into Configuration.
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    argv = hParser.getRemainingArgs();
    // Parse the rest, NN specific args.
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);
    switch (startOpt) {
      case FORMAT: {
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
            "' is no longer supported. To finalize an upgrade, start the NN " +
            " and then run `hdfs dfsadmin -finalizeUpgrade'");
        terminate(1);
        return null; // avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      case UPGRADEONLY: {
        DefaultMetricsSystem.initialize("NameNode");
        new NameNode(conf);
        terminate(0);
        return null;
      }
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);
      }
    }
  }

這段代碼很簡單。主要做的操作有三個:

  • 1、 創建配置文件對象
  • 2、 解析命令行的參數
  • 3、 根據參數執行對應方法(switch塊)

其中創建的配置文件的為HdfsConfiguration(第5行),這裡的HdfsConfiguration繼承於Configuration類,它會加載hadoop的配置文件到內存中。然後解析傳入main方法的參數,根據這個參數執行具體的方法。正常啟動的時候執行的default裡的內容。default的內容也很簡單,就是創建一個Namenode對象。

這裡先從HdfsConfiguration開始分析,詳細講解hdfs的配置文件處理。

首先看HdfsConfiguration的初始化方法如下:

  public HdfsConfiguration() {
    super();
  }

這裡是調用其父類的初始化方法。

其父類為Configuration類,它的初始化方法如下:

  /** A new configuration. */
  public Configuration() {
    this(true);
  }

這裡可以看見他是調用瞭一個重載方法,傳入瞭一個參數:true。

接著細看這個重載方法:

  public Configuration(boolean loadDefaults) {
    this.loadDefaults = loadDefaults;
    updatingResource = new ConcurrentHashMap<String, String[]>();
    synchronized(Configuration.class) {
      REGISTRY.put(this, null);
    }
  }

這裡也很簡單,這裡主要是為兩個參數賦值,並將新創建的Configuration添加到REGISTRY中。
至此便創建好瞭一個配置文件。但是關於配置文件的初始化解析還未完成。在java裡可以使用static關鍵字聲明一段代碼塊,這段代碼塊在類加載的時候會被執行。在Configuration和HdfsConfiguration中都有靜態代碼塊。
首先在Configuration類中,在第682行有一段靜態代碼塊,其內容如下:

在這裡插入圖片描述

這段代碼的重點在第695行和第696行,這裡調用瞭一個addDefaultResource方法,這裡傳入瞭兩個參數core-default.xml和core-site.xml。其中core-site.xml就是在安裝hadoop的時候設置的配置文件。而core-default.xml是hadoop自帶的配置文件,這個文件可以在hadoop的官方文檔裡查到,文檔鏈接如下:https://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-common/core-default.xml
同樣在hadoop的源碼裡也有這個文件,它在hadoop-common-XX.jar中。

接著繼續分析調用的addDefaultResource方法

其內容如下:

  public static synchronized void addDefaultResource(String name) {
    if(!defaultResources.contains(name)) {
      defaultResources.add(name);
      for(Configuration conf : REGISTRY.keySet()) {
        if(conf.loadDefaults) {
          conf.reloadConfiguration();
        }
      }
    }
  }

這段代碼也很簡單。首先是第二行先從defaultResources中判斷是否已經存在該配置文件,

這裡的defaultResources是一個list

其定義如下:

 private static final CopyOnWriteArrayList<String> defaultResources =
    new CopyOnWriteArrayList<String>();

若defaultResources中不存在這個配置文件,則繼續向下執行,將這個配置文件添加到defaultResources中(第3行)。然後遍歷REGISTRY中的key(第4行),這裡的key就是在上文提到的Configuration對象。然後根據其loadDefaults的值來判斷是否執行reloadConfiguration方法。
這裡的loadDefaults的值就是上文分析的傳入重載方法的值,上文傳入的為true,所以其創建的Configuration對象在這裡會執行reloadConfiguration方法。

reloadConfiguration方法內容如下:

  public synchronized void reloadConfiguration() {
    properties = null;                            // trigger reload
    finalParameters.clear();                      // clear site-limits
  }

這裡可以看見這個reloadConfiguration方法並沒有真正的重新加載配置文件而是將properties的值設置為空。

同樣在HdfsConfiguration也有類似的靜態代碼塊,在第30行,其內容如下:

在這裡插入圖片描述

這裡首先調用瞭一個addDeprecatedKeys方法然後調用瞭一個addDefaultResource。這裡的addDefaultResource傳瞭兩個文件hdfs-default.xml和hdfs-site.xml。其中hdfs-site.xml是安裝時的配置文件,hdfs-default.xml是其自帶的默認文件,同上文的core-default.xml一樣。官網鏈接為:https://hadoop.apache.org/docs/r2.7.6/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml。文件位於:hadoop-hdfs-2.7.6.jar。

其中addDeprecatedKeys方法內容如下:

 private static void addDeprecatedKeys() {
    Configuration.addDeprecations(new DeprecationDelta[] {
      new DeprecationDelta("dfs.backup.address",
        DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY),
      new DeprecationDelta("dfs.backup.http.address",
        DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY),
      new DeprecationDelta("dfs.balance.bandwidthPerSec",
        DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY),
      new DeprecationDelta("dfs.data.dir",
        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY),
      new DeprecationDelta("dfs.http.address",
        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY),
      new DeprecationDelta("dfs.https.address",
        DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY),
      new DeprecationDelta("dfs.max.objects",
        DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY),
      new DeprecationDelta("dfs.name.dir",
        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY),
      new DeprecationDelta("dfs.name.dir.restore",
        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY),
      new DeprecationDelta("dfs.name.edits.dir",
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY),
      new DeprecationDelta("dfs.read.prefetch.size",
        DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY),
      new DeprecationDelta("dfs.safemode.extension",
        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY),
      new DeprecationDelta("dfs.safemode.threshold.pct",
        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY),
      new DeprecationDelta("dfs.secondary.http.address",
        DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY),
      new DeprecationDelta("dfs.socket.timeout",
        DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY),
      new DeprecationDelta("fs.checkpoint.dir",
        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY),
      new DeprecationDelta("fs.checkpoint.edits.dir",
        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY),
      new DeprecationDelta("fs.checkpoint.period",
        DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY),
      new DeprecationDelta("heartbeat.recheck.interval",
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY),
      new DeprecationDelta("dfs.https.client.keystore.resource",
        DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY),
      new DeprecationDelta("dfs.https.need.client.auth",
        DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY),
      new DeprecationDelta("slave.host.name",
        DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY),
      new DeprecationDelta("session.id",
        DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
      new DeprecationDelta("dfs.access.time.precision",
        DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY),
      new DeprecationDelta("dfs.replication.considerLoad",
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY),
      new DeprecationDelta("dfs.replication.interval",
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY),
      new DeprecationDelta("dfs.replication.min",
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY),
      new DeprecationDelta("dfs.replication.pending.timeout.sec",
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY),
      new DeprecationDelta("dfs.max-repl-streams",
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY),
      new DeprecationDelta("dfs.permissions",
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY),
      new DeprecationDelta("dfs.permissions.supergroup",
        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY),
      new DeprecationDelta("dfs.write.packet.size",
        DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY),
      new DeprecationDelta("dfs.block.size",
        DFSConfigKeys.DFS_BLOCK_SIZE_KEY),
      new DeprecationDelta("dfs.datanode.max.xcievers",
        DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
      new DeprecationDelta("io.bytes.per.checksum",
        DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY),
      new DeprecationDelta("dfs.federation.nameservices",
        DFSConfigKeys.DFS_NAMESERVICES),
      new DeprecationDelta("dfs.federation.nameservice.id",
        DFSConfigKeys.DFS_NAMESERVICE_ID),
      new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
    });
  }

這段代碼很簡單,隻有一句話。調用瞭 Configuration的靜態方法addDeprecations,並向其中傳入瞭一個參數,參數類型為DeprecationDelta類的數組,並為數組中的數據進行賦值。

以上就是關於Hadoop源碼分析啟動文件namenode原理詳解的詳細內容,更多關於Hadoop源碼分析的資料請持續關註WalkonNet其它相關文章!

推薦閱讀: