Flink slot分配

在taskmanager启动时候向jobmanager发起注册操作的同时会告诉scheduler自己这个taskmanager节点上有slot可用了。

public void newSlotAvailable(final Instance instance) {
 
 // WARNING: The asynchrony here is necessary, because we cannot guarantee the order  // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:  //  // -> The scheduler needs to grab them (1) global scheduler lock  // (2) slot/instance lock  // -> The slot releasing grabs (1) slot/instance (for releasing) and  // (2) scheduler (to check whether to take a new task item  //  // that leads with a high probability to deadlocks, when scheduling fast 
 newlyAvailableInstances.add(instance);

 executor.execute(new Runnable() {
  @Override
  public void run() {
   handleNewSlot();
  }
 });
}

上面的注释写的很清楚,这里为了避免死锁采用了异步的方式进行处理。handleNewSlot方法的代码如下。

private void handleNewSlot() {
 
 synchronized (globalLock) {
  Instance instance = this.newlyAvailableInstances.poll();
  if (instance == null || !instance.hasResourcesAvailable()) {
   // someone else took it    return;
  }
  
  QueuedTask queued = taskQueue.peek();
  
  // the slot was properly released, we can allocate a new one from that instance   
  if (queued != null) {
   ScheduledUnit task = queued.getTask();
   ExecutionVertex vertex = task.getTaskToExecute().getVertex();
   
   try {
    SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
    if (newSlot != null) {
     
     // success, remove from the task queue and notify the future      taskQueue.poll();
     if (queued.getFuture() != null) {
      try {
       queued.getFuture().complete(newSlot);
      }
      catch (Throwable t) {
       LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
       task.getTaskToExecute().fail(t);
      }
     }
    }
   }
   catch (InstanceDiedException e) {
    if (LOG.isDebugEnabled()) {
     LOG.debug("Instance " + instance + " was marked dead asynchronously.");
    }
    
    removeInstance(instance);
   }
  }
  else {
   this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
  }
 }
}

这里会先判断有没有可以用的instance,其实也就是taskmanager。或者instance队列里最新的这个instance上还有没有空闲的slot。如果满足条件接下来会从taskQueue里取出最近加入的task并对该task分配slot,那么这个task是什么时候加入进去的呢,这个等会再说。可以看到,如果taskQueue队列里有task,那么将从刚才获取到的instance内分配一个slot给它,这里先别管什么simpleSlot。这里的allocateSimpleSlot方法的参数是jobId,如果你了解Flink的DAG生成情况就应该知道Flink中的DAG是两层的,第一层是jobGraph,jobGraph中的每个点就是一个operatorChain。第二层就是executionGraph,executionGraph是jobGraph的并行化,也就是说executionGraph中的每个点是jobGraph中的点的子任务。executionGraph中的点被表示成executionVertex,jobGraph中的点被表示成jobVertex,在executionVertex中会封装进自己所属的jobVertex,不过这里不叫jobVertex而是叫executionJobVertex。所以上面的vertex.getJobId方法调用的其实是executionJobVertex.getJobId。ok,如果slot分配成功那么将task从taskQueue里移除。当然还有一种情况就是taskQueue里没有task,这个时候就将instance装进instancesWithAvailableResources里供调用。

问题来了,instancesWithAvailableResources里的instance什么时候会被调用呢?答案是在scheduleTask方法里会被调用。接下来来看这个方法都干了什么。顺便提一嘴,scheduleTask是在submitTask的时候被调用的。

private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException {
 if (task == null) {
  throw new NullPointerException();
 }
 if (LOG.isDebugEnabled()) {
  LOG.debug("Scheduling task " + task);
 }

 final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 
 final boolean forceExternalLocation = false &&
        preferredLocations != null && preferredLocations.iterator().hasNext();

 synchronized (globalLock) {

  SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

  if (sharingUnit != null) {

   // 1) === If the task has a slot sharing group, schedule with shared slots === 
   if (queueIfNoResource) {
    throw new IllegalArgumentException(
      "A task with a vertex sharing group was scheduled in a queued fashion.");
   }

   final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
   final CoLocationConstraint constraint = task.getLocationConstraint();
   
   // sanity check that we do not use an externally forced location and a co-location constraint together    if (constraint != null && forceExternalLocation) {
    throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a "
      + "co-location constraint and an external location constraint.");
   }
   
   // get a slot from the group, if the group has one for us (and can fulfill the constraint)    final SimpleSlot slotFromGroup;
   if (constraint == null) {
    slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
   }
   else {
    slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
   }

   SimpleSlot newSlot = null;
   SimpleSlot toUse = null;

   // the following needs to make sure any allocated slot is released in case of an error    try {

    // check whether the slot from the group is already what we want.     // any slot that is local, or where the assignment was unconstrained is good!     if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {

     // if this is the first slot for the co-location constraint, we lock      // the location, because we are quite happy with the slot      if (constraint != null && !constraint.isAssigned()) {
      constraint.lockLocation();
     }

     updateLocalityCounters(slotFromGroup, vertex);
     return slotFromGroup;
    }

    // the group did not have a local slot for us. see if we can one (or a better one) 
    // our location preference is either determined by the location constraint, or by the     // vertex's preferred locations     final Iterable<TaskManagerLocation> locations;
    final boolean localOnly;
    if (constraint != null && constraint.isAssigned()) {
     locations = Collections.singleton(constraint.getLocation());
     localOnly = true;
    }
    else {
     locations = preferredLocations;
     localOnly = forceExternalLocation;
    }

    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);

    if (newSlot == null) {
     if (slotFromGroup == null) {
      // both null, which means there is nothing available at all 
      if (constraint != null && constraint.isAssigned()) {
       // nothing is available on the node where the co-location constraint forces us to        throw new NoResourceAvailableException("Could not allocate a slot on instance " +
         constraint.getLocation() + ", as required by the co-location constraint.");
      }
      else if (forceExternalLocation) {
       // could not satisfy the external location constraint        String hosts = getHostnamesFromInstances(preferredLocations);
       throw new NoResourceAvailableException("Could not schedule task " + vertex
         + " to any of the required hosts: " + hosts);
      }
      else {
       // simply nothing is available        throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
         getTotalNumberOfSlots(), getNumberOfAvailableSlots());
      }
     }
     else {
      // got a non-local from the group, and no new one, so we use the non-local       // slot from the sharing group       toUse = slotFromGroup;
     }
    }
    else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
     // if there is no slot from the group, or the new slot is local,      // then we use the new slot      if (slotFromGroup != null) {
      slotFromGroup.releaseSlot();
     }
     toUse = newSlot;
    }
    else {
     // both are available and usable. neither is local. in that case, we may      // as well use the slot from the sharing group, to minimize the number of      // instances that the job occupies      newSlot.releaseSlot();
     toUse = slotFromGroup;
    }

    // if this is the first slot for the co-location constraint, we lock     // the location, because we are going to use that slot     if (constraint != null && !constraint.isAssigned()) {
     constraint.lockLocation();
    }

    updateLocalityCounters(toUse, vertex);
   }
   catch (NoResourceAvailableException e) {
    throw e;
   }
   catch (Throwable t) {
    if (slotFromGroup != null) {
     slotFromGroup.releaseSlot();
    }
    if (newSlot != null) {
     newSlot.releaseSlot();
    }

    ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
   }

   return toUse;
  }
  else {
   
   // 2) === schedule without hints and sharing ===    
   SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
   if (slot != null) {
    updateLocalityCounters(slot, vertex);
    return slot;
   }
   else {
    // no resource available now, so queue the request     if (queueIfNoResource) {
     CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
     this.taskQueue.add(new QueuedTask(task, future));
     return future;
    }
    else if (forceExternalLocation) {
     String hosts = getHostnamesFromInstances(preferredLocations);
     throw new NoResourceAvailableException("Could not schedule task " + vertex
       + " to any of the required hosts: " + hosts);
    }
    else {
     throw new NoResourceAvailableException(getNumberOfAvailableInstances(),
       getTotalNumberOfSlots(), getNumberOfAvailableSlots());
    }
   }
  }
 }
}

这个方法很长,那么我们慢慢来看它都干了什么。

首先,拿到task的SlotSharingGroup,所谓SlotSharingGroup是指在flink中两个任务的subtask可以跑在同一个slot中,默认情况下所有的任务都属于default组。不过这个限制不是“硬”规定,也就是说可以不遵守。接下来可以看到它会去获取CoLocationConstraint实例,这个实例的作用也是限制两个subtask跑在同一个slot中,不过这里跟SlotSharingGroup不同的是这个限制是“硬”规定,而且这两个subtask在自身所属于的task中的索引是相同的。CoLocationConstraint通常用来限定迭代中的反馈边和input。

再来看,preferredLocations是干嘛的呢?在Flink中有两种调度模式

public enum ScheduleMode {

 /** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
 LAZY_FROM_SOURCES,

 /** Schedules all tasks immediately. */
 EAGER;
 
 /**
  * Returns whether we are allowed to deploy consumers lazily.
  */
 public boolean allowLazyDeployment() {
  return this == LAZY_FROM_SOURCES;
 }
 
}

LAZY_FROM_SOURCES模式下,会从source节点开始调度,下游任务的调度是通过consumer回调通知的方式进行,前面也提到过相关的内容(Flink ResultPartition分析)。在本文中,我们都默认使用的是LAZY_FROM_SOURCES。之所以要提到这个是因为在scheduler分配slot的时候除了会根据前面提到的两种限制条件外还会用到一个称为“偏好位置”的东西,这个东西是在jobmanager开始进行调度的时候就为executionVertex指定好的。也就是下面这个方法。

private void scheduleLazy(SlotProvider slotProvider) {
 // simply take the vertices without inputs.  for (ExecutionJobVertex ejv : verticesInCreationOrder) {
  if (ejv.getJobVertex().isInputVertex()) {
   ejv.scheduleAll(
    slotProvider,
    allowQueuedScheduling,
    LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty   }
 }
}
-------------------------------------------------------------------------------------------
public enum LocationPreferenceConstraint {
 ALL, // wait for all inputs to have a location assigned  ANY // only consider those inputs who already have a location assigned }  

flink会根据这个条件来计算具体哪些taskmanager可以作为“偏好位置“。

public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
 final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
 final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;

 switch(locationPreferenceConstraint) {
  case ALL:
   preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
   break;
  case ANY:
   final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());

   for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
    if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
     final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);

     if (taskManagerLocation == null) {
      throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
     }

     completedTaskManagerLocations.add(taskManagerLocation);
    }
   }

   preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
   break;  default:
   throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
 }

 return preferredLocationsFuture;
}

因为前面调度的时候用的LocationPreferenceConstraint.ALL,所以为了简化思路,这个方法我们也是以LocationPreferenceConstraint.ALL为例进行说明,其实这里比较关键的是getVertex().getPreferredLocationsBasedOnInputs()的调用,这个方法的意义是获取当前算子的input算子所处在位置,然后把这些位置当做“偏好位置”,当然这个方法里还做了一些过滤性的操作,这里我们先不关心。这里拿到的“偏好位置”就是我们在scheduleTask里用到的preferredLocations。

再回到scheduleTask方法,它接下来会根据CoLocationConstraint的存在与否来判断如何从当前任务所属的SlotSharingGroup中分配slot。

if (constraint == null) {
 slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
}
else {
 slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
}

这个assignment是SlotSharingGroupAssignment实例,这里先不管SlotSharingGroupAssignment具体细节,只需要知道它是管理SlotSharingGroup的slot分配的即可。

接下来,我们进入getSlotForTask方法看看,先来看不需要constraint的。

public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 synchronized (lock) {
  Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);

  if (p != null) {
   SharedSlot ss = p.f0;
   SimpleSlot slot = ss.allocateSubSlot(vertexID);
   slot.setLocality(p.f1);
   return slot;
  }
  else {
   return null;
  }
 }
}

这里比较核心的是getSlotForTaskInternal方法。那就接着看这个方法。

// get the available slots for the group Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);

if (slotsForGroup == null) {
 // we have a new group, so all slots are available  slotsForGroup = new LinkedHashMap<>();
 availableSlotsPerJid.put(groupId, slotsForGroup);

 for (SharedSlot availableSlot : allSlots) {
  putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
 }
}
else if (slotsForGroup.isEmpty()) {
 // the group exists, but nothing is available for that group  return null;
}

首先会从availableSlotsPerJid里取出分配给该jobVertex的slot,这里的groupId就是jobVertexId。然后判断能不能取到,如果没有取到就把当前的所有slot当做该jobVertexId的可用slot。如果取到了但是为空则直接返回null。接下来再看我们取到这个slotsForGroup之后会干些什么。

// check whether we can schedule the task to a preferred location boolean didNotGetPreferred = false;

if (preferredLocations != null) {
 for (TaskManagerLocation location : preferredLocations) {

  // set the flag that we failed a preferred location. If one will be found,   // we return early anyways and skip the flag evaluation   didNotGetPreferred = true;

  SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
  if (slot != null && slot.isAlive()) {
   return new Tuple2<>(slot, Locality.LOCAL);
  }
 }
}

如果“偏好位置”列表不为空,根据“偏好位置”从slotsForGroup中取slot,一旦发现有满足跟任意一个“偏好位置”在同一个taskmanager上的slot就立即返回。如果没有找到满足条件的,那么会接着往下执行。

// if we want only local assignments, exit now with a "not found" result if (didNotGetPreferred && localOnly) {
 return null;
}

Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;

// schedule the task to any available location SharedSlot slot;
while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
 if (slot.isAlive()) {
  return new Tuple2<>(slot, locality);
 }
}

// nothing available after all, all slots were dead return null;

这里的localOnly为false,所以不会立即返回null。如果“偏好位置”列表不为空,那么didNotGetPreferred为true同时locality为Locality.NON_LOCAL,反之,didNotGetPreferred则为false同时locality为Locality.UNCONSTRAINED。接下来将从slotsForGroup中随意取一个slot返回。getSlotForTask方法将会从这里返回的sharedSlot里分配一个simpleSlot并返回,这次就返回到了scheduleTask方法了。

scheduleTask方法接下来会判断刚才返回的slotFromGroup是不是为null,同时会检查这个slot的locality是否为NON_LOCAL,如果locality是LOCAL或者UNCONSTRAINED,那么这个slot就是分配给该task的slot。但是如果不满足这两个条件,那么接下来还有一大堆的逻辑。

newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);

这个方法里面的逻辑主要是先找到任意一个“偏好位置”所在的taskmanager,然后从这个taskmanager上分配slot,然后能取到可用的slot,那么就将刚才获取的slotFromGroup释放掉。如果没有在“偏好位置”上找到满足条件的slot,那么就用刚才获取的slotFromGroup,并将这个slot释放掉。

else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
 // if there is no slot from the group, or the new slot is local,  // then we use the new slot  if (slotFromGroup != null) {
  slotFromGroup.releaseSlot();
 }
 toUse = newSlot;
}
else {
 // both are available and usable. neither is local. in that case, we may  // as well use the slot from the sharing group, to minimize the number of  // instances that the job occupies  newSlot.releaseSlot();
 toUse = slotFromGroup;
}

刚才所说的都是task有所属的slotGroup,如果没有呢?那就很简单了。

protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
          Iterable<TaskManagerLocation> requestedLocations,
          boolean localOnly) {
 // we need potentially to loop multiple times, because there may be false positives  // in the set-with-available-instances  while (true) {
  Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

  if (instanceLocalityPair == null){
   return null;
  }

  Instance instanceToUse = instanceLocalityPair.getLeft();
  Locality locality = instanceLocalityPair.getRight();

  try {
   SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
   
   // if the instance has further available slots, re-add it to the set of available resources.    if (instanceToUse.hasResourcesAvailable()) {
    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
   }
   
   if (slot != null) {
    slot.setLocality(locality);
    return slot;
   }
  }
  catch (InstanceDiedException e) {
   // the instance died it has not yet been propagated to this scheduler    // remove the instance from the set of available instances    removeInstance(instanceToUse);
  }
  
  // if we failed to get a slot, fall through the loop  }
}

就是从“偏好位置”所在的taskmanager上获取一个simpleSlot即可。

else {
 // no resource available now, so queue the request  if (queueIfNoResource) {
  CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
  this.taskQueue.add(new QueuedTask(task, future));
  return future;
 }

但是如果只是得到了null的话同时又设置了queueIfNoResource的话,这个task就会被加到taskQueue里,这也就是前面说的taskQueue里的元素的来源。当然,如果没有设置queueIfNoResource的话将会直接抛出异常。

    原文作者:qusijun
    原文地址: https://zhuanlan.zhihu.com/p/36525639
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞