Flink slot分配


public void newSlotAvailable(final Instance instance) {
 // WARNING: The asynchrony here is necessary, because we cannot guarantee the order  // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:  //  // -> The scheduler needs to grab them (1) global scheduler lock  // (2) slot/instance lock  // -> The slot releasing grabs (1) slot/instance (for releasing) and  // (2) scheduler (to check whether to take a new task item  //  // that leads with a high probability to deadlocks, when scheduling fast 

 executor.execute(new Runnable() {
  public void run() {


private void handleNewSlot() {
 synchronized (globalLock) {
  Instance instance = this.newlyAvailableInstances.poll();
  if (instance == null || !instance.hasResourcesAvailable()) {
   // someone else took it    return;
  QueuedTask queued = taskQueue.peek();
  // the slot was properly released, we can allocate a new one from that instance   
  if (queued != null) {
   ScheduledUnit task = queued.getTask();
   ExecutionVertex vertex = task.getTaskToExecute().getVertex();
   try {
    SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
    if (newSlot != null) {
     // success, remove from the task queue and notify the future      taskQueue.poll();
     if (queued.getFuture() != null) {
      try {
      catch (Throwable t) {
       LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
   catch (InstanceDiedException e) {
    if (LOG.isDebugEnabled()) {
     LOG.debug("Instance " + instance + " was marked dead asynchronously.");
  else {
   this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);



/**
 * Schedules a single task, either into a slot of its slot sharing group (branch 1) or into a
 * freshly allocated slot (branch 2).
 *
 * <p>The return value is either a {@code SimpleSlot} that was allocated immediately, or a
 * {@code CompletableFuture<SimpleSlot>} when the request was queued because no resource was
 * available; hence the {@code Object} return type.
 *
 * <p>NOTE(review): this listing was copied from a web page and lost its closing braces and
 * several line breaks. Wherever a statement follows a {@code //} comment on the same line
 * (e.g. {@code if}/{@code try} openers, the {@code slotFromGroup}/{@code locations}
 * declarations, some {@code throw}s and assignments), that statement is now part of the
 * comment. A few {@code if} bodies are missing altogether. Compare against the upstream Flink
 * {@code Scheduler} source before relying on the exact control flow.
 *
 * @param task the unit to schedule; must not be null
 * @param queueIfNoResource whether to queue the request when no slot is free; must be false
 *        for tasks that belong to a slot sharing group
 * @param preferredLocations preferred task manager locations; may be null
 * @return the allocated {@code SimpleSlot}, or a future that receives one if the request was queued
 * @throws NoResourceAvailableException if no suitable slot is available and the request is not queued
 * @throws NullPointerException if {@code task} is null
 * @throws IllegalArgumentException if a task with a slot sharing group is scheduled with queueing
 */
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException {
 if (task == null) {
  throw new NullPointerException();
 if (LOG.isDebugEnabled()) {
  LOG.debug("Scheduling task " + task);

 final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 // NOTE(review): "false && ..." hard-wires forceExternalLocation to false, so every branch
 // guarded by it below is currently dead code.
 final boolean forceExternalLocation = false &&
        preferredLocations != null && preferredLocations.iterator().hasNext();

 synchronized (globalLock) {

  SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

  if (sharingUnit != null) {

   // 1) === If the task has a slot sharing group, schedule with shared slots === 
   if (queueIfNoResource) {
    throw new IllegalArgumentException(
      "A task with a vertex sharing group was scheduled in a queued fashion.");

   final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
   final CoLocationConstraint constraint = task.getLocationConstraint();
   // NOTE(review): the "if (constraint != null && forceExternalLocation) {" on the next line
   // is inside the comment because a line break was lost.
   // sanity check that we do not use an externally forced location and a co-location constraint together    if (constraint != null && forceExternalLocation) {
    throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a "
      + "co-location constraint and an external location constraint.");
   // NOTE(review): the declaration "final SimpleSlot slotFromGroup;" on the next line is
   // inside the comment because a line break was lost.
   // get a slot from the group, if the group has one for us (and can fulfill the constraint)    final SimpleSlot slotFromGroup;
   if (constraint == null) {
    slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
   else {
    slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);

   SimpleSlot newSlot = null;
   SimpleSlot toUse = null;

   // NOTE(review): the "try {" on the next line is inside the comment (lost line break).
   // the following needs to make sure any allocated slot is released in case of an error    try {

    // NOTE(review): the "if (slotFromGroup != null && ...) {" opener on the next line is
    // inside the comment (lost line break).
    // check whether the slot from the group is already what we want.     // any slot that is local, or where the assignment was unconstrained is good!     if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {

     // NOTE(review): the "if (constraint != null && !constraint.isAssigned()) {" on the next
     // line is inside the comment, and its body (locking the constraint's location) is
     // missing from this excerpt.
     // if this is the first slot for the co-location constraint, we lock      // the location, because we are quite happy with the slot      if (constraint != null && !constraint.isAssigned()) {

     updateLocalityCounters(slotFromGroup, vertex);
     return slotFromGroup;

    // the group did not have a local slot for us. see if we can one (or a better one) 
    // NOTE(review): the declaration "final Iterable<TaskManagerLocation> locations;" on the
    // next line is inside the comment (lost line break).
    // our location preference is either determined by the location constraint, or by the     // vertex's preferred locations     final Iterable<TaskManagerLocation> locations;
    final boolean localOnly;
    if (constraint != null && constraint.isAssigned()) {
     // an assigned co-location constraint pins the task to exactly that location
     locations = Collections.singleton(constraint.getLocation());
     localOnly = true;
    else {
     locations = preferredLocations;
     localOnly = forceExternalLocation;

    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);

    if (newSlot == null) {
     if (slotFromGroup == null) {
      // both null, which means there is nothing available at all 
      if (constraint != null && constraint.isAssigned()) {
       // NOTE(review): the "throw new NoResourceAvailableException(" on the next line is
       // inside the comment (lost line break).
       // nothing is available on the node where the co-location constraint forces us to        throw new NoResourceAvailableException("Could not allocate a slot on instance " +
         constraint.getLocation() + ", as required by the co-location constraint.");
      else if (forceExternalLocation) {
       // NOTE(review): "String hosts = ..." on the next line is inside the comment.
       // could not satisfy the external location constraint        String hosts = getHostnamesFromInstances(preferredLocations);
       throw new NoResourceAvailableException("Could not schedule task " + vertex
         + " to any of the required hosts: " + hosts);
      else {
       // NOTE(review): the "throw new NoResourceAvailableException(task, ..." on the next
       // line is inside the comment (lost line break).
       // simply nothing is available        throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
         getTotalNumberOfSlots(), getNumberOfAvailableSlots());
     else {
      // NOTE(review): "toUse = slotFromGroup;" on the next line is inside the comment.
      // got a non-local from the group, and no new one, so we use the non-local       // slot from the sharing group       toUse = slotFromGroup;
    else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
     // NOTE(review): the "if (slotFromGroup != null) {" on the next line is inside the
     // comment, and its body (presumably releasing the unused group slot) is missing.
     // if there is no slot from the group, or the new slot is local,      // then we use the new slot      if (slotFromGroup != null) {
     toUse = newSlot;
    else {
     // NOTE(review): "newSlot.releaseSlot();" on the next line is inside the comment.
     // both are available and usable. neither is local. in that case, we may      // as well use the slot from the sharing group, to minimize the number of      // instances that the job occupies      newSlot.releaseSlot();
     toUse = slotFromGroup;

    // NOTE(review): the "if (constraint != null && !constraint.isAssigned()) {" on the next
    // line is inside the comment, and its body (locking the location) is missing.
    // if this is the first slot for the co-location constraint, we lock     // the location, because we are going to use that slot     if (constraint != null && !constraint.isAssigned()) {

    updateLocalityCounters(toUse, vertex);
   catch (NoResourceAvailableException e) {
    throw e;
   catch (Throwable t) {
    // NOTE(review): the bodies of the two ifs below are missing from this excerpt; per the
    // comment at the start of the try block they should release any slot acquired so far.
    if (slotFromGroup != null) {
    if (newSlot != null) {

    ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");

   return toUse;
  else {
   // 2) === schedule without hints and sharing ===    
   SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
   if (slot != null) {
    updateLocalityCounters(slot, vertex);
    return slot;
   else {
    // NOTE(review): the "if (queueIfNoResource) {" on the next line is inside the comment
    // (lost line break).
    // no resource available now, so queue the request     if (queueIfNoResource) {
     CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
     this.taskQueue.add(new QueuedTask(task, future));
     return future;
    else if (forceExternalLocation) {
     String hosts = getHostnamesFromInstances(preferredLocations);
     throw new NoResourceAvailableException("Could not schedule task " + vertex
       + " to any of the required hosts: " + hosts);
    else {
     throw new NoResourceAvailableException(getNumberOfAvailableInstances(),
       getTotalNumberOfSlots(), getNumberOfAvailableSlots());




/**
 * How the tasks of a job are scheduled.
 */
public enum ScheduleMode {

    /** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
    LAZY_FROM_SOURCES,

    /** Schedules all tasks immediately. */
    EAGER;

    /**
     * Returns whether we are allowed to deploy consumers lazily.
     *
     * @return {@code true} only for {@link #LAZY_FROM_SOURCES}
     */
    public boolean allowLazyDeployment() {
        return this == LAZY_FROM_SOURCES;
    }
}

在 LAZY_FROM_SOURCES 模式下,调度从 source 节点开始,下游任务则通过 consumer 回调通知的方式被调度,这部分内容在之前的文章(Flink ResultPartition 分析)中也提到过。本文默认使用的都是 LAZY_FROM_SOURCES 模式。之所以要提到这一点,是因为 scheduler 在分配 slot 时,除了依据前面提到的两种限制条件(SlotSharingGroup 与 CoLocationConstraint)之外,还会用到一个称为“偏好位置”(preferred locations)的信息。偏好位置是在 JobManager 开始调度时就为 ExecutionVertex 确定好的,相关逻辑见下面的方法。

/**
 * Lazy scheduling entry point: only the job vertices without inputs (the sources) are
 * scheduled here, each with {@code LocationPreferenceConstraint.ALL}.
 *
 * <p>NOTE(review): this excerpt is truncated. The {@code for} loop header ended up inside the
 * first comment (lost line break), and of the per-vertex scheduling call only its last
 * argument survived; the call itself and the closing braces are not shown.
 *
 * @param slotProvider the provider to obtain slots from (presumably forwarded to the
 *        per-vertex scheduling call — confirm against the full source)
 */
private void scheduleLazy(SlotProvider slotProvider) {
 // simply take the vertices without inputs.  for (ExecutionJobVertex ejv : verticesInCreationOrder) {
  if (ejv.getJobVertex().isInputVertex()) {
    LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty   }
/**
 * Controls which input locations are taken into account when computing the preferred
 * locations of an execution.
 */
public enum LocationPreferenceConstraint {
    ALL, // wait for all inputs to have a location assigned
    ANY  // only consider those inputs who already have a location assigned
}


public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
 final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
 final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;

 switch(locationPreferenceConstraint) {
  case ALL:
   preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
  case ANY:
   final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());

   for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
    if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
     final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);

     if (taskManagerLocation == null) {
      throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");


   preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
   break;  default:
   throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');

 return preferredLocationsFuture;



// Pick a slot from the sharing group: by job vertex when there is no co-location constraint,
// otherwise through the co-location constraint.
if (constraint == null) {
    slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
}
else {
    slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
}



public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 synchronized (lock) {
  Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);

  if (p != null) {
   SharedSlot ss = p.f0;
   SimpleSlot slot = ss.allocateSubSlot(vertexID);
   return slot;
  else {
   return null;


// Excerpt from the body of (presumably) getSlotForTaskInternal — the method signature is not
// shown. groupId, allSlots, preferredLocations and localOnly come from that enclosing method.

// get the available slots for the group
Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);

if (slotsForGroup == null) {
    // we have a new group, so all slots are available
    slotsForGroup = new LinkedHashMap<>();
    availableSlotsPerJid.put(groupId, slotsForGroup);

    for (SharedSlot availableSlot : allSlots) {
        putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
    }
}
else if (slotsForGroup.isEmpty()) {
    // the group exists, but nothing is available for that group
    return null;
}

// check whether we can schedule the task to a preferred location
boolean didNotGetPreferred = false;

if (preferredLocations != null) {
    for (TaskManagerLocation location : preferredLocations) {

        // set the flag that we failed a preferred location. If one will be found,
        // we return early anyways and skip the flag evaluation
        didNotGetPreferred = true;

        SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
        if (slot != null && slot.isAlive()) {
            return new Tuple2<>(slot, Locality.LOCAL);
        }
    }
}

// if we want only local assignments, exit now with a "not found" result
if (didNotGetPreferred && localOnly) {
    return null;
}

// NON_LOCAL if preferences existed but none matched; UNCONSTRAINED if there were none
Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;

// schedule the task to any available location
SharedSlot slot;
while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
    if (slot.isAlive()) {
        return new Tuple2<>(slot, locality);
    }
}

// nothing available after all, all slots were dead
return null;



// Excerpt from scheduleTask: request a new slot for the sharing group. "locations" is the
// co-location constraint's location when the constraint is assigned, otherwise the vertex's
// preferred locations.
newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);


// Excerpt from scheduleTask: choose between the new slot and the slot from the sharing group.
// NOTE(review): line breaks were lost in this excerpt. The inner "if (slotFromGroup != null) {"
// and "newSlot.releaseSlot();" below sit inside // comments, and the body of that inner if
// (presumably releasing the unused group slot) is missing, as are the closing braces.
else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
 // if there is no slot from the group, or the new slot is local,  // then we use the new slot  if (slotFromGroup != null) {
 toUse = newSlot;
else {
 // both are available and usable. neither is local. in that case, we may  // as well use the slot from the sharing group, to minimize the number of  // instances that the job occupies  newSlot.releaseSlot();
 toUse = slotFromGroup;


protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
          Iterable<TaskManagerLocation> requestedLocations,
          boolean localOnly) {
 // we need potentially to loop multiple times, because there may be false positives  // in the set-with-available-instances  while (true) {
  Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

  if (instanceLocalityPair == null){
   return null;

  Instance instanceToUse = instanceLocalityPair.getLeft();
  Locality locality = instanceLocalityPair.getRight();

  try {
   SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
   // if the instance has further available slots, re-add it to the set of available resources.    if (instanceToUse.hasResourcesAvailable()) {
    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
   if (slot != null) {
    return slot;
  catch (InstanceDiedException e) {
   // the instance died it has not yet been propagated to this scheduler    // remove the instance from the set of available instances    removeInstance(instanceToUse);
  // if we failed to get a slot, fall through the loop  }


else {
    // no resource available now, so queue the request
    if (queueIfNoResource) {
        // the caller receives a future that is completed once a slot frees up
        CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
        this.taskQueue.add(new QueuedTask(task, future));
        return future;
    }
    // ... otherwise a NoResourceAvailableException is thrown (omitted in this excerpt)
}


    原文地址: https://zhuanlan.zhihu.com/p/36525639