YARN运行流程源码分析

07-06 1028阅读

一、yarn任务执行流程图

YARN运行流程源码分析

在分析任务之前先走一下yarn内部的流程细节。

二、RM 内部处理提交阶段运行流程

如上图流程所示:

1.client 提交任务给yarn,yarn 这边会获取任务的AM相关资源,client在提交阶段会上传job.split (数据切分,相应的map数量), job.xml (配置相关)等信息,以及认证信息,封装成AM启动的资源,AM启动过程会去下载。

YARN运行流程源码分析

接下来会执行 rmAppManager.submitApplication ,内部调用createAndPopulateNewRMApp ,yarn内部会创建一个RMAppImpl内存对象,并维护在内存中。这里会判断是否是CS调度器,如果是CS调度器还会检查用户是否有队列提交权限。

YARN运行流程源码分析

并且做初始的认证工作,并renew token ,并触发一个RMAppEvent事件,时间类型 RMAppEventType.START。

YARN运行流程源码分析

2.上面这个start事件会通过RM第一层的异步事件分发器进行分发:如下8种事件类型对应8种事件处理器。RMAppEventType 对应 ApplicationEventDispatcher 处理器,这个事件处理器也是一个事件分发器,进行第二次转发。

YARN运行流程源码分析

scheduler.event.SchedulerEventType" -> {EventDispatcher@8556} "Service SchedulerEventDispatcher rmapp.RMAppEventType" -> {ResourceManager$ApplicationEventDispatcher@8557} rmapp.attempt.RMAppAttemptEventType" -> {AsyncDispatcher$MultiListenerHandler@8558} rmnode.RMNodeEventType" -> {ResourceManager$NodeEventDispatcher@8559} RMFatalEventType" -> {ResourceManager$RMFatalEventDispatcher@8560} RMAppManagerEventType" -> {RMAppManager@8561} NodesListManagerEventType" -> {NodesListManager@8562} "Service NodesListManager amlauncher.AMLauncherEventType" -> {ApplicationMasterLauncher@8563} "Service ApplicationMasterLauncher

3. 经过上面这层转发,调用的是刚刚创建的 RMAppImpl handle方法。RMAppImpl 本身维护了一个状态机,根据当前状态调用相应的状态转换方法。此时 调用该状态转换过程:

Transition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) ,执行 RMAppNewlySavingTransition.transition 方法。

YARN运行流程源码分析

YARN运行流程源码分析

YARN运行流程源码分析

4.上面方法会将app相关提交信息持久化,以便于恢复,这一步也会产生一个事件 :RMStateStoreAppEvent,事件类型:RMStateStoreEventType.STORE_APP,该事件会进入RMStateStore 的 AsyncDispatcher 事件分发器,

dispatcher = new AsyncDispatcher("RM StateStore dispatcher");
dispatcher.init(conf);
rmStateStoreEventHandler = new ForwardingEventHandler();
dispatcher.register(RMStateStoreEventType.class,  rmStateStoreEventHandler);

最终会调用 ForwardingEventHandler 的handle方法。

RMStateStore 的 Transition(RMStateStoreState.ACTIVE,EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_APP, new StoreAppTransition())

StoreAppTransition.transition 方法。 存储完app信息,产生RMAppEvent 事件,事件类型 RMAppEventType.APP_NEW_SAVED。

YARN运行流程源码分析

这个事件会进入第一层事件分发器 rmDispatcher , 最终会执行RMAppImpl 的状态转换方法,此时app 状态 APP_NEW_SAVED 。

YARN运行流程源码分析

HADOOP-878 输出Application Report信息到本地磁盘 任务监控的数据逻辑如下框所示,所以说,只有在改app有相关事件处理时,才会记录app运行状态的数据。

YARN运行流程源码分析

根据这个事件类型 ,就会进行app的调度执行 :Transition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

YARN运行流程源码分析

AddApplicationToSchedulerTransition 直接产生 AppAddedSchedulerEvent 这个事件,事件类型:SchedulerEventType.APP_ADDED 。这个事件也会进入到第一层的事件分发器,根据上面的8种事件类型,最终 EventDispatcher 内部handler :ResourceScheduler 进行处理。

CS 与 FS 调度器提交到队列的流程基本一致,区别看后面的分配流程,队列监控组件会添加到内部内存对象里,同时会在调度器内部构建一个调度的app 对象SchedulerApplication。

判断相应的队列 ,以及队列的提交权限,都符合,就会getMetrics 增加提交的应用,并产生 RMAppEvent ,事件类型:RMAppEventType.APP_ACCEPTED 。

Transition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

YARN运行流程源码分析

YARN运行流程源码分析

StartAppAttemptTransition.transition 会创建 RMAppAttemptImpl ,维护在当前 RMAppImpl 内,后续如果有重试,就会产生多个。 并且会产生 RMAppStartAttemptEvent 事件 ,事件类型 :RMAppAttemptEventType.START 。

YARN运行流程源码分析

YARN运行流程源码分析

YARN运行流程源码分析

ApplicationAttemptEventDispatcher 处理,最终调用 RMAppAttemptImpl 的handle。 RMAppAttemptImpl 也维护了一个状态机。 根据事件类型,执行的是这个转换过程:

Transition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition())

AttemptStartedTransition.transition . 此时会注册 到内部

appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId);

并会 产生 AppAttemptAddedSchedulerEvent 事件,事件类型:SchedulerEventType.APP_ATTEMPT_ADDED。

YARN运行流程源码分析

YARN运行流程源码分析

FairScheduler handle

FSLeafQueue queue = (FSLeafQueue) application.getQueue();
queue.addApp(attempt, runnable);
if (runnable) {
  runnableApps.add(app);
} else {
  nonRunnableApps.add(app);
}
incUsedResource(app.getResourceUsage());

产生 RMAppAttemptEvent事件,事件类型:RMAppAttemptEventType.ATTEMPT_ADDED

Transition(RMAppAttemptState.SUBMITTED,

EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,

RMAppAttemptState.SCHEDULED),

RMAppAttemptEventType.ATTEMPT_ADDED,

new ScheduleTransition())

ScheduleTransition.transition 进行 AM资源的申请

YARN运行流程源码分析

这里会调用具体的调度器进行处理,这里面就会更新 CS 调度器内部维护的app 等待被调度的资源。

YARN运行流程源码分析

具体看看修改的内存对象

YARN运行流程源码分析

到此为止,RM 内部处理提交阶段的流程走完了。后面就是NM RM心跳交互触发调度的流程。

三、NM 与 RM 心跳流程

在分析RM 与NM 心跳流程前,先看看经过RM处理提交阶段后RM内存内部关键的内存对象

YARN运行流程源码分析

YARN运行流程源码分析

YARN运行流程源码分析

YARN运行流程源码分析

RM本身会维护一个Application的对象RMAppimpl, 可以看到CS调度器内存中维护一个调度的app SchedulerApplication对象 , 这个SchedulerApplication等待的资源是 memory:2048 , Cores:1 。

接下来看的是NM 与 RM 心跳,触发资源分配的逻辑,NM 与 RM 通信的协议是 ResourceTracker,只有这三个方法,最主要是nodeHeartbeat方法。

YARN运行流程源码分析

继续跟踪代码 看看RM怎么处理NM心跳

YARN运行流程源码分析

上面的代码会产生一个 nodeStatusEvent

YARN运行流程源码分析

看看具体的处理器 NodeEventDispatcher

YARN运行流程源码分析

它的处理逻辑跟RMApp的处理逻辑一样,找到具体的RMNodeImpl进行处理

YARN运行流程源码分析

RMNodeImpl 本身也维护了一个状态机,根据当前是Running 状态,调用具体的转换方法

YARN运行流程源码分析

最终会触发一个调度事件

YARN运行流程源码分析

事件类型为 SchedulerEventType.NODE_UPDATE。

YARN运行流程源码分析

它的处理器 SchedulerEventDispatcher

YARN运行流程源码分析

他这边也是一个生产者与消费者模型,放到一个队列

YARN运行流程源码分析

有一个线程进行消费,这个handle就是RM里面的调度器CapacityScheduler 。

YARN运行流程源码分析

看看CapacityScheduler 具体的处理逻辑

YARN运行流程源码分析

到这里 allocateContainersToNode ,基本就是进行资源的分配。

YARN运行流程源码分析

继续往下看 ,看看具体的资源分配逻辑 ,前面会做很多复杂的判断,省略,最终会调用到,ParentQueue,这里会判断是否有等待分配的资源

YARN运行流程源码分析

最终还是要分配到叶子队列,会一直递归找到叶子队列,具体调度器的分配流程,后面再详细分析。

YARN运行流程源码分析

同样通过NM心跳就能获取分配的结果。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]