
Hadoop Core Source Code Analysis: NameNode Startup Source Code

Original source (please respect the author's copyright): https://www.gewuweb.com/hot/5718.html

The working mechanism of the Hadoop NameNode is shown in the figure below:

The source code flow of NameNode startup is shown in the figure below:

0) Add the following dependencies to pom.xml

<dependencies>
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>3.1.3</version>
       </dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs</artifactId>
              <version>3.1.3</version>
       </dependency>
 
       <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs-client</artifactId>
              <version>3.1.3</version>
              <scope>provided</scope>
       </dependency>
</dependencies>

1) Press Ctrl + N, search globally for namenode, and open NameNode.java

Official description of NameNode

NameNode serves as both directory namespace manager and "inode table" for the
Hadoop DFS. There is a single NameNode running in any DFS deployment. (Well,
except when there is a second backup/failover NameNode, or when using
federated NameNodes.) The NameNode controls two critical tables: 1)
filename->blocksequence (namespace) 2) block->machinelist ("inodes") The first
table is stored on disk and is very precious. The second table is rebuilt every
time the NameNode comes up. 'NameNode' refers to both this class as well as the
'NameNode server'. The 'FSNamesystem' class actually performs most of the
filesystem management. The majority of the 'NameNode' class itself is concerned
with exposing the IPC interface and the HTTP server to the outside world, plus
some configuration management. NameNode implements the ClientProtocol
interface, which allows clients to ask for DFS services. ClientProtocol is not
designed for direct use by authors of DFS client code. End-users should instead
use the FileSystem class. NameNode also implements the DatanodeProtocol
interface, used by DataNodes that actually store DFS data blocks. These methods
are invoked repeatedly and automatically by all the DataNodes in a DFS
deployment. NameNode also implements the NamenodeProtocol interface, used by
secondary namenodes or rebalancing processes to get partial NameNode state, for
example partial blocksMap etc.

2) Press Ctrl + F and search for the main method

NameNode.java

public static void main(String argv[]) throws Exception {
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // Create the NameNode
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      namenode.join();
    }
  } catch (Throwable e) {
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}

Click into createNameNode

public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  … …
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  boolean aborted = false;
  switch (startOpt) {
  case FORMAT:
    aborted = format(conf, startOpt.getForceFormat(),
        startOpt.getInteractiveFormat());
    terminate(aborted ? 1 : 0);
    return null; // avoid javac warning
  case GENCLUSTERID:
    … …
  default:
    DefaultMetricsSystem.initialize("NameNode");
    // Create the NameNode object
    return new NameNode(conf);
  }
}

Click into the NameNode constructor

public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  ... ...

  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    initialize(getConf());
    ... ...
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}

Click into initialize

protected void initialize(Configuration conf) throws IOException {
  ... ...

  if (NamenodeRole.NAMENODE == role) {
    // Start the HTTP server (port 9870)
    startHttpServer(conf);
  }

  // Load the fsimage and edit logs into memory
  loadNamesystem(conf);
  startAliasMapServerIfNecessary(conf);

  // Create the NameNode RPC server
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress =
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // NameNode startup resource check
  startCommonServices(conf);
  startMetricsLogger(conf);
}

** 1. Starting the HTTP service on port 9870 **

1) Click into startHttpServer

NameNode.java
private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);

  ... ...
  return bindAddress;
}

protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

public static InetSocketAddress getHttpAddress(Configuration conf) {
  return NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

// DFSConfigKeys.java
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;

public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
    HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

// HdfsClientConfigKeys.java
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
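
The lookup chain above boils down to "read one configuration value, fall back to 0.0.0.0:9870, and split it into host and port". The following is a minimal sketch of that idea using only the JDK. It is not Hadoop code: `HttpAddressSketch` and `resolve` are hypothetical names, `java.util.Properties` stands in for Hadoop's `Configuration`, and the property name `dfs.namenode.http-address` is the string behind `DFS_NAMENODE_HTTP_ADDRESS_KEY`.

```java
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical stand-alone sketch; not part of Hadoop.
final class HttpAddressSketch {
  // Property name behind DFS_NAMENODE_HTTP_ADDRESS_KEY.
  static final String HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
  // Defaults quoted in the constants above.
  static final int HTTP_PORT_DEFAULT = 9870;
  static final String HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + HTTP_PORT_DEFAULT;

  // Reads the configured address (or the default) and splits it into host and port.
  static InetSocketAddress resolve(Properties conf) {
    String value = conf.getProperty(HTTP_ADDRESS_KEY, HTTP_ADDRESS_DEFAULT).trim();
    int colon = value.lastIndexOf(':');
    if (colon < 0) {
      throw new IllegalArgumentException("Expected host:port but got: " + value);
    }
    String host = value.substring(0, colon);
    int port = Integer.parseInt(value.substring(colon + 1));
    // Unresolved on purpose: the sketch never performs a DNS lookup.
    return InetSocketAddress.createUnresolved(host, port);
  }

  public static void main(String[] args) {
    InetSocketAddress addr = resolve(new Properties());
    System.out.println(addr.getHostString() + ":" + addr.getPort());
  }
}
```

With an empty configuration the sketch falls back to the default bind address; setting `dfs.namenode.http-address` to another host:port pair overrides it, which mirrors how `conf.getTrimmed(key, default)` behaves in the code above.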

2) In startHttpServer, click into httpServer.start();

NameNodeHttpServer.java

void start() throws IOException {
  ... ...
  // Hadoop wraps its own HTTP server implementation, HttpServer2
  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
  ... ...

  httpServer = builder.build();

  ... ...

  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();

  ... ...
}

Click into setupServlets

private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
      StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
      true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
}

** 2. Loading the fsimage and edit logs **

1) Click into loadNamesystem

NameNode.java

protected void loadNamesystem(Configuration conf) throws IOException {
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}

// FSNamesystem.java
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

  checkConfiguration(conf);

  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));

  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = monotonicNow();
  try {
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}

** 3. Initializing the NameNode RPC server **

1) Click into createRpcServer

NameNode.java

protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}

NameNodeRpcServer.java

public NameNodeRpcServer(Configuration conf, NameNode nn)
    throws IOException {
  ... ...
  serviceRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(serviceRpcAddr.getPort())
      .setNumHandlers(serviceHandlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();
  ... ...
}

** 4. NameNode startup resource check **

1) Click into startCommonServices

NameNode.java

private void startCommonServices(Configuration conf) throws IOException {

  namesystem.startCommonServices(conf, haContext);

  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  … …
}

2) Click into namesystem.startCommonServices

FSNamesystem.java

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // Check whether there is enough disk space to store the metadata
    // (fsimage: 100 MB by default; edit log: 100 MB by default)
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();

    // Safe mode
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // Activate the block manager services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }

  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

Click into NameNodeResourceChecker

NameNodeResourceChecker.java

public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();

  // dfs.namenode.resource.du.reserved defaults to 1024 * 1024 * 100 bytes = 100 MB
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);

  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));

  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // Register every local edits directory for the resource check
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }

  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }

  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

Click into checkAvailableResources

FSNamesystem.java

void checkAvailableResources() {
  long resourceCheckTime = monotonicNow();
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");

  // Check whether the resources are sufficient; false means they are not
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();

  resourceCheckTime = monotonicNow() - resourceCheckTime;
  NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}

NameNodeResourceChecker.java

public boolean hasAvailableDiskSpace() {
       return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}

NameNodeResourcePolicy.java

static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

  // TODO: workaround:
  // - during startup, if there are no edits dirs on disk, then there is
  // a call to areResourcesAvailable() with no dirs at all, which was
  // previously causing the NN to enter safemode
  if (resources.isEmpty()) {
    return true;
  }

  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;

  // Check whether the resources are sufficient
  for (CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resource is not available, so return false.
        return false;
      }
    }
  }

  if (redundantResourceCount == 0) {
    // If there are no redundant resources, return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount - disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}
 
interface CheckableNameNodeResource {
 
  public boolean isResourceAvailable();
 
  public boolean isRequired();
}
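
To make the policy easier to experiment with, here is a minimal stand-alone sketch that follows the same decision rules as areResourcesAvailable. It is a simplified re-implementation for illustration, not the Hadoop class: `ResourcePolicySketch` and its nested `Volume` type are hypothetical stand-ins for NameNodeResourcePolicy and CheckableNameNodeResource.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-alone sketch; not part of Hadoop.
final class ResourcePolicySketch {
  /** Hypothetical stand-in for CheckableNameNodeResource. */
  static final class Volume {
    final boolean required;
    final boolean available;

    Volume(boolean required, boolean available) {
      this.required = required;
      this.available = available;
    }
  }

  // Same decision rules as the method quoted above:
  // any unavailable required volume fails the check; otherwise enough
  // redundant volumes must still be available.
  static boolean areResourcesAvailable(Collection<Volume> volumes, int minimumRedundant) {
    if (volumes.isEmpty()) {
      return true;
    }
    int required = 0;
    int redundant = 0;
    int disabledRedundant = 0;
    for (Volume v : volumes) {
      if (v.required) {
        required++;
        if (!v.available) {
          return false;
        }
      } else {
        redundant++;
        if (!v.available) {
          disabledRedundant++;
        }
      }
    }
    if (redundant == 0) {
      return required > 0;
    }
    return redundant - disabledRedundant >= minimumRedundant;
  }

  public static void main(String[] args) {
    Collection<Volume> volumes = Arrays.asList(
        new Volume(true, true),    // required volume with enough space
        new Volume(false, false),  // redundant volume that is full
        new Volume(false, true));  // redundant volume with enough space
    System.out.println(areResourcesAvailable(volumes, 1));
    System.out.println(areResourcesAvailable(Collections.<Volume>emptyList(), 1));
  }
}
```

The key asymmetry is that a single unavailable required volume fails the whole check immediately, while redundant volumes are only counted against the configured minimum.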

Press Ctrl + H to find the implementing class, CheckedVolume

NameNodeResourceChecker.java

public boolean isResourceAvailable() {

  // Get the space available on this volume
  long availableSpace = df.getAvailable();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume '" + volume + "' is "
        + availableSpace);
  }

  // If the available space is below the reserved amount (100 MB by default), return false
  if (availableSpace < duReserved) {
    LOG.warn("Space available on volume '" + volume + "' is "
        + availableSpace +
        ", which is below the configured reserved amount " + duReserved);
    return false;
  } else {
    return true;
  }
}
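
The per-volume check is a single comparison between free space and the reserved amount. The sketch below reproduces that comparison with the JDK only. It is an illustration under assumptions: `DiskReserveSketch` is a hypothetical class, and `java.io.File.getUsableSpace()` is used here as a stand-in for the `df.getAvailable()` call in the Hadoop code.

```java
import java.io.File;

// Hypothetical stand-alone sketch; not part of Hadoop.
final class DiskReserveSketch {
  // 1024 * 1024 * 100 bytes = 100 MB, the default quoted for dfs.namenode.resource.du.reserved.
  static final long DU_RESERVED_DEFAULT = 1024L * 1024 * 100;

  // A volume counts as available when its free space is not below the reserve.
  static boolean hasEnoughSpace(long availableBytes, long reservedBytes) {
    return availableBytes >= reservedBytes;
  }

  // Assumption: File.getUsableSpace() approximates what df.getAvailable() reports.
  static boolean isVolumeAvailable(File dir, long reservedBytes) {
    return hasEnoughSpace(dir.getUsableSpace(), reservedBytes);
  }

  public static void main(String[] args) {
    File dir = new File(".");
    System.out.println("usable bytes: " + dir.getUsableSpace());
    System.out.println("above 100 MB reserve: " + isVolumeAvailable(dir, DU_RESERVED_DEFAULT));
  }
}
```

Note that the boundary is inclusive: a volume with exactly the reserved amount free still passes, because the original code only rejects `availableSpace < duReserved`.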

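** 5. How the NameNode detects heartbeat timeouts **

Press Ctrl + N and search for namenode, then press Ctrl + F and search for startCommonServices

Click into namesystem.startCommonServices(conf, haContext);

Click into blockManager.activate(conf, completeBlocksTotal);

Click into datanodeManager.activate(conf);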

DatanodeManager.java

void activate(final Configuration conf) {
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}

HeartbeatManager.java

void activate() {
  // Start the heartbeat thread; search for its run method
  heartbeatThread.start();
}
 
public void run() {
  while (namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
        // Heartbeat check
        heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
        synchronized (HeartbeatManager.this) {
          for (DatanodeDescriptor d : datanodes) {
            d.setNeedKeyUpdate(true);
          }
        }
        lastBlockKeyUpdate = now;
      }
    } catch (Exception e) {
      LOG.error("Exception while checking heartbeat", e);
    }
    try {
      Thread.sleep(5000);  // 5 seconds
    } catch (InterruptedException ignored) {
    }
    // avoid declaring nodes dead for another cycle if a GC pause lasts
    // longer than the node recheck interval
    if (shouldAbortHeartbeatCheck(-5000)) {
      LOG.warn("Skipping next heartbeat scan due to excessive pause");
      lastHeartbeatCheck = Time.monotonicNow();
    }
  }
}
 
void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();

  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeDescriptor dead = null;

    // locate the first failed storage that isn't on a dead node.
    DatanodeStorageInfo failedStorage = null;

    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized (this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause has occurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
        // Check whether the DataNode is dead
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
        for (DatanodeStorageInfo storageInfo : storageInfos) {
          if (storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }

          if (failedStorage == null &&
              storageInfo.areBlocksOnFailedStorage() &&
              d != dead) {
            failedStorage = storageInfo;
          }
        }
      }

      // Set the number of stale nodes in the DatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
      dm.setNumStaleStorages(numOfStaleStorages);
    }
    ... ...
  }
}
 
// DatanodeManager.java
boolean isDatanodeDead(DatanodeDescriptor node) {
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}

private long heartbeatExpireInterval;
// 10 minutes + 30 seconds with the default settings
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
    + 10 * 1000 * heartbeatIntervalSeconds;

private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes

private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds = conf.getTimeDuration(
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
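
The "10 minutes + 30 seconds" figure follows directly from the formula: with a recheck interval of 5 minutes (300000 ms) and a heartbeat interval of 3 seconds, 2 * 300000 + 10 * 1000 * 3 = 630000 ms. The sketch below works through that arithmetic and the dead-node comparison. `HeartbeatExpirySketch` and its method names are hypothetical; only the formula and the default values come from the code above.

```java
// Hypothetical stand-alone sketch; not part of Hadoop.
final class HeartbeatExpirySketch {
  // Defaults quoted above: 5 minutes in milliseconds, and 3 seconds.
  static final long RECHECK_INTERVAL_MS_DEFAULT = 5 * 60 * 1000L;
  static final long HEARTBEAT_INTERVAL_SECONDS_DEFAULT = 3L;

  // Same formula as heartbeatExpireInterval.
  static long expireIntervalMs(long recheckIntervalMs, long heartbeatIntervalSeconds) {
    return 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds;
  }

  // Same comparison as isDatanodeDead: the last heartbeat is older than the expiry window.
  static boolean isDead(long lastUpdateMs, long nowMs, long expireIntervalMs) {
    return lastUpdateMs < nowMs - expireIntervalMs;
  }

  public static void main(String[] args) {
    long expire = expireIntervalMs(RECHECK_INTERVAL_MS_DEFAULT,
        HEARTBEAT_INTERVAL_SECONDS_DEFAULT);
    System.out.println(expire + " ms = " + (expire / 60000) + " min "
        + (expire % 60000) / 1000 + " s");
  }
}
```

Because the comparison is strict, a DataNode whose last heartbeat is exactly 630000 ms old is still considered alive; it is declared dead only once that window has been exceeded.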

** 6. Safe mode **

FSNamesystem.java

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // Check whether there is enough disk space to store the metadata
    // (fsimage: 100 MB by default; edit log: 100 MB by default)
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();

    // Begin the safe mode phase
    prog.beginPhase(Phase.SAFEMODE);

    // Get the number of complete (usable) blocks
    long completeBlocksTotal = getCompleteBlocksTotal();

    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // Activate the block manager services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }

  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

Click into getCompleteBlocksTotal

public long getCompleteBlocksTotal() {
  // Calculate number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // Number of blocks under construction
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    // Total blocks - blocks under construction = complete (usable) blocks
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock("getCompleteBlocksTotal");
  }
}

Click into blockManager.activate

public void activate(Configuration conf, long blockTotal) {
  pendingReconstruction.start();
  datanodeManager.activate(conf);

  this.redundancyThread.setName("RedundancyMonitor");
  this.redundancyThread.start();

  storageInfoDefragmenterThread.setName("StorageInfoMonitor");
  storageInfoDefragmenterThread.start();
  this.blockReportThread.start();

  mxBeanName = MBeans.register("NameNode", "BlockStats", this);

  bmSafeMode.activate(blockTotal);
}

Click into bmSafeMode.activate

void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;

  startTime = monotonicNow();

  // Compute the block-count thresholds
  setBlockTotal(total);

  // Check whether the DataNode count and the reported blocks meet the
  // criteria for leaving safe mode
  if (areThresholdsMet()) {
    boolean exitResult = leaveSafeMode(false);
    Preconditions.checkState(exitResult, "Failed to leave safe mode.");
  } else {
    // enter safe mode
    status = BMSafeModeStatus.PENDING_THRESHOLD;

    initializeReplQueuesIfNecessary();

    reportStatus("STATE* Safe mode ON.", true);
    lastStatusReport = monotonicNow();
  }
}

Click into setBlockTotal

void setBlockTotal(long total) {
  assert namesystem.hasWriteLock();
  synchronized (this) {
    this.blockTotal = total;
    // Compute the threshold, e.g. 1000 complete blocks * 0.999 = 999
    this.blockThreshold = (long) (total * threshold);
  }

  this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}

this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
    DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);

public static final float DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;

Click into areThresholdsMet

private boolean areThresholdsMet() {
  assert namesystem.hasWriteLock();
  // Calculating the number of live datanodes is time-consuming
  // in large clusters. Skip it when datanodeThreshold is zero.
  int datanodeNum = 0;

  if (datanodeThreshold > 0) {
    datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
  }
  synchronized (this) {
    // Safely reported blocks >= block threshold,
    // and live DataNodes >= minimum DataNode threshold
    return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
  }
}
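
Putting setBlockTotal and areThresholdsMet together, the exit condition can be tried out with a few numbers. The sketch below is a simplified illustration, not the BlockManagerSafeMode class: `SafeModeThresholdSketch` and its method names are hypothetical, and the example assumes a minimum DataNode threshold of 0.

```java
// Hypothetical stand-alone sketch; not part of Hadoop.
final class SafeModeThresholdSketch {
  // Default quoted above for the safe mode block threshold.
  static final float THRESHOLD_PCT_DEFAULT = 0.999f;

  // Same arithmetic as setBlockTotal: truncate total * threshold to a long.
  static long blockThreshold(long blockTotal, float thresholdPct) {
    return (long) (blockTotal * thresholdPct);
  }

  // Same condition as areThresholdsMet: enough blocks reported and enough live DataNodes.
  static boolean thresholdsMet(long blockSafe, long blockThreshold,
                               int liveDatanodes, int datanodeThreshold) {
    return blockSafe >= blockThreshold && liveDatanodes >= datanodeThreshold;
  }

  public static void main(String[] args) {
    long threshold = blockThreshold(1000L, THRESHOLD_PCT_DEFAULT);
    System.out.println("blocks needed: " + threshold);
    // Assumption for this example: the DataNode threshold is 0.
    System.out.println("998 reported: " + thresholdsMet(998L, threshold, 0, 0));
    System.out.println("999 reported: " + thresholdsMet(999L, threshold, 0, 0));
  }
}
```

One consequence worth noting: with zero complete blocks the threshold is also zero, so in this sketch the condition is met as soon as it is evaluated, which corresponds to the areThresholdsMet() branch in activate that calls leaveSafeMode right away.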
