[YARN] How the MRAppMaster Heartbeat Works

NotFound / 2242 reads

Original blog post: hackshell

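Our cluster recently ran into a problem: while jobs were running, the AM would sometimes time out after 10 minutes and be killed (KILL), yet rerunning the same job succeeded. The failures appeared at random, so my first guess was that something was wrong with the AM's heartbeat reporting: either the RM was so busy that it hung, or some mechanism made the AM wait 10 minutes without sending a heartbeat. So let's first look at how the AM reports heartbeats to the RM.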

In MRAppMaster, ContainerAllocatorRouter is responsible for requesting resources from the RM (that is, sending heartbeats).

RMContainerAllocator ultimately inherits from RMCommunicator, which implements the RMHeartbeatHandler interface:

public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // time of the last heartbeat
  void runOnNextHeartbeat(Runnable callback); // queue a callback to be run after the next heartbeat
}

Each time a heartbeat returns, the callbacks registered in heartbeatCallbacks are executed once:

allocatorThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              // ......
              heartbeat();
              lastHeartbeatTime = context.getClock().getTime(); // record the time of the last heartbeat
              executeHeartbeatCallbacks(); // run the registered callbacks
              // ....
        }
      }
});

In the RMCommunicator class:

private void executeHeartbeatCallbacks() {
    Runnable callback = null;
    while ((callback = heartbeatCallbacks.poll()) != null) {
      callback.run();
    }
  }
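
To make the callback mechanism concrete, here is a minimal, self-contained sketch of the pattern. HeartbeatCallbackSketch and heartbeatOnce are hypothetical names of my own, not Hadoop classes, and the sketch assumes the callbacks are held in a ConcurrentLinkedQueue. The actual heartbeat RPC is stubbed out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the callback handling described above; not the Hadoop class.
class HeartbeatCallbackSketch {
    // Assumption: a thread-safe queue, so other threads can register callbacks safely.
    private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();
    private volatile long lastHeartbeatTime;

    // Register a callback that should run once, right after the next heartbeat.
    void runOnNextHeartbeat(Runnable callback) {
        heartbeatCallbacks.add(callback);
    }

    long getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    // One loop iteration: heartbeat (stubbed), record the time, drain the callbacks.
    void heartbeatOnce(long now) {
        // A real AM would contact the RM here.
        lastHeartbeatTime = now;
        Runnable callback;
        while ((callback = heartbeatCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

Because poll() removes each callback as it runs, a callback registered through runOnNextHeartbeat fires exactly once. To run on every heartbeat it would have to register itself again.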

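When RMCommunicator starts, it first registers with the RM, telling it its own host and port. It then starts a thread (startAllocatorThread) that periodically calls the heartbeat method implemented in RMContainerAllocator, which requests resources from the RM, reports status at regular intervals, and tells the RM that the AM is still alive.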

When the AM is initialized, RMCommunicator is initialized as well:

protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy(); // obtain a proxy to the RM
  register(); // register with the RM
  startAllocatorThread(); // start the heartbeat thread
....
}
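
The heartbeat thread that serviceStart launches can be sketched as a small stand-alone class. AllocatorLoopSketch, its Heartbeat interface, and the intervalMs parameter are hypothetical. Only the loop condition follows the snippet shown earlier, and sleeping before each heartbeat is my assumption about how the interval is enforced. Treat it as a sketch under those assumptions, not as the RMCommunicator implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical heartbeat loop; names and structure are illustrative only.
class AllocatorLoopSketch {
    // Stand-in for the heartbeat() method that a subclass would implement.
    interface Heartbeat {
        void beat() throws Exception;
    }

    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final long intervalMs;
    private final Heartbeat heartbeat;
    private Thread allocatorThread;

    AllocatorLoopSketch(long intervalMs, Heartbeat heartbeat) {
        this.intervalMs = intervalMs;
        this.heartbeat = heartbeat;
    }

    void start() {
        allocatorThread = new Thread(() -> {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(intervalMs); // wait one interval between heartbeats
                    heartbeat.beat();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // A real AM would log this and decide whether to retry or abort.
                }
            }
        });
        allocatorThread.setName("allocator-sketch");
        allocatorThread.start();
    }

    void stop() {
        stopped.set(true);
        allocatorThread.interrupt();
        try {
            allocatorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

One thing the sketch makes visible: if beat() blocks, for instance on a slow remote call, no further heartbeats go out until it returns, because the loop is single-threaded.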

[Figure: event-handling flow of the AM's ContainerAllocatorRouter]

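Registration flow:

RMCommunicator makes a remote call to the registerApplicationMaster method of ApplicationMasterService, which sets up and maintains the responseId and then adds the AM to AMLivelinessMonitor. The monitor records a timestamp in a map and uses it to detect whether the AM has timed out because no heartbeat arrived for too long. If the AM's heartbeat is not refreshed for a long time, the RM tells the NodeManager to remove the AM.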
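
The bookkeeping described here (a map of last-seen timestamps plus an expiry check) can be sketched as follows. LivelinessMonitorSketch and its methods are hypothetical simplifications written for this post, not the Hadoop AMLivelinessMonitor. The caller passes in the current time so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of a liveliness monitor; not the Hadoop class.
class LivelinessMonitorSketch {
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long expiryIntervalMs;

    LivelinessMonitorSketch(long expiryIntervalMs) {
        this.expiryIntervalMs = expiryIntervalMs;
    }

    // Called at registration: start tracking the attempt.
    void register(String attemptId, long now) {
        lastSeen.put(attemptId, now);
    }

    // Called on every heartbeat: refresh the timestamp of an attempt that is tracked.
    void receivedPing(String attemptId, long now) {
        lastSeen.computeIfPresent(attemptId, (id, old) -> now);
    }

    // Return, and stop tracking, every attempt whose last heartbeat is too old.
    List<String> expire(long now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (now - entry.getValue() > expiryIntervalMs) {
                expired.add(entry.getKey());
            }
        }
        for (String attemptId : expired) {
            lastSeen.remove(attemptId);
        }
        return expired;
    }
}
```

With an expiry interval of 600000 ms, an attempt that registered at time 0 and never pinged again is reported by expire(700000), while one that pinged at 500000 is not. That interval matches the 10-minute timeout seen on our cluster and, as far as I know, the default of yarn.am.liveness-monitor.expiry-interval-ms.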

Heartbeat thread:

Sending a heartbeat is also how the AM obtains resources:

@Override
  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources(); // the key call
    if (allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }
   ......
  }

How resources are obtained:

private List<Container> getResources() throws Exception {
     // ...
     response = makeRemoteRequest(); // talk to the RM
     // ...
     // Handle any command sent by the RM first
     if (response.getAMCommand() != null) {
         switch (response.getAMCommand()) {
                case AM_RESYNC:
                case AM_SHUTDOWN:
                     eventHandler.handle(new JobEvent(this.getJob().getID(),
                                     JobEventType.JOB_AM_REBOOT));
                     throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
                             this.getContext().getApplicationID());
                default:
                     // ....
         }
     }
     // ... followed by further processing
}

Building the request:

protected AllocateResponse makeRemoteRequest() throws IOException {
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    allocateResponse = scheduler.allocate(allocateRequest); // RPC call to the allocate method of ApplicationMasterService
    .....
}

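Every heartbeat call refreshes the AM's timestamp in AMLivelinessMonitor, which signals that the AM is still alive.

The code also shows that resource requests are packaged as an ask, that is, an ArrayList of ResourceRequest entries. For example: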

priority:20 host:host9 capability:
priority:20 host:host2 capability:
priority:20 host:host10 capability:
priority:20 host:/rack/rack3203 capability:
priority:20 host:/rack/rack3202 capability:
priority:20 host:* capability:

So how is the ask built?

The addMap, addReduce, and assign methods in RMContainerAllocator modify the contents of ask, through this call chain:

addContainerReq --> addResourceRequest --> addResourceRequestToAsk;

By adding my own logging to the code, I could see that resources are requested at three levels: local (host), rack, and any.

These end up as a single ask list that is sent to the RM:

 ask Capability: ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
 ask Capability: ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true

A similar log line looks like this:

getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit= knownNMs=24
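
As a rough illustration of how per-task requests could roll up into the host, rack, and * entries of the ask list, here is a small sketch. AskTableSketch is a hypothetical class of my own. It only counts containers per resource name and ignores priority, capability, and locality relaxation, so it is a simplification rather than the RMContainerAllocator logic.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical counter of requested containers per resource name; not Hadoop code.
class AskTableSketch {
    static final String ANY = "*";

    private final Map<String, Integer> numContainers = new TreeMap<>();

    // Record one container request that prefers the given hosts and racks.
    void addContainerReq(List<String> hosts, List<String> racks) {
        for (String host : hosts) {
            increment(host); // local level
        }
        for (String rack : racks) {
            increment(rack); // rack level
        }
        increment(ANY); // any level: counted once for every request
    }

    private void increment(String resourceName) {
        numContainers.merge(resourceName, 1, Integer::sum);
    }

    Map<String, Integer> asks() {
        return numContainers;
    }
}
```

In this sketch every container request increments the * entry, while only the matching host and rack entries grow. That is consistent with the * entry carrying the largest NumContainers in the ask list above.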
