大數據夢工廠( 0011 - YARN核心設計解析)


1 - YARN RPC架構設計

YARN RPC Server 處理流程大致可以分為四個階段:建立連接、接收請求、處理請求和返回結果。各階段實現如下圖所示:

1.1 - 建立連接

整個 YARN RPC Server 只有一個 Listener 線程,且包含一個 Selector 對象,用于監聽 OP_ACCEPT 事件。統一負責監聽是否有來自各個客戶端的 RPC 連接請求到達,并采用輪詢策略選擇一個 Reader 線程處理新連接。

1.2 - 接收請求

當 Listener 完成客戶端的連接之后,通過輪詢方式找到一個 Reader 線程處理,并將新的 RPC 請求封裝成固定的格式(Call 類),放到一個共享隊列(callQueue)中。可同時存在多個 Reader 線程,且包含一個 Selector 對象,用于監聽 OP_READ 事件。

1.3 - 處理請求

Handler 線程(可同時存在多個)并行從共享隊列(callQueue)中讀取 Call 對象,執行對應的函數調用,并嘗試直接將結果返回給對應的客戶端。但某些函數調用返回結果很大或者網絡速度過慢,可能難以將結果一次性發送到客戶端,此時,Handler 線程就會為對應客戶端生成一個 Connection 對象,同時創建一個 responseQueue 隊列來儲存結果,最后將結果寫到 Responder 線程。

1.4 - 返回結果

Server 端只有一個 Responder 線程,且包含一個 Selector 對象,用于監聽 OP_WRITE 事件。當 Handler 線程沒能將結果一次性發送到對應客戶端時,會向該 Selector 對象注冊 OP_WRITE 事件,進而由 Responder 線程采用異步方式繼續發送未發送完成的結果。

1.5 - RPC 參數調優

Hadoop RPC 主要的配置參數如下:
1、Reader 線程數量
由參數 ipc.server.read.threadpool.size 配置,默認是 1。默認情況下,一個 RPC Server 只包含一個 Reader 線程。

2、每個 Handler 線程對應的最大 Call 數量
由參數 ipc.server.handler.queue.size 配置,默認是 100。默認情況下,每個 Handler 線程對應的 Call 對列長度為 100。例如:如果 Handler 線程數是 10,則整個 Call 隊列(即共享隊列 callQueue)最大長度為:100 x 10 = 1000

3、Handler 線程數量
在 HDFS 的 NameNode 中對應的 Handler 數量由參數 dfs.datanode.handler.count 配置,默認是 10。
在 YARN 的 ResourceManager 中對應的 Handler 數量由參數 yarn.resourcemanager.resource-tracker.client.thread-count 配置,默認是 50。

4、客戶端最大重試次數
由參數 ipc.client.connect.max.retries 配置,默認是 10。也就是會連續重試 10 次。

2 - YARN通信協議

RPC 協議是連接各個組件的 “大動脈”。在 YARN 中,任何兩個需要相互通信的組件之間只有一個 RPC 協議,而對于任何一個 RPC 協議,通信雙方有一端是 Client,另一端是 Server,且總是 Client 主動連接 Server 的。因此,YARN 實際上采用的是拉模式(pull-based)通信模型。如下圖所示:

YARN 主要由以下幾個 RPC 協議組成:

  • ApplicationClientProtocol:JobClient(作業提交客戶端)與 RM 之間的協議。JobClient 通過該 RPC 協議提交應用程序、 查詢應用程序狀態等。
  • ResourceTrackerProtocol:NM 與 RM 之間的協議。NM 通過該 RPC 協議向 RM 注冊,并定時發送心跳信息,匯報當前節點的資源使用情況和 Container 運行情況。
  • ApplicationMasterProtocol:AM 與 RM 之間的協議。AM 通過該 RPC 協議向 RM 注冊和撤銷自己,并為各個任務申請資源。
  • ContainerManagementProtocol:AM 與 NM 之間的協議。 AM 通過該 RPC 要求 NM 啟動或者停止 Container,獲取各個 Container 的使用狀態等信息。
  • ResourceManagerAdministrationProtocol:Admin 與 RM 之間的通信協議。Admin 通過該 RPC 協議更新系統配置文件。例如:節點黑白名單、用戶隊列權限等。
  • HAServiceProtocol:Active RM 和 Standby RM 之間的通信協議。提供狀態監控和 Failover 的 HA 服務。
  • TaskUmbilicalProtocol:YarnChild 和 MRAppMaster 之間的通信協議。用于 MRAppMaster 監控跟蹤 YarnChild 的運行狀態,YarnChild 向 MRAppMaster 拉取 - Task 任務信息。
  • MRClientProtocol:JobClient 和 AM 之間的通信協議。用于客戶端拉取應用程序的執行狀態,以及應用程序返回執行結果給 JobClient。
  • ApplicationHistoryProtocol:JobClient 和 JobHistory Server 之間的通信協議。用于獲取已完成應用程序的信息等。

3 - YARN Service工作機制

對于生命周期較長的對象,使用服務的對象管理模型進行管理。該模型主要特點如下:

  • 將每個被服務化的對象分為 4 個狀態:NOTINITED(被創建)、INITED(已初始化)、STARTED(已啟動)、STOPPED(已停止)。
  • 任何服務狀態變化都可以觸發另外一些動作。
  • 可通過組合的方式對任意服務進行組合,以便進行統一管理。也就是說,一個父 Service 可能會有多個子 Service。

3.1 - YARN 服務模型的類圖

YARN 中關于服務模型的類圖位于包 org.apache.hadoop.service 中,如下圖所示:

在 YARN 中,會有非常多的服務對象,且都實現了接口 Service,定義了服務初始化、啟動、停止等操作。YARN 中所有對象,如果是組合服務,直接繼承 CompositeService 類,否則繼承 AbstractService 類。如下圖所示:

ResourceManager 是一個組合服務,包括 ClientRMService、ApplicationMasterLauncher、ApplicationMasterService 等服務對象。

NodeManager 也屬于組合服務,它們內部包含多個單一服務和組合服務,以實現對內部多種服務的統一管理。

3.2 - Service 的定義

public interface Service extends Closeable {  public enum STATE {    NOTINITED(0, "NOTINITED"),    INITED(1, "INITED"),    STARTED(2, "STARTED"),    STOPPED(3, "STOPPED");  }  // 服務初始化  void init(Configuration config);  // 服務啟動  void start();  // 服務停止  void stop();  // 服務關閉  void close() throws IOException;}

4 - YARN AsyncDispatcher事件模型

4.1 - 事件處理模型

YARN 采用了事件驅動的并發模型,其核心服務是一個中央異步調度器(AsyncDispatcher)。包括 ResourceManager、NodeManager、MRAppMaster 等,它們共同維護了一個事件(Event)與事件處理器(EventHandler)的映射表,用來處理各個事件。其事件處理模型如下圖所示:

并發處理流程包括 5 個步驟:
1、各業務類型的處理請求以 Event 的形式提交到事件隊列(Event Queue)中;
2、AsyncDispatcher 創建 HandlerThread 線程消費事件隊列,并將 Event 傳遞給對應的 EventHandler;
3、該 EventHandler 可能將 Event 轉發給另外一個 EventHandler,也有可能轉發給帶有有限狀態機(StateMachine)的 EventHandler;
4、將 StateMachine 的處理結果以 Event 的形式輸出到 AsyncDispatcher;
5、如果有新的 Event 會再次被 AsyncDispatcher 轉發給下一個 EventHandler,直至處理完成(達到終止條件)。

例如: MRAppMaster 內部包含一個中央異步調度器(AsyncDispatcher),并注冊了 TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl 等一系列事件/事件處理器,由中央異步調度器統一管理和調度。

4.2 - 事件與事件處理器

通過引入服務化和事件驅動的設計思想,使得 YARN 具有低耦合、高內聚的特點,各個模塊只需要完成各自的功能,而模塊之間則采用事件相互關聯。事件與事件處理器的的類圖位于包 org.apache.hadoop.yarn.event 中,如下圖所示:

ResourceManager 內部事件與事件處理器交互圖如下:

5 - YARN StateMachine 狀態機

狀態機(StateMachine)是由一組狀態組成:

  • 初始狀態
  • 中間狀態
  • 最終狀態

當狀態機從初始狀態開始運行,經過一系列中間狀態后,到達最終狀態時退出。也就是說,在一個狀態機中,每個狀態都可以接收一組特定事件,并根據具體的事件類型轉換到另一個狀態。當狀態機轉換到最終狀態時,則退出。

5.1 - 狀態機轉換方式

在 YARN 中,每種狀態轉換(doTransition() 方法執行狀態轉換,addTransition() 方法注冊狀態轉換)由一個四元組表示,分別是:

  • 轉換前狀態(preState)
  • 轉換后狀態(postState)
  • 事件(event)
  • 回調函數(hook)

YARN 定義了三種狀態轉換方式,具體如下:
1、一個初始狀態、一個最終狀態、一種事件
該方式表示經過處理之后,無論如何,進入到一個唯一狀態。

初始狀態:最終狀態:事件 = 1:1:1

2、 一個初始狀態、多個最終狀態、一種事件
該方式表示不同的邏輯處理結果,可能導致進入不同的狀態。

初始狀態:最終狀態:事件 = 1:N:1

3、一個初始狀態、一個最終狀態、多種事件
該方式表示多個不同的事件,可能觸發到多個不同狀態的轉換。

初始狀態:最終狀態:事件 = 1:1:N

5.2 - 狀態機類

YARN 實現了一個非常簡單的狀態機庫,在 org.apache.hadoop.yarn.state 包中。

YARN 對外提供了一個狀態機工廠 StatemachineFactory,它提供多種 addTransition() 方法供用戶添加各種狀態轉移,一旦狀態機添加完畢后,可通過調用 installTopology() 完成一個狀態機的構建。如下圖所示:

5.3 - 狀態機可視化

YARN 中實現了多個狀態機對象,包括:

  • ResourceManager 中的 RMAppImpl、RMAppAttemptImpl、RMContainerImpl 和 RMNodeImpl 等。
  • NodeManager 中的 ApplicationImpl、ContainerImpl 和 LocalizedResource 等。
  • MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等。

為了便于查看這些狀態機的狀態變化以及相關事件,YARN 提供了一個狀態機可視化工具,具體操作步驟如下:
1、將狀態機轉化為 graphviz(.gv) 格式的文件,在源代碼根目錄下進行編譯

[root@hadoop-01 hadoop-2.10.1-src]# mvn compile -Pvisualize

生成 3 個 *.gv 文件:

[root@hadoop-01 hadoop-2.10.1-src]# ls -l *.gv-rw-r--r-- 1 root root 16698 Sep 10 09:37 MapReduce.gv-rw-r--r-- 1 root root 12075 Sep 10 09:35 NodeManager.gv-rw-r--r-- 1 root root 14641 Sep 10 09:35 ResourceManager.gv

2、使用可視化包 graphviz 中的相關命令生成狀態機圖

[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng ResourceManager.gv > ResourceManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng NodeManager.gv > NodeManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng MapReduce.gv > MapReduce.png

如果尚未安裝 graphviz 包,操作該步驟之前先要安裝該包,Centos-7.x 安裝命令如下:

[root@hadoop-01 hadoop-2.10.1-src]# yum install graphviz

ResourceManager 狀態機如下圖所示:

NodeManager 狀態機如下圖所示:

MapReduce 狀態機如下圖所示:

每一個狀態機,其實本身也是一個事件處理器(EventHandler)。


::: hljs-center
掃一掃,我們的故事就開始了。
:::