大数据梦工厂( 0011 - YARN核心设计解析)


1 - YARN RPC架构设计

YARN RPC Server 处理流程大致可以分为四个阶段:建立连接、接收请求、处理请求和返回结果。各阶段实现如下图所示:

1.1 - 建立连接

整个 YARN RPC Server 只有一个 Listener 线程,且包含一个 Selector 对象,用于监听 OP_ACCEPT 事件。统一负责监听是否有来自各个客户端的 RPC 连接请求到达,并采用轮询策略选择一个 Reader 线程处理新连接。

1.2 - 接收请求

当 Listener 完成客户端的连接之后,通过轮询方式找到一个 Reader 线程处理,并将新的 RPC 请求封装成固定的格式(Call 类),放到一个共享队列(callQueue)中。可同时存在多个 Reader 线程,且包含一个 Selector 对象,用于监听 OP_READ 事件。

1.3 - 处理请求

Handler 线程(可同时存在多个)并行从共享队列(callQueue)中读取 Call 对象,执行对应的函数调用,并尝试直接将结果返回给对应的客户端。但某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时,Handler 线程就会为对应客户端生成一个 Connection 对象,同时创建一个 responseQueue 队列来储存结果,最后将结果写到 Responder 线程。

1.4 - 返回结果

Server 端只有一个 Responder 线程,且包含一个 Selector 对象,用于监听 OP_WRITE 事件。当 Handler 线程没能将结果一次性发送到对应客户端时,会向该 Selector 对象注册 OP_WRITE 事件,进而由 Responder 线程采用异步方式继续发送未发送完成的结果。

1.5 - RPC 参数调优

Hadoop RPC 主要的配置参数如下:
1、Reader 线程数量
由参数 ipc.server.read.threadpool.size 配置,默认是 1。默认情况下,一个 RPC Server 只包含一个 Reader 线程。

2、每个 Handler 线程对应的最大 Call 数量
由参数 ipc.server.handler.queue.size 配置,默认是 100。默认情况下,每个 Handler 线程对应的 Call 对列长度为 100。例如:如果 Handler 线程数是 10,则整个 Call 队列(即共享队列 callQueue)最大长度为:100 x 10 = 1000

3、Handler 线程数量
在 HDFS 的 NameNode 中对应的 Handler 数量由参数 dfs.datanode.handler.count 配置,默认是 10。
在 YARN 的 ResourceManager 中对应的 Handler 数量由参数 yarn.resourcemanager.resource-tracker.client.thread-count 配置,默认是 50。

4、客户端最大重试次数
由参数 ipc.client.connect.max.retries 配置,默认是 10。也就是会连续重试 10 次。

2 - YARN通信协议

RPC 协议是连接各个组件的 “大动脉”。在 YARN 中,任何两个需要相互通信的组件之间只有一个 RPC 协议,而对于任何一个 RPC 协议,通信双方有一端是 Client,另一端是 Server,且总是 Client 主动连接 Server 的。因此,YARN 实际上采用的是拉模式(pull-based)通信模型。如下图所示:

YARN 主要由以下几个 RPC 协议组成:

  • ApplicationClientProtocol:JobClient(作业提交客户端)与 RM 之间的协议。JobClient 通过该 RPC 协议提交应用程序、 查询应用程序状态等。
  • ResourceTrackerProtocol:NM 与 RM 之间的协议。NM 通过该 RPC 协议向 RM 注册,并定时发送心跳信息,汇报当前节点的资源使用情况和 Container 运行情况。
  • ApplicationMasterProtocol:AM 与 RM 之间的协议。AM 通过该 RPC 协议向 RM 注册和撤销自己,并为各个任务申请资源。
  • ContainerManagementProtocol:AM 与 NM 之间的协议。 AM 通过该 RPC 要求 NM 启动或者停止 Container,获取各个 Container 的使用状态等信息。
  • ResourceManagerAdministrationProtocol:Admin 与 RM 之间的通信协议。Admin 通过该 RPC 协议更新系统配置文件。例如:节点黑白名单、用户队列权限等。
  • HAServiceProtocol:Active RM 和 Standby RM 之间的通信协议。提供状态监控和 Failover 的 HA 服务。
  • TaskUmbilicalProtocol:YarnChild 和 MRAppMaster 之间的通信协议。用于 MRAppMaster 监控跟踪 YarnChild 的运行状态,YarnChild 向 MRAppMaster 拉取 - Task 任务信息。
  • MRClientProtocol:JobClient 和 AM 之间的通信协议。用于客户端拉取应用程序的执行状态,以及应用程序返回执行结果给 JobClient。
  • ApplicationHistoryProtocol:JobClient 和 JobHistory Server 之间的通信协议。用于获取已完成应用程序的信息等。

3 - YARN Service工作机制

对于生命周期较长的对象,使用服务的对象管理模型进行管理。该模型主要特点如下:

  • 将每个被服务化的对象分为 4 个状态:NOTINITED(被创建)、INITED(已初始化)、STARTED(已启动)、STOPPED(已停止)。
  • 任何服务状态变化都可以触发另外一些动作。
  • 可通过组合的方式对任意服务进行组合,以便进行统一管理。也就是说,一个父 Service 可能会有多个子 Service。

3.1 - YARN 服务模型的类图

YARN 中关于服务模型的类图位于包 org.apache.hadoop.service 中,如下图所示:

在 YARN 中,会有非常多的服务对象,且都实现了接口 Service,定义了服务初始化、启动、停止等操作。YARN 中所有对象,如果是组合服务,直接继承 CompositeService 类,否则继承 AbstractService 类。如下图所示:

ResourceManager 是一个组合服务,包括 ClientRMService、ApplicationMasterLauncher、ApplicationMasterService 等服务对象。

NodeManager 也属于组合服务,它们内部包含多个单一服务和组合服务,以实现对内部多种服务的统一管理。

3.2 - Service 的定义

public interface Service extends Closeable {  public enum STATE {    NOTINITED(0, "NOTINITED"),    INITED(1, "INITED"),    STARTED(2, "STARTED"),    STOPPED(3, "STOPPED");  }  // 服务初始化  void init(Configuration config);  // 服务启动  void start();  // 服务停止  void stop();  // 服务关闭  void close() throws IOException;}

4 - YARN AsyncDispatcher事件模型

4.1 - 事件处理模型

YARN 采用了事件驱动的并发模型,其核心服务是一个中央异步调度器(AsyncDispatcher)。包括 ResourceManager、NodeManager、MRAppMaster 等,它们共同维护了一个事件(Event)与事件处理器(EventHandler)的映射表,用来处理各个事件。其事件处理模型如下图所示:

并发处理流程包括 5 个步骤:
1、各业务类型的处理请求以 Event 的形式提交到事件队列(Event Queue)中;
2、AsyncDispatcher 创建 HandlerThread 线程消费事件队列,并将 Event 传递给对应的 EventHandler;
3、该 EventHandler 可能将 Event 转发给另外一个 EventHandler,也有可能转发给带有有限状态机(StateMachine)的 EventHandler;
4、将 StateMachine 的处理结果以 Event 的形式输出到 AsyncDispatcher;
5、如果有新的 Event 会再次被 AsyncDispatcher 转发给下一个 EventHandler,直至处理完成(达到终止条件)。

例如: MRAppMaster 内部包含一个中央异步调度器(AsyncDispatcher),并注册了 TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl 等一系列事件/事件处理器,由中央异步调度器统一管理和调度。

4.2 - 事件与事件处理器

通过引入服务化和事件驱动的设计思想,使得 YARN 具有低耦合、高内聚的特点,各个模块只需要完成各自的功能,而模块之间则采用事件相互关联。事件与事件处理器的的类图位于包 org.apache.hadoop.yarn.event 中,如下图所示:

ResourceManager 内部事件与事件处理器交互图如下:

5 - YARN StateMachine 状态机

状态机(StateMachine)是由一组状态组成:

  • 初始状态
  • 中间状态
  • 最终状态

当状态机从初始状态开始运行,经过一系列中间状态后,到达最终状态时退出。也就是说,在一个状态机中,每个状态都可以接收一组特定事件,并根据具体的事件类型转换到另一个状态。当状态机转换到最终状态时,则退出。

5.1 - 状态机转换方式

在 YARN 中,每种状态转换(doTransition() 方法执行状态转换,addTransition() 方法注册状态转换)由一个四元组表示,分别是:

  • 转换前状态(preState)
  • 转换后状态(postState)
  • 事件(event)
  • 回调函数(hook)

YARN 定义了三种状态转换方式,具体如下:
1、一个初始状态、一个最终状态、一种事件
该方式表示经过处理之后,无论如何,进入到一个唯一状态。

初始状态:最终状态:事件 = 1:1:1

2、 一个初始状态、多个最终状态、一种事件
该方式表示不同的逻辑处理结果,可能导致进入不同的状态。

初始状态:最终状态:事件 = 1:N:1

3、一个初始状态、一个最终状态、多种事件
该方式表示多个不同的事件,可能触发到多个不同状态的转换。

初始状态:最终状态:事件 = 1:1:N

5.2 - 状态机类

YARN 实现了一个非常简单的状态机库,在 org.apache.hadoop.yarn.state 包中。

YARN 对外提供了一个状态机工厂 StatemachineFactory,它提供多种 addTransition() 方法供用户添加各种状态转移,一旦状态机添加完毕后,可通过调用 installTopology() 完成一个状态机的构建。如下图所示:

5.3 - 状态机可视化

YARN 中实现了多个状态机对象,包括:

  • ResourceManager 中的 RMAppImpl、RMAppAttemptImpl、RMContainerImpl 和 RMNodeImpl 等。
  • NodeManager 中的 ApplicationImpl、ContainerImpl 和 LocalizedResource 等。
  • MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等。

为了便于查看这些状态机的状态变化以及相关事件,YARN 提供了一个状态机可视化工具,具体操作步骤如下:
1、将状态机转化为 graphviz(.gv) 格式的文件,在源代码根目录下进行编译

[root@hadoop-01 hadoop-2.10.1-src]# mvn compile -Pvisualize

生成 3 个 *.gv 文件:

[root@hadoop-01 hadoop-2.10.1-src]# ls -l *.gv-rw-r--r-- 1 root root 16698 Sep 10 09:37 MapReduce.gv-rw-r--r-- 1 root root 12075 Sep 10 09:35 NodeManager.gv-rw-r--r-- 1 root root 14641 Sep 10 09:35 ResourceManager.gv

2、使用可视化包 graphviz 中的相关命令生成状态机图

[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng ResourceManager.gv > ResourceManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng NodeManager.gv > NodeManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng MapReduce.gv > MapReduce.png

如果尚未安装 graphviz 包,操作该步骤之前先要安装该包,Centos-7.x 安装命令如下:

[root@hadoop-01 hadoop-2.10.1-src]# yum install graphviz

ResourceManager 状态机如下图所示:

NodeManager 状态机如下图所示:

MapReduce 状态机如下图所示:

每一个状态机,其实本身也是一个事件处理器(EventHandler)。


::: hljs-center
扫一扫,我们的故事就开始了。
:::