When a TaskManager starts up and registers with the JobManager, the scheduler is also told that slots have become available on that TaskManager node:
public void newSlotAvailable(final Instance instance) {

    // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
    // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
    //
    // -> The scheduler needs to grab them (1) global scheduler lock
    //                                     (2) slot/instance lock
    // -> The slot releasing grabs (1) slot/instance (for releasing) and
    //                             (2) scheduler (to check whether to take a new task item
    //
    // that leads with a high probability to deadlocks, when scheduling fast

    newlyAvailableInstances.add(instance);

    executor.execute(new Runnable() {
        @Override
        public void run() {
            handleNewSlot();
        }
    });
}
The comment above spells it out: the new slot is handled asynchronously here to avoid deadlocks. The code of handleNewSlot looks like this:
private void handleNewSlot() {
    synchronized (globalLock) {
        Instance instance = this.newlyAvailableInstances.poll();
        if (instance == null || !instance.hasResourcesAvailable()) {
            // someone else took it
            return;
        }

        QueuedTask queued = taskQueue.peek();

        // the slot was properly released, we can allocate a new one from that instance

        if (queued != null) {
            ScheduledUnit task = queued.getTask();
            ExecutionVertex vertex = task.getTaskToExecute().getVertex();

            try {
                SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
                if (newSlot != null) {

                    // success, remove from the task queue and notify the future
                    taskQueue.poll();

                    if (queued.getFuture() != null) {
                        try {
                            queued.getFuture().complete(newSlot);
                        }
                        catch (Throwable t) {
                            LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
                            task.getTaskToExecute().fail(t);
                        }
                    }
                }
            }
            catch (InstanceDiedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Instance " + instance + " was marked dead asynchronously.");
                }

                removeInstance(instance);
            }
        }
        else {
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
        }
    }
}
Here the scheduler first checks whether there is a usable instance (an instance is really just a TaskManager), and whether the instance just polled from the queue still has free slots. If so, it peeks at the task at the head of taskQueue (the one queued earliest) and tries to allocate a slot for it; when exactly tasks get added to that queue is covered later. So if taskQueue holds a task, a slot is allocated for it from the instance we just obtained (never mind what a SimpleSlot is for now). The allocateSimpleSlot call takes a jobId. If you know how Flink builds its DAGs, you know the DAG has two layers: the first layer is the JobGraph, where every node is an operator chain; the second layer is the ExecutionGraph, the parallelized form of the JobGraph, where every node is a subtask of a JobGraph node. A node of the ExecutionGraph is represented as an ExecutionVertex and a node of the JobGraph as a JobVertex; an ExecutionVertex holds a reference to the vertex it belongs to, which at this level is called an ExecutionJobVertex rather than a JobVertex. So the vertex.getJobId() call above really ends up in ExecutionJobVertex.getJobId(). If the slot allocation succeeds, the task is removed from taskQueue. The other case is that taskQueue is empty; then the instance is put into instancesWithAvailableResources for later use.
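To make the two-layer relationship concrete, here is a minimal sketch of the delegation. The field names and the plain-String job id are simplifications for illustration, not the actual Flink classes:

// Simplified sketch of the two-layer DAG relationship; the real classes carry
// far more state (parallelism, intermediate results, execution attempts, ...).
class ExecutionJobVertexSketch {
    private final String jobId;                        // id of the job this vertex belongs to
    ExecutionJobVertexSketch(String jobId) { this.jobId = jobId; }
    String getJobId() { return jobId; }
}

class ExecutionVertexSketch {
    private final ExecutionJobVertexSketch jobVertex;  // the parallel subtask knows its JobGraph-level parent
    private final int subTaskIndex;
    ExecutionVertexSketch(ExecutionJobVertexSketch jobVertex, int subTaskIndex) {
        this.jobVertex = jobVertex;
        this.subTaskIndex = subTaskIndex;
    }
    // vertex.getJobId() in handleNewSlot() resolves like this: a plain
    // delegation to the enclosing ExecutionJobVertex.
    String getJobId() { return jobVertex.getJobId(); }
}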
The obvious question: when are the instances in instancesWithAvailableResources actually used? The answer is in the scheduleTask method. Let's look at what that method does; as a side note, scheduleTask is invoked when a task is submitted for scheduling.
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException {
    if (task == null) {
        throw new NullPointerException();
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Scheduling task " + task);
    }

    final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

    final boolean forceExternalLocation = false &&
            preferredLocations != null && preferredLocations.iterator().hasNext();

    synchronized (globalLock) {

        SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

        if (sharingUnit != null) {

            // 1) === If the task has a slot sharing group, schedule with shared slots ===

            if (queueIfNoResource) {
                throw new IllegalArgumentException(
                        "A task with a vertex sharing group was scheduled in a queued fashion.");
            }

            final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
            final CoLocationConstraint constraint = task.getLocationConstraint();

            // sanity check that we do not use an externally forced location and a co-location constraint together
            if (constraint != null && forceExternalLocation) {
                throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a "
                        + "co-location constraint and an external location constraint.");
            }

            // get a slot from the group, if the group has one for us (and can fulfill the constraint)
            final SimpleSlot slotFromGroup;
            if (constraint == null) {
                slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
            }
            else {
                slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
            }

            SimpleSlot newSlot = null;
            SimpleSlot toUse = null;

            // the following needs to make sure any allocated slot is released in case of an error
            try {
                // check whether the slot from the group is already what we want.
                // any slot that is local, or where the assignment was unconstrained is good!
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {

                    // if this is the first slot for the co-location constraint, we lock
                    // the location, because we are quite happy with the slot
                    if (constraint != null && !constraint.isAssigned()) {
                        constraint.lockLocation();
                    }

                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup;
                }

                // the group did not have a local slot for us. see if we can get one (or a better one)

                // our location preference is either determined by the location constraint, or by the
                // vertex's preferred locations
                final Iterable<TaskManagerLocation> locations;
                final boolean localOnly;
                if (constraint != null && constraint.isAssigned()) {
                    locations = Collections.singleton(constraint.getLocation());
                    localOnly = true;
                }
                else {
                    locations = preferredLocations;
                    localOnly = forceExternalLocation;
                }

                newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);

                if (newSlot == null) {
                    if (slotFromGroup == null) {
                        // both null, which means there is nothing available at all

                        if (constraint != null && constraint.isAssigned()) {
                            // nothing is available on the node where the co-location constraint forces us to
                            throw new NoResourceAvailableException("Could not allocate a slot on instance " +
                                    constraint.getLocation() + ", as required by the co-location constraint.");
                        }
                        else if (forceExternalLocation) {
                            // could not satisfy the external location constraint
                            String hosts = getHostnamesFromInstances(preferredLocations);
                            throw new NoResourceAvailableException("Could not schedule task " + vertex
                                    + " to any of the required hosts: " + hosts);
                        }
                        else {
                            // simply nothing is available
                            throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
                                    getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                        }
                    }
                    else {
                        // got a non-local from the group, and no new one, so we use the non-local
                        // slot from the sharing group
                        toUse = slotFromGroup;
                    }
                }
                else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // if there is no slot from the group, or the new slot is local,
                    // then we use the new slot
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    toUse = newSlot;
                }
                else {
                    // both are available and usable. neither is local. in that case, we may
                    // as well use the slot from the sharing group, to minimize the number of
                    // instances that the job occupies
                    newSlot.releaseSlot();
                    toUse = slotFromGroup;
                }

                // if this is the first slot for the co-location constraint, we lock
                // the location, because we are going to use that slot
                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }

                updateLocalityCounters(toUse, vertex);
            }
            catch (NoResourceAvailableException e) {
                throw e;
            }
            catch (Throwable t) {
                if (slotFromGroup != null) {
                    slotFromGroup.releaseSlot();
                }
                if (newSlot != null) {
                    newSlot.releaseSlot();
                }
                ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
            }

            return toUse;
        }
        else {

            // 2) === schedule without hints and sharing ===

            SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
            if (slot != null) {
                updateLocalityCounters(slot, vertex);
                return slot;
            }
            else {
                // no resource available now, so queue the request
                if (queueIfNoResource) {
                    CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
                    this.taskQueue.add(new QueuedTask(task, future));
                    return future;
                }
                else if (forceExternalLocation) {
                    String hosts = getHostnamesFromInstances(preferredLocations);
                    throw new NoResourceAvailableException("Could not schedule task " + vertex
                            + " to any of the required hosts: " + hosts);
                }
                else {
                    throw new NoResourceAvailableException(getNumberOfAvailableInstances(),
                            getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                }
            }
        }
    }
}
This method is long, so let's walk through it step by step.
First, the task's SlotSharingGroup is retrieved. A SlotSharingGroup expresses that in Flink, subtasks of two different tasks may run in the same slot; by default every task belongs to the "default" group. This is not a hard rule: it may, but does not have to, be honored. Next the CoLocationConstraint instance is fetched. A CoLocationConstraint also forces two subtasks to run in the same slot, but unlike the SlotSharingGroup it is a hard constraint, and the two subtasks must additionally have the same subtask index within their respective tasks. CoLocationConstraints are typically used for the feedback edges and inputs of iterations.
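For context, the sharing group is something the user can set directly on operators in the DataStream API. The snippet below is only an illustration (the group name "heavy" and the toy pipeline are made up), but slotSharingGroup(...) itself is the regular DataStream API call:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            // operators without an explicit group live in the "default" sharing group
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // put this operator (and, by inheritance, its downstream operators) into
            // a separate sharing group, so its subtasks stop sharing slots with "default"
            .slotSharingGroup("heavy")
            .print();

        env.execute("slot sharing group demo");
    }
}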
Next: what is preferredLocations for? Flink has two schedule modes:
public enum ScheduleMode {

    /** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
    LAZY_FROM_SOURCES,

    /** Schedules all tasks immediately. */
    EAGER;

    /**
     * Returns whether we are allowed to deploy consumers lazily.
     */
    public boolean allowLazyDeployment() {
        return this == LAZY_FROM_SOURCES;
    }
}
In LAZY_FROM_SOURCES mode, scheduling starts from the source vertices, and downstream tasks are scheduled through consumer callbacks once their input data are ready; this was touched on before (see the earlier analysis of Flink ResultPartition). In this article we assume LAZY_FROM_SOURCES throughout. The reason to bring it up is that, when allocating a slot, the scheduler uses not only the two constraints described above but also something called "preferred locations", which are fixed for each ExecutionVertex when the JobManager starts scheduling, namely in the following method:
private void scheduleLazy(SlotProvider slotProvider) {
    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : verticesInCreationOrder) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(
                slotProvider,
                allowQueuedScheduling,
                LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty
        }
    }
}
-------------------------------------------------------------------------------------------
public enum LocationPreferenceConstraint {
    ALL, // wait for all inputs to have a location assigned
    ANY  // only consider those inputs who already have a location assigned
}
Flink uses this constraint to decide which TaskManagers qualify as "preferred locations":
public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;

    switch(locationPreferenceConstraint) {
        case ALL:
            preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
            break;
        case ANY:
            final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());

            for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
                if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
                    final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);

                    if (taskManagerLocation == null) {
                        throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
                    }

                    completedTaskManagerLocations.add(taskManagerLocation);
                }
            }

            preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
            break;
        default:
            throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
    }

    return preferredLocationsFuture;
}
Since the scheduling above used LocationPreferenceConstraint.ALL, we will also stick to the ALL case here to keep things simple. The key call is getVertex().getPreferredLocationsBasedOnInputs(): it looks up where the current operator's input operators are running and treats those locations as the "preferred locations" (the method also applies some filtering, which we will not go into here). The preferred locations obtained here are exactly the preferredLocations used in scheduleTask.
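Conceptually the input-based preference works roughly as sketched below. This is a simplified approximation with made-up types (plain Strings stand in for TaskManagerLocation), not the actual getPreferredLocationsBasedOnInputs implementation; the cut-off is an assumption in the same spirit as the real method's filtering, which stops considering inputs once too many distinct locations are involved.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in: a plain String plays the role of TaskManagerLocation,
// and each input channel exposes a future for its producer's assigned location.
class InputBasedLocationHints {

    static Collection<CompletableFuture<String>> preferredLocationsBasedOnInputs(
            List<CompletableFuture<String>> producerLocations) {

        // If the inputs are spread over too many distinct TaskManagers, locality
        // hints stop being useful; the real method applies a similar cut-off.
        final int maxDistinctLocationsToConsider = 8;
        if (producerLocations.size() > maxDistinctLocationsToConsider) {
            return new ArrayList<>();                 // no preference at all
        }

        // Otherwise: prefer exactly the TaskManagers that run our input subtasks.
        return new ArrayList<>(producerLocations);
    }
}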
Back in scheduleTask, the next step is to decide, based on whether a CoLocationConstraint is present, how to get a slot from the task's SlotSharingGroup:
if (constraint == null) {
    slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
}
else {
    slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
}
The assignment here is a SlotSharingGroupAssignment instance. We will not go into its details for now; it is enough to know that it manages how slots are allocated within a SlotSharingGroup.
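As background for the excerpts that follow, the bookkeeping inside SlotSharingGroupAssignment can be pictured roughly like this. It is a simplified sketch: the field and helper names match the excerpts below, but the types are deliberately reduced to plain Strings.

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Rough shape of the state kept by SlotSharingGroupAssignment; SharedSlot,
// ResourceID and the JobVertexID-like group id are reduced to plain Strings here.
class SlotSharingGroupAssignmentSketch {

    // all shared slots currently owned by this sharing group
    private final Set<String> allSlots = new HashSet<>();

    // per group id (roughly a JobVertexID): the shared slots, keyed by TaskManager
    // (ResourceID), that can still accept a subtask of that vertex
    private final Map<String, Map<String, List<String>>> availableSlotsPerJid = new HashMap<>();

    // helper used by the lookup code below: register a slot under its TaskManager key
    static void putIntoMultiMap(Map<String, List<String>> map, String resourceId, String slot) {
        map.computeIfAbsent(resourceId, k -> new LinkedList<>()).add(slot);
    }
}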
Next, let's step into getSlotForTask, starting with the variant that takes no constraint:
public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
    synchronized (lock) {
        Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);

        if (p != null) {
            SharedSlot ss = p.f0;
            SimpleSlot slot = ss.allocateSubSlot(vertexID);
            slot.setLocality(p.f1);
            return slot;
        }
        else {
            return null;
        }
    }
}
The core here is the getSlotForTaskInternal method, so let's look at that next.
// get the available slots for the group
Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);

if (slotsForGroup == null) {
    // we have a new group, so all slots are available
    slotsForGroup = new LinkedHashMap<>();
    availableSlotsPerJid.put(groupId, slotsForGroup);

    for (SharedSlot availableSlot : allSlots) {
        putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
    }
}
else if (slotsForGroup.isEmpty()) {
    // the group exists, but nothing is available for that group
    return null;
}
First, the slots available to this job vertex are looked up in availableSlotsPerJid; the groupId here is the JobVertexID. If there is no entry yet, the group is new, so all of the group's slots are registered as available for it. If an entry exists but is empty, null is returned immediately. Now let's see what is done with the slotsForGroup we obtained.
// check whether we can schedule the task to a preferred location
boolean didNotGetPreferred = false;

if (preferredLocations != null) {
    for (TaskManagerLocation location : preferredLocations) {

        // set the flag that we failed a preferred location. If one will be found,
        // we return early anyways and skip the flag evaluation
        didNotGetPreferred = true;

        SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
        if (slot != null && slot.isAlive()) {
            return new Tuple2<>(slot, Locality.LOCAL);
        }
    }
}
If the preferred-location list is non-empty, the code tries to take a slot from slotsForGroup for each preferred location; as soon as it finds a slot on the same TaskManager as one of the preferred locations, it returns it immediately. If no such slot is found, execution continues below.
// if we want only local assignments, exit now with a "not found" result
if (didNotGetPreferred && localOnly) {
    return null;
}

Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;

// schedule the task to any available location
SharedSlot slot;
while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
    if (slot.isAlive()) {
        return new Tuple2<>(slot, locality);
    }
}

// nothing available after all, all slots were dead
return null;
localOnly is false here (getSlotForTaskInternal was called with false above), so we do not return null right away. If the preferred-location list was non-empty, didNotGetPreferred is true and locality becomes Locality.NON_LOCAL; otherwise didNotGetPreferred is false and locality is Locality.UNCONSTRAINED. Then an arbitrary slot is polled from slotsForGroup and returned. Back in getSlotForTask, a SimpleSlot is allocated from the returned SharedSlot and returned, and with that we are back in scheduleTask.
scheduleTask then checks whether the slotFromGroup just returned is non-null and whether its locality is NON_LOCAL. If the locality is LOCAL or UNCONSTRAINED, that slot is the one assigned to the task. If these conditions are not met, quite a bit more logic follows:
newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);
The main logic of this method is to find a TaskManager at one of the preferred locations and allocate a new slot from it. If such a (local) slot can be obtained, it is used and the slotFromGroup obtained earlier is released; if no suitable slot is found at a preferred location, the slotFromGroup obtained earlier is used instead and the newly allocated slot is released:
else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
    // if there is no slot from the group, or the new slot is local,
    // then we use the new slot
    if (slotFromGroup != null) {
        slotFromGroup.releaseSlot();
    }
    toUse = newSlot;
}
else {
    // both are available and usable. neither is local. in that case, we may
    // as well use the slot from the sharing group, to minimize the number of
    // instances that the job occupies
    newSlot.releaseSlot();
    toUse = slotFromGroup;
}
Everything so far assumed the task belongs to a slot sharing group. What if it does not? Then things are much simpler:
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                        Iterable<TaskManagerLocation> requestedLocations,
                                        boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

        if (instanceLocalityPair == null){
            return null;
        }

        Instance instanceToUse = instanceLocalityPair.getLeft();
        Locality locality = instanceLocalityPair.getRight();

        try {
            SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());

            // if the instance has further available slots, re-add it to the set of available resources.
            if (instanceToUse.hasResourcesAvailable()) {
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (slot != null) {
                slot.setLocality(locality);
                return slot;
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}
It simply asks findInstance for a TaskManager, preferably one at a preferred location, and allocates a SimpleSlot from it.
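To round out the picture, the selection done by findInstance can be approximated as below. This is a hedged sketch with plain Strings standing in for Instance, ResourceID and TaskManagerLocation, not the actual Flink implementation:

import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged approximation of findInstance: instances are keyed by a String
// "resource id"; the Locality tag mirrors the enum seen in the excerpts above.
class InstancePicker {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static Map.Entry<String, Locality> findInstance(
            LinkedHashMap<String, String> availableInstances,  // resourceId -> instance
            Collection<String> requestedLocations,             // preferred resource ids
            boolean localOnly) {

        if (requestedLocations != null && !requestedLocations.isEmpty()) {
            // first try the TaskManagers named in the preferred locations
            for (String resourceId : requestedLocations) {
                String instance = availableInstances.remove(resourceId);
                if (instance != null) {
                    return new SimpleEntry<>(instance, Locality.LOCAL);
                }
            }
            // nothing local: give up if only local placement is allowed,
            // otherwise fall back to any instance, tagged NON_LOCAL
            return localOnly ? null : pollAny(availableInstances, Locality.NON_LOCAL);
        }

        // no preference at all: any instance is acceptable
        return pollAny(availableInstances, Locality.UNCONSTRAINED);
    }

    private static Map.Entry<String, Locality> pollAny(
            LinkedHashMap<String, String> instances, Locality locality) {
        Iterator<Map.Entry<String, String>> it = instances.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        Map.Entry<String, String> entry = it.next();
        it.remove();
        return new SimpleEntry<>(entry.getValue(), locality);
    }
}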
else {
    // no resource available now, so queue the request
    if (queueIfNoResource) {
        CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
        this.taskQueue.add(new QueuedTask(task, future));
        return future;
    }
If getFreeSlotForTask returned null and queueIfNoResource is set, the task is added to taskQueue together with a CompletableFuture; this is where the taskQueue entries mentioned earlier come from, and handleNewSlot later completes that future once a slot becomes available. If queueIfNoResource is not set, a NoResourceAvailableException is thrown right away.
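Finally, the caller that receives this CompletableFuture<SimpleSlot> only has to attach a callback; handleNewSlot completes the future once a slot frees up. The snippet below is a hedged illustration of that hand-off (a plain String stands in for SimpleSlot, and the "deploy" step is made up), not the actual JobManager code:

import java.util.concurrent.CompletableFuture;

// A plain String stands in for SimpleSlot; the "deploy" step is invented for illustration.
class QueuedSchedulingExample {

    static void onSlotAllocated(CompletableFuture<String> slotFuture) {
        // If no slot was free, scheduleTask returned a future that the scheduler
        // completes in handleNewSlot() once a TaskManager reports a free slot.
        // The caller just reacts to that completion.
        slotFuture
            .thenAccept(slot -> System.out.println("deploying task into " + slot))
            .exceptionally(t -> {
                System.err.println("slot allocation failed: " + t);
                return null;
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        onSlotAllocated(future);

        // in the real scheduler this corresponds to queued.getFuture().complete(newSlot)
        future.complete("slot@taskmanager-1");
    }
}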