Spark的动态资源分配算法

07-21 460阅读

文章目录

  • 前言
  • 基于任务需求进行资源请求的整体过程
  • 资源申请的生成过程详解
    • 资源申请的生成过程的简单例子
    • 资源调度算法的代码解析
    • 申请资源以后的处理:Executor的启动或者结束
      • 对于新启动的Container的处理
      • 对于结束的Container的处理
      • 基于资源分配结果进行任务调度
        • PendingTask的生成:
          • TaskLocation的构造过程
          • 根据TaskLocation信息,将Task添加到不同的pendingTask数组中
          • 可用的LocationLevel的计算
          • 基于Locality的Task调度
          • 结语

            前言

            在TODO这篇文章中,介绍了Spark RPC通信的基本流程。可以看到,Spark中的Driver通过Stage调度生成了物理执行计划,这个物理执行计划包括了所有需要运行的Task,以及,最关键的,这些Task希望运行的节点信息,我们叫做Locality Preference,即本地性偏好。

            但是在Yarn的场景下,资源是以Executor(Container)为单位进行调度,整个Container的粒度与Task不一一对应,Container的生命周期与Task不一一对应。在这种场景下,动态资源调度的基本任务就是

            • 根据这些Task以及Task的本地性偏好,向Yarn申请Container作为Executor以执行Task;这里最重要的,是在资源申请中表达出对应的Task的位置偏好。
            • 在申请完资源以后,根据Task的本地性偏好,将Task调度到申请到的资源上面去。这里最重要的,是尽量满足Task的本地性偏好。

              本文将详细讲解这个过程。

              对于资源申请算法的基本流程,以及将Task和资源进行匹配的基本流程,本文都用实际例子进行讲解。

              基于任务需求进行资源请求的整体过程

              向Yarn请求资源是由客户端向ApplicationMaster申请,然后ApplicationMaster向Yarn发起请求的,而不是客户端直接向Yarn申请的。

              资源是为了服务于Task的运行,Task的生成显然是Driver端负责的,Driver会根据物理执行计划生成的Task信息发送给ApplicationMaster,ApplicationMaster根据这些Task的相关信息进行资源申请。

              ApplicationMaster启动以后,会有一个独立线程不断通过调用YarnAllocator.allocateResources()进行持续的资源更新(查看ApplicationMaster的launchReporterThread()方法)。这里叫资源更新,而不叫资源申请,因为这里的操作包括新的资源的申请,旧的无用的Container的取消,以及Blocklist Node的更新等多种操作。

              总而言之,ApplicationMaster作为客户端和Yarn的中间方,其资源申请的方法allocateResource()在逻辑上的功能为:

              1. 粒度转换: 将Task级别的资源请求,转换为Container(Executor级别的资源请求)。这是一个游戏到粗的粒度的转换。
              2. 维度转换: Driver发过来的资源请求是资源的最终全局状态,而Yarn 的 API 要求的针对每一个Container进行增量请求。因此,allocateResources()会将Driver发送过来资源请求的最终状态,对比当前系统已经运行、分配未运行、已经发送请求但是还没有分配资源等等已经存在的状态,确定一个发送给Yarn的增量请求状态。这是一个全量到增量的维度的转换。
              3. 角度转换: Driver发过来的每个Task都带有各自Task的Locality,而发送给Yarn的Container请求又是带有Locality需求的Container需求。这是一个从Task到Container的角度的转换。

              ApplicationMaster端的allocateResources()方法的基本流程在代码YarnAllocator.allocateResources()中:

                 ---------------------------------- YarnAllocator ------------------------------------
                def allocateResources(): Unit = synchronized {
                  
                  updateResourceRequests() // 与Yarn进行资源交互
                  
                  val allocateResponse = amClient.allocate(progressIndicator) // 从Yarn端获取资源结果,包括新分配的、已经结束的等等
                  val allocatedContainers = allocateResponse.getAllocatedContainers()
              	handleAllocatedContainers(allocatedContainers.asScala) // 处理新分配的Container
                  val completedContainers = allocateResponse.getCompletedContainersStatuses() // 处理已经结束的Container
                   processCompletedContainers(completedContainers.asScala)
                  }
                }
              

              ApplicationMaster端的allocateResources()的基本流程如下图所示:

              Spark的动态资源分配算法

              1. 生成资源请求,即将Driver发送过来的全量的、Task粒度的资源请求和Host偏好信息,转换为对Yarn的、以Executor为粒度的资源请求
                updateResourceRequests()
                
              2. 将资源请求发送给Yarn并从Yarn上获取分配结果(基于Yarn的异步调度策略,这次获取的记过并非本次资源请求的分配结果)以进行后续处理:
                val allocatedContainers = allocateResponse.getAllocatedContainers()
                handleAllocatedContainers(allocatedContainers.asScala)
                val completedContainers = allocateResponse.getCompletedContainersStatuses()
                processCompletedContainers(completedContainers.asScala)
                

              可以看到,updateResourceRequests()是资源请求的核心方法,它会负责同Yarn进行通信以进行资源请求。

              在TODO中,我们也介绍过,生成资源请求,其决策过程发生在方法updateResourceRequests()中。我们主要来看updateResourceRequests()方法:

                 ---------------------------------- YarnAllocator ------------------------------------
                def updateResourceRequests(): Unit = {
                  // 获取已经发送给Yarn但是待分配的ContainerRequest,计算待分配容器请求的数量
                  // 这些ContainerRequest是之前通过调用amClient.addContainerRequest 发送出去的
                  val pendingAllocate = getPendingAllocate
                  val numPendingAllocate = pendingAllocate.size
                  // 还没有发送请求的executor的数量
                  val missing = targetNumExecutors - numPendingAllocate -
                    numExecutorsStarting.get - numExecutorsRunning.get
                  // 还没有发送给Yarn的资源请求
                  if (missing > 0) {      
                      /**
                     * 将待处理的container请求分为三组:本地匹配列表、本地不匹配列表和非本地列表。
                     */
                    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
                      hostToLocalTaskCounts, pendingAllocate)
                    // staleRequests 的意思是,ApplicationMaster已经请求了这个Container,
                    // 但是这个ContainerRequest所要求的hosts里面没有一个是在 hostToLocalTaskCounts (即task所倾向于)中的,因此,需要取消这个Container Request,因为已经没有意义了
                    // cancel "stale" requests for locations that are no longer needed
                    staleRequests.foreach { stale =>
                      amClient.removeContainerRequest(stale)
                    }
                    val cancelledContainers = staleRequests.size
               
                    // consider the number of new containers and cancelled stale containers available
                    // 将新的container请求,以及刚刚取消的container,作为available container
                    val availableContainers = missing + cancelledContainers
                    // to maximize locality, include requests with no locality preference that can be cancelled
                    // 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的container
                    val potentialContainers = availableContainers + anyHostRequests.size
                    // LocalityPreferredContainerPlacementStrategy,计算每一个Container 的Node locality和 Rack locality
                    val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
                      potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
                        allocatedHostToContainersMap, localRequests)
                    val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
                    // 遍历ContainerLocalityPreferences数组中的每一个ContainerLocalityPreferences
                    containerLocalityPreferences.foreach {
                      case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
                        newLocalityRequests += createContainerRequest(resource, nodes, racks)// 根据获取的locality,重新创建ContainerRequest请求
                    }
                    // 除了有locality需求的container以外,还有更多的available container需要被请求,因此对这些container请求也发送出去
                    if (availableContainers >= newLocalityRequests.size) {
                      // more containers are available than needed for locality, fill in requests for any host
                      for (i 
                        newLocalityRequests += createContainerRequest(resource, null, null) // 构造ContainerRequest对象
                      }
                    } else {
                      val numToCancel = newLocalityRequests.size - availableContainers
                      // cancel some requests without locality preferences to schedule more local containers
                      anyHostRequests.slice(0, numToCancel).foreach { nonLocal =
                        amClient.removeContainerRequest(nonLocal)
                      }
                    }
                  } else if (numPendingAllocate > 0 && missing  
              

              其基本过程为:

              1. 获取当前Pending的request(已经发送给Yarn但是还没有分配Container的请求),并将这些Pending的请求按照本地性的需求进行切分。这里的基本意图是,当前收到了来自Driver的全局的资源状态信息,而在Yarn上还有一部分之前的资源请求还没有分配Container,那么,会不会这些Pending Requewt中有些Request已经不需要了(满足不了任何一个task的locality需求)?

                      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
                        hostToLocalTaskCounts, pendingAllocate)
                

                切分的过程,就是查看当前在Yarn这一端pending的所有的Container的locality与我们目前需求的所有(全局)的Task的locality的交集:

                -------------------------------------- YarnAllocator ----------------------------------
                  private def splitPendingAllocationsByLocality(
                      hostToLocalTaskCount: Map[String, Int], // 每一个host到希望分配上去的task的数量
                      pendingAllocations: Seq[ContainerRequest] // 还没有分配出去的ContainerRequest
                    ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
                    val localityMatched = ArrayBuffer[ContainerRequest]()
                    val localityUnMatched = ArrayBuffer[ContainerRequest]()
                    val localityFree = ArrayBuffer[ContainerRequest]()
                    val preferredHosts = hostToLocalTaskCount.keySet
                    // 将当前已经发送给Yarn但是还没有分配的Container的请求进行切分
                    pendingAllocations.foreach { cr =>
                      val nodes = cr.getNodes // 这个 ContainerRequest 对节点的要求
                      if (nodes == null) {
                        localityFree += cr // 这个ContainerRequest对nodes没有要求,那么就是对本地性没有要求的Container请求
                      } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集
                        localityMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去
                      } else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集
                        localityUnMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去
                      }
                    }
                    // 切分结果 (localRequests, staleRequests, anyHostRequests)
                    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
                  }
                }
                

                切分过程的具体流程如下图所示:

                Spark的动态资源分配算法

                切分的具体流程为 :

                • 如果这个Pending Container没有任何的locality要求,那么就是localityFree Container,即,其实际分配的位置有可能是当前所有tasks所希望的位置,也可能不是,那么这个container就是localityFree container
                  if (nodes == null) {
                          localityFree += cr // 这个ContainerRequest对nodes没有要求,那么就是对本地性没有要求的Container请求
                        } 
                  
                • 如果这个Pending Container有locality 要求,并且这个locality的nodes与当前所有tasks有交集,那么这个Pending Container就被划分为localityMatched,显然,这个Pending Container是不应该被取消的;
                  else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集
                          localityMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去
                        }
                  
                • 如果这个Pending Container有locality要求,但是这个locality中的nodes不在当前所有tasks的locality中的任何一个节点,即这个Pending Container实际分配的位置不可能是任何一个task所倾向于的位置,那么这个Pending Container就是localityUnMatched,显然,localityUnMatched container目前无法放置任何一个task,需要取消掉;
                  else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集
                          localityUnMatched += cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去
                        }
                  
                • 对于localityUnmatched container,向Yarn发送请求,取消这种Container,这些被取消的Container在后面会重新申请,以便在申请的资源总量不变的情况下增强资源的本地特性:

                   staleRequests.foreach { stale =>
                     amClient.removeContainerRequest(stale)
                   }
                  
                • 计算总的Container的数量,包括:

                  • Pending Container中刚刚cancel的container的数量,这些Container刚刚取消了,我们可以再次申请这些Container,但是肯定会增强这些新的资源请求的locality,以最大化我们的Task的locality
                  • Pending Container中的locality free的Container数量,这些Container可能分配在集群中的任何地方
                  • 新增(missing)的Container请求,即当前的总的container请求中除去正在运行(已经有task在运行,numExecutorsRunning)和正在启动(已经分配但是还没分配task,numExecutorsStarting)的,再除去所有的pending的container(numPendingAllocate,是从Yarn的API中获取的数量,已经请求但是还没有分配成功的资源),多出来的Container:
                    	// 还没有发送请求的executor的数量
                    	val missing = targetNumExecutors - numPendingAllocate -
                    	 numExecutorsStarting.get - numExecutorsRunning.get 
                    	.....
                    	// consider the number of new containers and cancelled stale containers available
                    	// 将新的container请求,以及刚刚取消的container,作为available container
                    	val availableContainers = missing + cancelledContainers
                    	
                    	// to maximize locality, include requests with no locality preference that can be cancelled
                    	// 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的container
                    	val potentialContainers = availableContainers + anyHostRequests.size
                    

                    在这里,val availableContainers = missing + cancelledContainers,即available container代表这次可以增量申请的最大的container数量,包括了这次的额外需求,以及刚刚取消的container(取消的container可以重新申请)

                  • 构建Container请求。这里会根据LocalityPreferredContainerPlacementStrategy的localityOfRequestedContainers来构建Container请求,返回Array[ContainerLocalityPreferences],每一个ContainerLocalityPreferences代表了一个带有对应host和rack信息的Container请求:

                       val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
                         potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
                           allocatedHostToContainersMap, localRequests)
                    
                  • 根据ContainerLocalityPreferences,转换成Yarn的 ContainerRequest

                       containerLocalityPreferences.foreach {
                         case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
                           newLocalityRequests += createContainerRequest(resource, nodes, racks)// 根据获取的locality,重新创建ContainerRequest请求
                         case _ =>
                       }
                    
                  • 如果可以申请的Container(available container)的数量大于刚刚计算完locality的Container数量,那么,为了将申请配额用尽,就再申请其相差的部分Container, 保证申请的Container的数量不小于Available Container的数量。

                     if (availableContainers >= newLocalityRequests.size) {
                       // more containers are available than needed for locality, fill in requests for any host
                       for (i 
                         newLocalityRequests += createContainerRequest(resource, null, null) // 构造ContainerRequest对象
                       }
                     }
                    
                           val numToCancel = newLocalityRequests.size - availableContainers
                           // cancel some requests without locality preferences to schedule more local containers
                           anyHostRequests.slice(0, numToCancel).foreach { nonLocal =
                             amClient.removeContainerRequest(nonLocal)
                           }
                    
                  • 调用Yarn的标准接口addContainerRequest(),将ContainerRequest发送给Yarn(其实这个接口并不会真正将请求发送出去,只会存放在RMAMClient端,真正发送是通过allocate()接口):

                    newLocalityRequests.foreach { request =>
                      amClient.addContainerRequest(request)
                    } // 在这里发送container的请求,从日志来看,资源请求已经发出来了,Yarn已经处理了
                    

              所以,从上面可以看到,最关键的方法是LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers()方法,它根据当前的已有信息(总共的Container需求,有locality需求的task的数量,这些locality分布在每一个task上的数量等),生成一个Array[ContainerLocalityPreferences]数组,数组中的每一个元素代表了一个Container的需求,并包含了其locality的要求信息,然后基于生成的ContainerLocalityPreferences经过转换成ContainerRequest,发送给Yarn。

              资源申请的生成过程详解

              资源申请的生成,就是根据当前集群运行的基本情况,Task的基本需求,生成Yarn上的资源请求的过程。

              资源申请的生成过程的简单例子

              在了解其具体实现以前,我们以具体例子的方式,看一下localityOfRequestedContainers()方法的基本实现逻辑,从而对其动机和达成的效果有一个很好的理解,然后,我们再看其实现细节。

              1. 从任务调度去看,看到的是Task以及每个 Task的Locality倾向。比如,现在我们一共需要为30个Task分配资源,其中,20个Task的locality倾向为Host1,Host2,Host3,10个Task的Locality倾向为Host1, Host2, Host4, 因此,对应到每个Host上的Task权重如下表所示:

                Host 1Host 2Host 3Host 4
                20 Tasks202020
                10 Tasks101010
                Sum of Tasks30302010

                即,20个Task希望分配在Host1,Host2,Host3中的任何一个,有10个Task希望分配在Host1, Host2, Host4中的任何一个。如上表所示,综合来看,所有Task在四台机器上分配的权重是(30, 30, 20,10)。

              2. 假设一个Task需要的vCore是1,而一个Container(Executor)有2个vCore,因此,转换成Container以后的结果如下表所示:

                Host 1Host 2Host 3Host 4
                20 Tasks202020
                10 Tasks101010
                Sum of Tasks30302010
                Sum of Containers1515105

                上面的Sum of Containers的数字只是表示一个比例值,并不表示对应的Host上实际需要申请的Container的数量,我们实际需要的总的Container数量才15个。那么,这15个Container需求平均到每台Host上是多少呢?

              3. 比如Host 1的Sum of Container 为15, 所有Host的Sum of Container 是45,因此占比是1/3,所以平均下来分配到Host1上的Container数量应该是 15 * 1/3 = 5。经过向上取整(宁可稍微多分配也不要少分配)以后,每台机器所平均到的15个Container需求是:

                Host 1Host 2Host 3Host 4
                20 Tasks202020
                10 Tasks101010
                Sum of Tasks30302010
                Sum of Containers1515105
                Allocated Container Target5542
              4. 在这里,计算完总的Allocated Container Target以后,需要减去当前已经在该Host上已经存在(正在运行或者在这个Host上pending的Container),因为我们最终发送给Yarn的Container请求是增量请求。假设现在在每一个Host上已经存在的Container数量都是1,即15个Container中有4个Container是已经分配的,那么,减去已经存在的Container数量以后的结果如下表所示,所以,我们需要新申请12个Container:

                Host 1Host 2Host 3Host 4
                20 Tasks202020
                10 Tasks101010
                Sum of Tasks30302010
                Sum of Containers1515126
                Allocated Container Target5542
                Newly Allocated Container4431
              5. 将每个Host的Newly Allocated Container按照比例进行缩放,保证比例最大的那个Host(这里是Host1 和 Host 2)的比例值是需要新申请的Container的数量。在这里,扩大因子应该是 12(Container的总数量)/4(比例最大的Host的Average Allocated Container) = 3 :

                Host 1Host 2Host 3Host 4
                20 Tasks202020
                10 Tasks101010
                Sum of Tasks30302010
                Sum of Containers1515105
                Allocated Container Target5542
                Newly Allocated Container4431
                Round Up121293
              6. 开始发起资源请求。每一个Container请求的Locality中包含的Host如下表所示:

                Host 1Host 2Host 3Host 3
                3 Containers
                6 Containers
                3 Containers

                其含义是:

                • 3个Container请求的Locality是[Host1, Host2, Host3, Host4],即请求Yarn分配3个Container,并且尽量将它们分配在这4个Hosts中。此时剩下的Host比例为[9:9:6:0]
                • 6个Container请求的Locality是[Host1, Host2, Host3],即请求Yarn分配6个Container,并且尽量将它们分配在这3个Hosts中,此时剩下的Host中container比例为[3:3:0]
                • 3个Container请求的Locality是[Host1, Host2],即请求Yarn分配3个Container,并且尽量将它们分配在这2个Hosts中

                  这样,所有Host的Container比例就是12:12:9:3,平均到12个需分配的Container以后的比例是4:4:3:1,再加上已经分配在每个host上的1个Container,那么总的Container在每个Host上的比例就是5:5:4:2,这个比例和我们直接根据每个Host的task比例折算成的Container的比例15:15:10:5是大致相近的。

                  到了这里,我们可以理解了,为什么我们需要在 步骤5 做Round Up操作,并且Round Up的目标是将目前比例值最大的Host的比例值扩大为当前Container需求的最大值? 因为在步骤6中生成Container请求的时候,比例值最大的Host的比例值肯定是等于需要申请的Container数量的。

              资源调度算法的代码解析

              上面以实际例子解释了Spark将当前的Task的Locality需求信息转换成Yarn的资源请求的细节。下面,我们结合代码,详细看一下localityOfRequestedContainers()方法的实现细节:

                def localityOfRequestedContainers(
                    numContainer: Int, // 需要进行计算的container的数量,包括missing的,cancel掉的(本地性不符合任何task要求的pending container),以及对本地性没有要求的pending的container
                    numLocalityAwareTasks: Int, // 对locality有要求的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的
                    hostToLocalTaskCount: Map[String, Int], // 在Stage提交了以后,这个map里面保存了从host到期望分配到这个host的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints传递过来的
                    allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], // 已经launch起来的host -> container的映射关系
                    localityMatchedPendingAllocations: Seq[ContainerRequest] // 对本地性有要求的pending的container
                  ): Array[ContainerLocalityPreferences] = {
                  //  预期的从host到期望在上面再launch的新的container数量的映射关系
                  val updatedHostToContainerCount = expectedHostToContainerCount(
                    numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
                      localityMatchedPendingAllocations)
                  // 希望再launch的所有Host上的container的数量之和,在这里的例子中,是15
                  val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
                  // The number of containers to allocate, divided into two groups, one with preferred locality,
                  // and the other without locality preference.
                  //  没有locality需求的container的数量
                  val requiredLocalityFreeContainerNum =
                    math.max(0, numContainer - updatedLocalityAwareContainerNum)
                  //  有locality需求的container的数量
                  val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
                  val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
                  if (requiredLocalityFreeContainerNum > 0) { // 如果有container是没有locality需求的
                    for (i 
                      containerLocalityPreferences += ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求
                        null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
                    }
                  }
                  if (requiredLocalityAwareContainerNum  0) { // 如果有container有locality需求
                    val largestRatio = updatedHostToContainerCount.values.max // 全局的所有host中最大的container数量
                    // Round the ratio of preferred locality to the number of locality required container
                    // number, which is used for locality preferred host calculating.
                    var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>
                      val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
                      (k, adjustedRatio.ceil.toInt) // 往上取整
                    }
                    // 每个有locality需求的的Container request,为他们确定对应的hosts和rack
                    for (i 
                      // Only filter out the ratio which is larger than 0, which means the current host can
                      // still be allocated with new container request.
                      val hosts = preferredLocalityRatio.filter(_._2  0).keys.toArray // 还有container可以分配的一个或者多个hosts
                      val racks = hosts.map { h =>
                        resolver.resolve(yarnConf, h) // 解析这些host所在的rack
                      }.toSet
                      // 每一个ContainerLocalityPreferences代表一个Container
                      containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
                      // Minus 1 each time when the host is used. When the current ratio is 0,
                      // which means all the required ratio is satisfied, this host will not be allocated again.
                      preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
                    }
                  }
                  // containerLocalityPreferences中的每一项都会变成一个新的Container Request
                  containerLocalityPreferences.toArray
                }
              

              其参数的基本含义是:

              • numContainer: Int 需要进行计算的Container的数量,即可能进行分配的Container数量,包括Miss的container(还没有申请的Container),Cancel掉的(本地性不符合任何task要求,因此已经从Yarn上取消的pending container)。同时,还包括Pending Container中对本地性没有要求的Container,这一部分Container也是我们重新申请的对象,以最大化Locality。上文讲到过的updateResourceRequests()方法中的potentialContainers就是传入到该方法的numContainers参数:

                   // 将新的container请求,以及刚刚取消的container,作为available container
                   val availableContainers = missing + cancelledContainers
                   // to maximize locality, include requests with no locality preference that can be cancelled
                   // 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的container
                   val potentialContainers = availableContainers + anyHostRequests.size
                
              • numLocalityAwareTasks: Int 对locality有要求的task的数量,这个是Driver端通过对stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的数值。已经说过,这是此时的全局状态量,而不是一个增量;

              • hostToLocalTaskCount: Map[String, Int] 在Stage提交了以后,这个map里面保存了从host到期望分配到这个host的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints传递过来的,具体过程是:

                • 在Driver端,ExecutorAllocationManager的onStageSubmitted回调中,会将这个Stage的task preference存放在stageIdToExecutorPlacementHints中。

                  ----------------------------------------- ExecutorAllocationManager ----------------------------------------
                  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
                    .....
                      // 计算这个stage在每一个host上的task数量
                      // Compute the number of tasks requested by the stage on each host
                      var numTasksPending = 0
                      val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
                      stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
                        // 对于每一个task的prefered location的list
                        numTasksPending += 1
                          // 对于这个task的每一个 preferred location
                        locality.foreach { location => // 对于这个locality中的每一个location
                         // 这个host上的task的数量+1
                         val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
                         hostToLocalTaskCountPerStage(location.host) = count
                        }
                      }
                      // 这个map的key是stage id,value是一个元组,记录了这个stage的pending的task的数量,以及从host到task count的map信息
                      stageIdToExecutorPlacementHints.put(stageId,
                        (numTasksPending, hostToLocalTaskCountPerStage.toMap))
                      updateExecutorPlacementHints() 
                  
                • 随后,ExecutorAllocationManager会有线程不断将这些信息通过RequestExecutors发送给远程的ApplicationMaster:

                    def start(): Unit = {
                      listenerBus.addToManagementQueue(listener)
                      val scheduleTask = new Runnable() {
                        override def run(): Unit = {
                          schedule() // 这里会根据需要更新numExecutorsTarget的数量,也会调用
                        }
                      }
                      executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
                      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
                    }
                  
                • allocatedHostToContainersMap: HashMap[String, Set[ContainerId]] 已经launch起来的host -> container的映射关系。这是updateResource()方法每次通过Yarn的标准API allocate()向Yarn询问以后获取的结果。我们说过,allocate()接口用来向Yarn发送本次的资源请求,并返回当前Yarn为这个Application分配的Container的结果。由于Yarn端的资源分配是异步分配,因此allocate()返回的结果并非是这次请求的资源的分配结果,而是两次相邻的allocate()请求发生之间的新产生的资源分配结果;

                • localityMatchedPendingAllocations: Seq[ContainerRequest] 对本地性有要求的pending的container,其在方法splitPendingAllocationsByLocality()中对Pending的Container的Locality状态进行切分后,那些与当前请求的Task的Locality有交集的Pending Container将作为已经存在的Container,整个资源请求的目标,是使得新申请的Container和已经分配的Container加起来,其资源倾向和所有Task的统计倾向尽量匹配,从而最大程度满足Task的本地性需求。

                  localityOfRequestedContainers()算法的基本过程为:

                  1. 计算每一个Host上应该新分配的Container的数量的预期值。由于是新分配的Container的预期值,因此需要先根据每个Host上的预期存在的Container的总的数量,减去该Host上已经存在的Container:

                    val updatedHostToContainerCount = expectedHostToContainerCount(
                      numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
                        localityMatchedPendingAllocations)
                    

                    这里的计算,就是完成下表中从Sum of Tasks(每个机器上分配到的Task的比例) 到 Sum of Containers (每个机器上分配的Container的比例)的转换,然后根据Sum of Containers 减去每台机器上已经分配的Container,就得到了Average Allocated Container Total(每台机器上应该新分配的Container的数量):

                    Host 1Host 2Host 3Host 4
                    20 Tasks202020
                    10 Tasks101010
                    Sum of Tasks30302010
                    Sum of Containers1515126
                    Allocated Container Target5542
                    Newly-Allocated Container4431
                  2. 根据上面计算的分配结果,统计没有locality需求的Container的总数量和有locality需求的Container数量:

                    // 希望再launch的所有Host上的container的数量之和,在这里的例子中,是15
                    val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
                    // The number of containers to allocate, divided into two groups, one with preferred locality,
                    // and the other without locality preference.
                    //  没有locality需求的container的数量
                    val requiredLocalityFreeContainerNum =
                     math.max(0, numContainer - updatedLocalityAwareContainerNum)
                    //  有locality需求的container的数量
                    val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
                    
                  3. 先为 没有locality需求的Container 构造ContainerLocalityPreferences,每一个ContainerLocalityPreferences对象对应了一个Container请求和这个请求的Locality需求。可以看到,这种没有Locality需求的Container的Host 偏好和Rack 偏好都是空的:

                    val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
                    if (requiredLocalityFreeContainerNum > 0) { // 如果有container是没有locality需求的
                     for (i 
                       containerLocalityPreferences += ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求
                         null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
                     }
                    }
                     case(k, ratio) =
                       val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
                       (k, adjustedRatio.ceil.toInt) // 往上取整
                     }
                    

                    如下图所示,这里就是完成Round Up 这一步骤,将需要新分配的Container数量成比例放大,保证Container比例最大的Host(这里是Host1 和 Host 2)放大以后的值刚好等于需要分配的有Locality Preference 的 Container的总数量。

                    Host 1Host 2Host 3Host 4
                    20 Tasks202020
                    10 Tasks101010
                    Sum of Tasks30302010
                    Sum of Containers1515105
                    Allocated Container Target5542
                    Newly Allocated Container4431
                    Round Up121293

                    4.2 放大完成以后,开始进行分配。

                    当前有12 个Container需要分配,每一个Host的分配比例为(12,12,9,3)。分配过程上文已经经过,其代码如下:

                      // 每个有locality需求的的Container request,为他们确定对应的hosts和rack
                      for (i 
                        // Only filter out the ratio which is larger than 0, which means the current host can
                        // still be allocated with new container request.
                        val hosts = preferredLocalityRatio.filter(_._2  0).keys.toArray // 还有container可以分配的一个或者多个hosts
                        val racks = hosts.map { h =>
                          resolver.resolve(yarnConf, h) // 解析这些host所在的rack
                        }.toSet
                        // 每一个ContainerLocalityPreferences代表一个Container
                        containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
                        // Minus 1 each time when the host is used. When the current ratio is 0,
                        // which means all the required ratio is satisfied, this host will not be allocated again.
                        preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
                      }
                    

                  申请资源以后的处理:Executor的启动或者结束

                  上面讲过,资源调度的入口方法allocateResources()会通过updateResourceRequests()来计算所需资源并向Yarn进行资源的更新,包括申请新的资源、释放无用的资源等:

                    def allocateResources(): Unit = synchronized {
                      
                      updateResourceRequests() // 
                      
                      val allocateResponse = amClient.allocate(progressIndicator)
                      val allocatedContainers = allocateResponse.getAllocatedContainers()
                  	handleAllocatedContainers(allocatedContainers.asScala)
                      val completedContainers = allocateResponse.getCompletedContainersStatuses()
                       processCompletedContainers(completedContainers.asScala)
                      }
                    }
                  

                  通过调用Yarn的标准API allocate(),获取了资源分配的结果。再次强调,Yarn这一端的资源调度是异步调度,因此这个资源分配的结果并不是刚刚通过addContainerRequest()进行资源申请的结果,只是调用者在两次调用allocate() API的之间Yarn对于这个Application的新的资源分配结果。拿到了分配的Container,Spark就可以将Executor启动起来了(注意,是启动一个空的Executor,不是启动Task)。启动起来的Executor随后就会向DriverEndpoint注册自己,通信的详细过程参考TODO。这里不再赘述。

                  对分配结果的处理,主要是处理已经分配的Container以及已经运行结束的Container:

                      val allocatedContainers = allocateResponse.getAllocatedContainers()
                      handleAllocatedContainers(allocatedContainers.asScala)
                      val completedContainers = allocateResponse.getCompletedContainersStatuses()
                      processCompletedContainers(completedContainers.asScala)
                  
                  1. 对于已经分配的Container,需要从Yarn的AMRMClient中将对应的资源请求删除,避免对同一个资源进行多次重复申请,然后启动对应的Executor。
                  2. 对于已经完成的Container,需要根据Container的退出状态,记录相关日志,同时,需要向Driver发送RemoveExecutor消息告知Driver这个Container的结束,Driver端会进行相关状态的维护。

                  对于新启动的Container的处理

                  对于一个刚刚分配成功的Container,其处理工作主要包括两个

                  • 一是从AMRMClient中将对应的资源请求删除,避免同一资源请求的Container被重复申请;
                  • 然后,在远程的NodeManager节点上启动Container。

                    这些过程在方法handleAllocatedContainers()中进行:

                    --------------------------------- YarnAllocator --------------------------------------
                    def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
                      val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
                      // 先处理Host Match的Container
                      val remainingAfterHostMatches = new ArrayBuffer[Container]
                      for (allocatedContainer 
                        matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
                          containersToUse, remainingAfterHostMatches)
                      }
                      // 处理Host Match以后剩余的Container
                      val remainingAfterRackMatches = new ArrayBuffer[Container]
                      for (allocatedContainer 
                        val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
                        matchContainerToRequest(allocatedContainer, rack, containersToUse,
                          remainingAfterRackMatches)
                      }
                      // 处理Host Match和Rack Match以后剩余的Container
                      val remainingAfterOffRackMatches = new ArrayBuffer[Container]
                      for (allocatedContainer 
                        matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
                          remainingAfterOffRackMatches)
                      }
                      // 在Host Match,Rack Match,以及ANY_HOST Match以后,依然还有剩余的Container,这只能是Bug
                      if (!remainingAfterOffRackMatches.isEmpty) {
                        for (container 
                          internalReleaseContainer(container)
                        }
                      }
                      /**
                       * 在这里会打印 Launching container container_1714042499037_5294_01_000002 on host
                        */
                      runAllocatedContainers(containersToUse)
                      }
                    
                     matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
                       containersToUse, remainingAfterHostMatches)
                    }
                    
                        // 这个Container的资源特性
                        val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
                              resource.getVirtualCores)
                        // 以Priority,Resource(VCore, Memory),location作为ID,删除这个Container对应的资源请求
                        val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
                          matchingResource)
                        if (!matchingRequests.isEmpty) { // 匹配成功
                          val containerRequest = matchingRequests.get(0).iterator.next
                          amClient.removeContainerRequest(containerRequest) // 从AMRMClient中删除
                          containersToUse += allocatedContainer
                        } else {
                          remaining += allocatedContainer // 未匹配的Container放入remaining,接着进行其他匹配
                        }
                      }
                    
                      /**
                       * SparkRackResolver.
                       */
                      val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
                      matchContainerToRequest(allocatedContainer, rack, containersToUse,
                        remainingAfterRackMatches)
                    }
                    
                      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
                        remainingAfterOffRackMatches)
                    }
                    
                      for (container 
                        internalReleaseContainer(container)
                      }
                    }
                    
                        for (container  // 对于已经allocate并且资源已经匹配的container
                          executorIdCounter += 1
                          val executorHostname = container.getNodeId.getHost
                          val containerId = container.getId
                          val executorId = executorIdCounter.toString // 分配executorId
                          def updateInternalState(): Unit = synchronized {
                            numExecutorsRunning.incrementAndGet()
                            numExecutorsStarting.decrementAndGet()
                            executorIdToContainer(executorId) = container // executor 和 container的映射关系
                            containerIdToExecutorId(container.getId) = executorId // container 和 executor 的映射关系
                            /**
                             * Container launch起来以后,更新allocatedHostToContainersMap
                             */
                            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
                              new HashSet[ContainerId])
                            containerSet += containerId
                            allocatedContainerToHostMap.put(containerId, executorHostname)
                          }
                          numExecutorsStarting.incrementAndGet()
                          launcherPool.execute(new Runnable {
                              override def run(): Unit = {
                                              try {
                                  new ExecutorRunnable(
                                    ........
                                  ).run() // 运行ExecutorRunnable,用来和NodeManager通信来启动Container
                                  updateInternalState()
                              }
                            })
                        }
                      }
                    
                     override def run(): Unit = {
                       try {
                         new ExecutorRunnable(
                           Some(container),
                           conf,
                           sparkConf,
                           driverUrl,
                           executorId,
                           executorHostname,
                           executorMemory,
                           executorCores,
                           appAttemptId.getApplicationId.toString,
                           securityMgr,
                           localResources
                         ).run()
                    
                         numExecutorsRunning.incrementAndGet()
                         numExecutorsStarting.decrementAndGet()
                         executorIdToContainer(executorId) = container // executor 和 container的映射关系
                         containerIdToExecutorId(container.getId) = executorId // container 和 executor 的映射关系
                         /**
                          * Container launch起来以后,更新allocatedHostToContainersMap
                          */
                         val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
                           new HashSet[ContainerId])
                         containerSet += containerId
                         allocatedContainerToHostMap.put(containerId, executorHostname)
                       }
                    
                      for (completedContainer 
                        val containerId = completedContainer.getContainerId
                        val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放
                        val hostOpt = allocatedContainerToHostMap.get(containerId)
                        val onHostStr = hostOpt.map(host = s" on host: $host").getOrElse("")
                        val exitReason = if (!alreadyReleased) { // 这个Container还没有释放,那么走释放流程
                          // Decrement the number of executors running. The next iteration of
                          // the ApplicationMaster's reporting thread will take care of allocating.
                          numExecutorsRunning.decrementAndGet()
                          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
                          // there are some exit status' we shouldn't necessarily count against us, but for
                          // now I think its ok as none of the containers are expected to exit.
                          val exitStatus = completedContainer.getExitStatus
                          val (exitCausedByApp, containerExitReason) = exitStatus match {
                            case ContainerExitStatus.SUCCESS =
                              .....
                            case ContainerExitStatus.PREEMPTED =
                             ....
                            case VMEM_EXCEEDED_EXIT_CODE =
                              ....
                            case PMEM_EXCEEDED_EXIT_CODE =
                              ....
                          }
                          if (exitCausedByApp) {
                            logWarning(containerExitReason)
                          } else {
                            logInfo(containerExitReason)
                          }
                          ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
                        } else {
                          // 如果我们释放了这个Container,那么说明一定是Driver直接通过 killExecutor
                          // 释放掉了这个Container,而不是它自行结束
                          ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
                            s"Container $containerId exited from explicit termination request.")
                        }
                        // 解除host - container 以及 container -> host mapping
                        for {
                          host 
                          containerSet.remove(containerId) // 删除这个container
                          if (containerSet.isEmpty) { // 这个container是这个host上的最后一个container
                            allocatedHostToContainersMap.remove(host) // 删除host
                          } else {
                            allocatedHostToContainersMap.update(host, containerSet)
                          }
                          allocatedContainerToHostMap.remove(containerId) // 解除container - host map
                        }
                        // 解除container - executor mapping
                        containerIdToExecutorId.remove(containerId).foreach { eid =>
                          executorIdToContainer.remove(eid)
                          ....
                          if (!alreadyReleased) {
                            // The executor could have gone away (like no route to host, node failure, etc)
                            // Notify backend about the failure of the executor
                            numUnexpectedContainerRelease += 1
                            driverRef.send(RemoveExecutor(eid, exitReason))
                          }
                        }
                      }
                    }
                    

                    可以看到,方法processCompletedContainers()会遍历Yarn返回的每一个Completed(注意,Completed只是代表Container运行结束,但是运行结果可能是Succeed可能是Fail),然后逐个处理:

                    1. 如果Container此时并没有被释放,说明Container是自行结束,而不是Driver所杀死的。根据Container的退出状态和退出原因,打印日志:

                      val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放
                      val exitReason = if (!alreadyReleased) {
                      	val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放
                             val exitStatus = completedContainer.getExitStatus
                             val (exitCausedByApp, containerExitReason) = exitStatus match {
                               case ContainerExitStatus.SUCCESS =>
                                 .....
                               case ContainerExitStatus.PREEMPTED =>
                                ....
                               case VMEM_EXCEEDED_EXIT_CODE =>
                                 ....
                               case PMEM_EXCEEDED_EXIT_CODE =>
                                 ....
                             }
                             if (exitCausedByApp) {
                               logWarning(containerExitReason)
                             } else {
                               logInfo(containerExitReason)
                             }
                          }
                      
                    2. 如果我们发现这个Container已经在ReleasedContainer中存在,说明只能是Driver通过KillExecutor的方式将Container给Release了,而不是Container自行退出:

                              // 如果我们释放了这个Container,那么说明一定是Driver直接通过 killExecutor
                              // 释放掉了这个Container,而不是它自行结束
                              ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
                                s"Container $containerId exited from explicit termination request.")
                      

                      如果是Driver杀死了Executor,Driver会向AMEndpoint发送KillExecutor消息,AMEndpoint会将这个Executor从其维护的元数据信息中删除,将这个Kill掉的Executor的Container添加到releasedContainers中,同时通过AMRMClient向Yarn发送释放container的请求:

                        def killExecutor(executorId: String): Unit = synchronized {
                            val container = executorIdToContainer.get(executorId).get
                            internalReleaseContainer(container)
                            numExecutorsRunning.decrementAndGet()
                        }
                        private def internalReleaseContainer(container: Container): Unit = {
                          releasedContainers.add(container.getId()) // 将这个Container从releasedContainer中删除
                          amClient.releaseAssignedContainer(container.getId()) // 向Yarn发送释放Container的请求
                        }
                      
                    3. 删除这个Container和Host之间的映射关系,包括Host -> Container的映射关系以及反向的Container-> Host的映射关系

                         for {
                           host 
                           containerSet.remove(containerId)
                           if (containerSet.isEmpty) {
                             allocatedHostToContainersMap.remove(host)
                           } else {
                             allocatedHostToContainersMap.update(host, containerSet)
                           }
                           allocatedContainerToHostMap.remove(containerId)
                         }
                       eid =
                           executorIdToContainer.remove(eid)
                           ........
                           if (!alreadyReleased) {
                             // 这个Container不是Driver自行释放的,那么需要像Driver汇报一个RemoveExecutor消息
                             driverRef.send(RemoveExecutor(eid, exitReason))
                           }
                      
                            //  收到 RegisterExecutor 请求,这个请求发生在Executor启动以后,向Driver发送的信息
                        case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =
                            ......
                            executorRef.send(RegisteredExecutor)
                            .....
                            makeOffers()  // 尝试进行task的调度,针对所有可用的executor
                          }
                      
                    4. 来自Executor的StatusUpdate:在Executor上Task的运行状态发生变化,都会告知Driver,Driver认为此时集群的资源状态发生了变化,因此尝试进行一次task的调度。但是这次的Task的调度是针对这个Executor的,即只会调度适合运行在这个Executor上的Pending Task到这个Exeuctor上:

                      ------------------------------------ DriverEndpoint -----------------------------------
                      override def receive: PartialFunction[Any, Unit] = {
                       case StatusUpdate(executorId, taskId, state, data) =>
                         scheduler.statusUpdate(taskId, state, data.value)
                         if (TaskState.isFinished(state)) {
                           executorDataMap.get(executorId) match {
                             case Some(executorInfo) =>
                               executorInfo.freeCores += scheduler.CPUS_PER_TASK
                               makeOffers(executorId) // 尝试进行Task的调度,但是只针对这一个Executor
                              ...
                         }
                      
                    5. 来自TaskScheduler中的任务的相关变化,比如,Task的提交,Executor的丢失,Task的失败,推测执行等等,由于系统的可用资源发生了变化,因此TaskScheduler都会通过向DriverEndpoint发送ReviveOffers消息,以触发新一轮的Pending Task的调度。

                      ----------------------------------- DriverEndpoint --------------------------------
                          override def receive: PartialFunction[Any, Unit] = {
                            .....
                            case ReviveOffers =>
                              makeOffers()
                      
                    6. 来自SchedulerBackend的自我触发:在DriverEndpoint作为一个RpcEndpoint启动的时候,会启动一个ReviveThread,以固定频率,向自己发送ReviveOffers的本地消息(发送给自己),以触发Pending 的 Task到Container的调度:

                      -------------------------------- DriverEndpoint ---------------------------------------
                          override def onStart() {
                            // 定期恢复offer,以允许延迟调度工作
                            // Periodically revive offers to allow delay scheduling to work
                            val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
                            reviveThread.scheduleAtFixedRate(new Runnable {
                              override def run(): Unit = Utils.tryLogNonFatalError {
                                Option(self).foreach(_.send(ReviveOffers))
                              }
                            }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
                          }
                      

                    在以Locality为考虑重点的Task的调度,就是根据locality从高到低(executor-local 优先级最高),参照当前允许的优先级,取出对应的Task进行调度。如果当前优先级的Task已经调度完毕,或者当前locality的一部分或者全部Task经过了很久还没有完成调度(即当前的系统资源无法完全满足当前的locality需求),那么就需要降低locality再次尝试进行调度。

                    下文会详细讲解makeOffers()的具体流程。

                    PendingTask的生成:

                    Task的调度的重要逻辑是满足Task 的Location Preference, 即每一个Task对运行位置 (Executor, Host, Rack等等)上的偏好。我们先看看TaskLocation的含义和生成过程,然后看看Spark的Driver是怎样通过Locality-Aware的调度方式,尽最大可能满足每一个Task的本地性需求。

                    TaskLocation的构造过程

                    在Spark中,一个Task的Location归根结底是由这个Task的Split决定的,Split代表了一个切分,比如,对一个HDFS文件的切分。在Hadoop上,一个Split表示为一个InputSplit接口的实现类的一个实例。

                    一个HDFS文件的一个Split是由FileSplit对象表达,对象中hostInfos存放了一个SplitLocationInfo数组,每一个SplitLocationInfo对象存放了一个这个Split的一个Replica的Location信息(因为在HDFS上文件是多副本的),包括具体的Hostname(即DataNode所在的节点)以及这个Host是否在内存中缓存了这个Split的标记位。

                    当然,在HDFS的维度,我们说一个Split被缓存,其实缓存的是这个Split对应的Replica。在Hadoop中一个文件的Split对应一个FileSplit对象,代码如下:

                    --------------------------------------- FileSplit ------------------------------------------------
                     public FileSplit(Path file, long start, long length, String[] hosts,
                         String[] inMemoryHosts) {
                       this(file, start, length, hosts);
                       hostInfos = new SplitLocationInfo[hosts.length];
                       for (int i = 0; i  
                    

                    其中,一个Replica的Location信息对应了一个SplitLocationInfo对象,包含了这个Split是否在对应的Host上缓存了,以及,对应的Host(DataNode) 信息。在多副本的环境下,一个FileSplit对应的是一个数组 SplitLocationInfo[]:

                    ------------------------------------------ SplitLocationInfo -----------------------------------------
                    public class SplitLocationInfo {
                      private boolean inMemory;
                      private String location;
                      
                      public SplitLocationInfo(String location, boolean inMemory) {
                        this.location = location;
                        this.inMemory = inMemory;
                      }
                    

                    我们看一下Spark在提交Stage和Task的过程中是怎么获取Task的Location信息的,以及在获取以后是如何基于Location调度Task的。

                    在HDFS的场景下,Spark是将每一个Stage对应的RDD(一个NewHadoopRDD对象)中的Partition(一个NewHadoopPartition对象)的Location信息(如果有)存放在这个NewHadoopPartition的serializableHadoopSplit中,其实是对InputSplit的封装:

                    --------------------------------------- NewHadoopPartition ------------------------------------
                    private[spark] class NewHadoopPartition(
                        rddId: Int,
                        val index: Int,
                        rawSplit: InputSplit with Writable) // 这个HadoopPartition对应的InputSplit
                      extends Partition {
                      val serializableHadoopSplit = new SerializableWritable(rawSplit)
                    

                    Stage以及Partition的生成是在提交以前进行的,即执行计划的生成阶段形成的。那么,到了Stage的提交阶段,是怎么利用Stage和Partition中的Location信息,生成具有Location Preference的Task信息的呢?我们看看Stage提交的时候是如何使用Partition的Location信息的。

                    下面的这段代码是一段经典代码,Spark中DagScheduler提交Stage的时候,会首先检查这个Stage是否有未提交的Parent Stage:

                    • 如果有,会首先递归提交Parent Stage,然后把当前Stage放入到waitingStages中。waitingStage指的是那些parentStage还没有完成、因此需要等待的Stage;
                    • 如果没有Missing Parent Stage,那么就可以提交当前的Stage:
                        /** Submits stage, but first recursively submits any missing parents. */
                        private def submitStage(stage: Stage) {
                          val jobId = activeJobForStage(stage)
                          if (jobId.isDefined) {
                            if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
                              val missing = getMissingParentStages(stage).sortBy(_.id)
                              if (missing.isEmpty) { // 没有missing parent stage,那么提交这个Stage的所有task
                                submitMissingTasks(stage, jobId.get)
                              } else {
                                for (parent  // 有missing parent stage,那么先提交
                                  submitStage(parent) 
                                }
                                waitingStages += stage // 将当前stage放入到waitingStages中随后提交
                              }
                            }
                          } 
                        }
                      
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]