JStorm源码分析系列--02--拓扑分配TopologyAssign

  写在前面的话,笔者第一次阅读框架源码,所以可能有些地方理解错误或者没有详细解释,如果在阅读过程发现错误很欢迎在文章下面评论指出。文章后续会陆续更新,可以关注或者收藏,转发请先私信我,谢谢。对了,笔者看的是2.2.1这个版本。上一篇博客,JStorm源码分析系列–01–Nimbus启动分析笔者讲解了Nimbus启动过程中做的一些基本的操作,在initFollowerThread方法中,如果当前的Nimbus变成Leader之后,这个方法内会负责执行一些初始化init操作。下面就来讲讲第一个初始化操作–拓扑分配。本文将详细(非常长,所以慢慢看)的讲解如何去为一个拓扑分配相应的资源。
  从方法initTopologyAssign开始,TopologyAssign是一个单例对象,在这个类的init方法内,做了简单的赋值操作之后,并初始化一个调度器实例对象之后,就建立一个守护线程,这个守护线程的目的是不断从TopologyAssign内部维护的一个阻塞队列中读取系统提交的拓扑任务,并调用相应的方法doTopologyAssignment进行分配操作。代码都比较简单,就不浪费版面去贴了。
  下面是doTopologyAssignment方法的源码,

    protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment assignment;
        try {
            Assignment oldAssignment = null;
            boolean isReassign = event.isScratch();
            if (isReassign) {
                //如果存在旧的分配信息,需要先将旧的分配信息存储下来
                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
            }
            //调用方法执行新的分配
            assignment = mkAssignment(event);
            //将task添加到集群的metrics中
            pushTaskStartEvent(oldAssignment, assignment, event);

            if (!isReassign) {
                //如果是新建的拓扑,需要把拓扑设置为active状态
                setTopologyStatus(event);
            }
        } catch (Throwable e) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), e);
            event.fail(e.getMessage());
            return false;
        }

        if (assignment != null)
            //将拓扑备份到ZK上
            backupAssignment(assignment, event);
        event.done();
        return true;
    }

  所以,最重要的方法还是mkAssignment,这里执行了实际的分配操作。下面就来详细的介绍这个方法。

prepareTopologyAssign

  prepareTopologyAssign这个方法总体的目的为了初始化拓扑分配的上下文信息,生成一个TopologyAssignContext的实例对象。这个上下文对象需要存下拓扑的很多关键信息,包括拓扑的组件信息(用StormTopology对象保存,下文在添加acker的时候会详细介绍这个类),拓扑的配置信息,拓扑上所有的task id,以及死掉的task id,unstopped task id(这里的解释是,那些supervisor死掉但是worker还继续运行的称为unstopworker,而包含在unstopworker内的task则称为unstoppedTask)。以及这个拓扑能分配到的worker,以上提及的这些信息都会在这个方法内慢慢的初始化。下面一步步来看吧。prepareTopologyAssign方法的源码比较长,一部分一部分来讲解。

//创建一个上下文的实例对象
TopologyAssignContext ret = new TopologyAssignContext();

String topologyId = event.getTopologyId();
ret.setTopologyId(topologyId);

int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
ret.setTopologyMasterTaskId(topoMasterId);
LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

Map<Object, Object> nimbusConf = nimbusData.getConf();
//根据拓扑id从nimbus上读取拓扑的配置信息
Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(topologyId, nimbusData.getBlobStore());
//这里读取拓扑中各个组件的一个结构,后续会讲解这个类的组成
StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, nimbusData.getBlobStore());
ret.setRawTopology(rawTopology);
//设置一些配置信息
Map stormConf = new HashMap();
stormConf.putAll(nimbusConf);
stormConf.putAll(topologyConf);
ret.setStormConf(stormConf);

  紧接着,根据目前集群的状态,初始化一份集群上所有的supervisor,并获取所有可用的worker

StormClusterState stormClusterState = nimbusData.getStormClusterState();

// get all running supervisor, don't need callback to watch supervisor
Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// init all AvailableWorkerPorts
for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
     SupervisorInfo supervisor = supInfo.getValue();
     if (supervisor != null)
        //设置全部的端口都为可用,后面通过HB去除掉那些已经被使用的worker
        //supervisor是一个k-v,k是supervisorid,v是保存实例信息
        supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
}
//这个方法就是利用HB去掉那些挂掉的supervisor
//判断的方法是获取每个supervisor最近的HB时间,
//由当前时间减去最近HB时间和超时时间做对比。
getAliveSupervsByHb(supInfos, nimbusConf);

  接下来获取拓扑中定义的taskid对应上组件,这里要解释下,对于一个拓扑而言,taskid总是从1开始分配的,并且,相同的组件taskid是相邻的。比如你定义了一个SocketSpout(并行度5),一个PrintBolt(并行度4,那么SocketSpout的taskid可能是1-5,PrintBolt的taskid可能是6-9。

//这个k-v,k是taskid,v是拓扑内定义的组件的id。
//写过应用的同学都应该知道,TopologyBuilder在setSpout或者Bolt的时候,需要指定<组件id,对象,和并行度>。
//eg:builder.setSpout("integer", new ReceiverSpout(), 2);
Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
ret.setTaskToComponent(taskToComponent);

//获取所有的taskid。
Set<Integer> allTaskIds = taskToComponent.keySet();
ret.setAllTaskIds(allTaskIds);

  如果原来存在旧的拓扑分配信息,还需要设置unstoppedTasks,deadTasks,unstoppedWorkers等信息。然后调用getFreeSlots方法负责去除那些已经分配出去的worker。处理过程比较直观,获取集群上所有的拓扑分配信息,然后根据每个分配信息中保存的worker信息,从原先supInfos中移除那些被分配出去的worker。
  如果没有旧的分配信息,说明拓扑分配类型为ASSIGN_TYPE_NEW。如果存在同名的拓扑,也会把同名的拓扑设置旧的分配信息,放到上下文中。如果存在旧的分配信息,需要把旧的分配信息放入到上下文中,此外还要判断是ASSIGN_TYPE_REBALANCE还是ASSIGN_TYPE_MONITOR,因为还需要设置unstoppedWorkers的信息。到这里,预分配,创建拓扑分配上下文就完成了。目前我们带有比较重要的信息是拓扑所有的taskid,以及拓扑基本的组件信息。

集群assignTasks

  在完成拓扑上下文初始化之后,开始实际给拓扑分配相应的worker,不过这里需要判断是本地模式还是集群模式,本地模式下比较简单,找个一个合适的端口,然后新建一个worker的资源对象ResourceWorkerSlot,将一些关键信息如hostname,port,allTaskId配置好。因为local模式下比较简单,所以,即使设置多个worker也不会启动多个jvm。而在集群模式下,一个worker表示的是一个jvm进程。下面就重点讲解集群下的分配情况。我把集群上的分配过程(assignTasks这个方法)分成三个主要的部分,分别是资源准备,worker分配,task分配。

Set<ResourceWorkerSlot> assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    //集群下的分配,见下文讲解
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}

资源准备

  首先第一步是判断拓扑分配的类型是否符合要求,不符合则抛出异常。紧接着,根据上一个方法生成的拓扑分配上下文来生成一个默认的拓扑分配上下文实例对象,DefaultTopologyAssignContext这个类的构造方法执行了很多很细节的操作。包括为拓扑添加附加的组件,存储下taskid和组件的对应信息,计算拓扑需要的worker数目,计算unstopworker的数目等。

//根据之前的上下文,初始化一个分配的上下文对象
DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
    freeUsed(defaultContext);
}

  下面代码是DefaultTopologyAssignContext的构造方法

public DefaultTopologyAssignContext(TopologyAssignContext context){
    super(context);
    try {
        sysTopology = Common.system_topology(stormConf, rawTopology);
    } catch (Exception e) {
        throw new FailedAssignTopologyException("Failed to generate system topology");
    }

    sidToHostname = generateSidToHost();
    hostToSid = JStormUtils.reverse_map(sidToHostname);

    if (oldAssignment != null && oldAssignment.getWorkers() != null) {
        oldWorkers = oldAssignment.getWorkers();
    } else {
        oldWorkers = new HashSet<ResourceWorkerSlot>();
    }

    refineDeadTasks();

    componentTasks = JStormUtils.reverse_map(context.getTaskToComponent());

    for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) {
    List<Integer> componentTaskList = entry.getValue();
    Collections.sort(componentTaskList);
}

    totalWorkerNum = computeWorkerNum();
    unstoppedWorkerNum = computeUnstoppedAssignments();
}
添加附加组件

  从上面的代码可以看出在DefaultTopologyAssignContext的构造方法中,第一句是调用父类构造方法先去初始化一些参数,然后调用system_topology这个方法。下面来看看这个方法的内部。第一个方法就是添加一个acker到原来的拓扑中去。拓扑作为JStrom处理的一个逻辑模型,对用户提供了非常简单且强大的编程原语,只要分别继承两大组件,就可以构造一个拓扑模型,但是实际上,一个实际运行的拓扑模型远远不止用户定义的用于处理输入的spout和用于处理业务的bolt,JStorm为了保证消息的可靠性,拓扑Metrics管理,拓扑HB管理,再拓扑实际模型中添加了几个非常重要的bolt,下面就详细的介绍acker,用于保证消息的可靠性。

public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
    StormTopology ret = topology.deepCopy();
    add_acker(storm_conf, ret);
    addTopologyMaster(storm_conf, ret);
    add_metrics_component(ret);
    add_system_components(ret);
    return ret;
}
StormTopology

  这里先来介绍下StormTopology这个类,才能往下理解。StormTopology这个类用于存储拓扑的组件信息,在这个类内部,有三个非常重要的成员变量,分别存储spout和bolt以及state_spout,第三个笔者暂时没有弄清楚其作用,但是前两个就非常明显,分别存储拓扑的两大组件,spout和bolt

  private Map<String,SpoutSpec> spouts; // required
  private Map<String,Bolt> bolts; // required
  private Map<String,StateSpoutSpec> state_spouts; // required

  Map中的key表示我们定义的组件的id,上文提到过的id。SpoutSpec和Bolt中有两个重要的成员变量。

  private ComponentObject spout_object; // required
  private ComponentCommon common; // required

  ComponentObject用于存储序列化后的代码信息,第二个ComponentCommon用于存储很重要的配置信息,包括输入的流,输出的流和分组信息。有三个重要的成员变量

  //GlobalStreamId有两个String成员变量,componentId表示这个输入组件的流来源的那个组件id,
  //streamId表示componentId所输出的特定的流
  private Map<GlobalStreamId,Grouping> inputs; // 输入的来源和分组情况
  //StreamInfo有个重要的成员变量List<String> output_fields,表示输出的域。
  private Map<String,StreamInfo> streams; // 输出的流
  private int parallelism_hint; // 并行度

  根据上述的结构,StormTopology能够完整的表示拓扑中每个组件输出之后的流所流向的位置。

acker

  这一小节笔者不打算先从源码的角度入手,先来将一个acker的作用以及从一个小例子来讲解acker是怎样工作的。我们都知道作为一个流式处理框架,消息的可靠性是一个非常特性之一。除开更加高级的事务框架能保证消息只被处理一次(exactly-once),JStorm本身也提供了at-least-once,这个机制能保证消息一定会被处理。下面从一个例子的角度来讲解,这是如何实现的。
《JStorm源码分析系列--02--拓扑分配TopologyAssign》
  如上图所示,integer作为输入的spout,sliding和printer都是负责处理的bolt,Field表示之间输出的元组内的元素对应的key。StreamID为默认,不指定数据流分组的形式,则默认情况下shuffle。上述是一个非常简单的拓扑逻辑结构,然后在经过add_acker这个方法之后,实际的拓扑结构发生了一些变化,如下图
《JStorm源码分析系列--02--拓扑分配TopologyAssign》
  JStrom为原来的拓扑结构添加了一个_ack的bolt,负责维护拓扑的可靠性,大致的情况可以从上图中看出,每当一个元组被发送到拓扑下游bolt中去的时候,也会发送到_ack中去保存下来,然后后续处理的每个bolt每次调用ack函数都会发送给_ack(bolt),在指定时间间隔内收到最后处理的ack,那么_ack(bolt)就发送一个消息给最初的spout,则保证了一个元组的可靠性。所以综上,_ack这个Bolt就是维护了整个拓扑的可靠性,那么读者可能会问,_ack里面保存了那么多的消息,如果某个元组经过的组件非常多,是否会造成该元组的拓扑树变的很大。这里阿里利用异或,实现了一个非常简单且高效低耗的判断方法。
  其实在_ack中存储的内容非常简单,就是一个k-v键值对,k是一个随机无重复的id(root_id),且在元组被处理的整个过程中保持不变,将消息存储为<root_id,random>,random由每个收到元组的组件生成,每经过一个组件,random就会改变一次。如上图,integer在发送一个<root_id,x>给sliding之后,也会发送一个<root_id,x>给_ack,然后sliding经过处理之后,发送<root_id,y>给printer,并且发送一个<root_id,x^y>给_ack,然后当printer处理完之后在发送一个<root_id,y>给_ack,此时的_ack内部对于root_id这个消息的值是x^x^y^y=0。也就是处理成功,如果达到指定超时时间root_id对应的值还不是0,则需要通知给出这个元组的task(_ack也是一个bolt,所以内部也有保存某个消息的来源task),要求重发。以上就是JStorm用于保证消息可靠性所使用的方法,直观且简单。
  后续的几个方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相应的控件(bolt)来进行协同操作。比如topology master可以负责metrics,也可以负责baskpressure(反压)机制。笔者还没深入解读,相应部分后续再做相应的添加,这里先挖个坑。

计算worker数目

  在DefaultTopologyAssignContext的构造函数中,添加完附加的组件之后,紧接着获取supervisorid和hostname对应的键值对,如果存在旧的分配信息,则获取原先所有的worker,如果没有,则新建一个worker的集合。去除deadtaskid中那些在unstopworker内的task(这里的目的是分开处理,如果是new的情况下,这两个都是空集)。然后计算需要的worker数目。看下面的源码,

private int computeWorkerNum() {
    //获取拓扑设置的worker数目
    Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
    //
    int ret = 0, hintSum = 0, tmCount = 0;

    Map<String, Object> components =     ThriftTopologyUtils.getComponents(sysTopology);
    for (Entry<String, Object> entry : components.entrySet()) {
        String componentName = entry.getKey();
        Object component = entry.getValue();

        ComponentCommon common = null;
        if (component instanceof Bolt) {
            common = ((Bolt) component).get_common();
        }
        if (component instanceof SpoutSpec) {
            common = ((SpoutSpec) component).get_common();
        }
        if (component instanceof StateSpoutSpec) {
            common = ((StateSpoutSpec) component).get_common();
        }
        //获取每个组件中设置的并行度
        int hint = common.get_parallelism_hint();
        if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
            //如果是属于TM组件,则加到tmCount
            tmCount += hint;
            continue;
        }
        //这个变量存下所有组件并行度的和
        hintSum += hint;
    }
    
    //ret存下较小的值
    if (settingNum == null) {
        ret = hintSum;
    } else {
        ret =  Math.min(settingNum, hintSum);
    }
    //这里还需要判断主TM是否需要一个独立的worker节点用于处理
    Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
    if (isTmSingleWorker != null) {
        if (isTmSingleWorker == true) {
        ret += tmCount;
        setAssignSingleWorkerForTM(true);
    }
    } else {
        if (ret >= 10) {
            ret += tmCount;
        setAssignSingleWorkerForTM(true);
        }
    }
    return ret;
}

worker分配

  实例化完DefaultTopologyAssignContext之后,如果是rebalance类型,则还需要先将原先占用的那些worker给释放掉,具体做法就是将worker使用的端口放回可用端口集合中。几个变量的含义,needAssignTasks:就是指需要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等于原先计算好的worker的数目-减去unstopworker的数目再减去keepAssigns(只有在拓扑类型是ASSIGN_TYPE_MONITOR才有的)的数目。实际worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就来详细讲解这个方法内部怎么实现。

    int workersNum = getAvailableWorkersNum(context);
    if (workersNum < allocWorkerNum) {
        throw new FailedAssignTopologyException("there's no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum);
}
    workersNum = allocWorkerNum;
    List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();

    getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

  首先得知集群上可用的全部worker,如果可用的worker小于需要分配的worker数,则需要抛出异常。如果足够,则会分配足量的worker给指定的拓扑。调用getRightWorkers这个方法来获取合适的worker,这里所谓right的worker是指用户自定义的worker,可以指定worker的资源分配情况。

getRightWorkers

  分为两部分来讲解这个方法,首先是准备工作–getUserDefineWorkers这个方法,这个方法需要两个参数,拓扑的上下文信息context,用户自定义的worker列表workers。看下面的源码:

private List<ResourceWorkerSlot> getUserDefineWorkers(
            DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
    List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
    //如果没有用户自定义的worker,则没必要任何操作
    if (workers == null)
        return ret;
    Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
                .getComponentTasks()).clone();
    //如果分配类型不是NEW,则还是从workers资源分配信息列表中去除unstopworker。
    //这里是用户有指定某些worker资源属于unstopworker才能去掉。
    if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
        checkUserDefineWorkers(context, workers, context.getTaskToComponent());
}
    //遍历用户定义的worker,去除那些没有分配task的worker
    //用户定义的worker中已经指定哪些task该分配到哪个worker中
    for (WorkerAssignment worker : workers) {
        ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask);
        if (workerSlot.getTasks().size() != 0) {
            ret.add(workerSlot);
        }
    }
return ret;
}

  去除那些没有指定task的worker之后,真正进入getRightWorkers方法内部。源码如下,这里解释下五个参数的含义,context表示之前准备的拓扑上下文信息,needAssign表示这个拓扑需要分配的各个taskid,assignedWorkers表示用来存储那些在这个方法内分配到的worker资源,workersNum表示需要拓扑需要分配的worker数目,workers表示上个方法中用户自定义的可用的worker资源。简而言之,这个方法就是从workers中选出已经分配了指定的task的worker,然后存到assignedWorkers中去。

private void getRightWorkers(DefaultTopologyAssignContext context,
            Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
            int workersNum, Collection<ResourceWorkerSlot> workers) {
        Set<Integer> assigned = new HashSet<Integer>();
        List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
        if (workers == null)
            return;
        for (ResourceWorkerSlot worker : workers) {
            boolean right = true;
            Set<Integer> tasks = worker.getTasks();
            if (tasks == null)
                continue;
            for (Integer task : tasks) {
                if (!needAssign.contains(task) || assigned.contains(task)) {
                    right = false;
                    break;
                }
            }
            if (right) {
                assigned.addAll(tasks);
                users.add(worker);
            }
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn(
                    "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
                    users, assignedWorkers, workersNum);
            return;
        }

        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

  上面代码主要的处理逻辑是在for循环中,在这个循环会去判断worker内是否存有本拓扑内的taskid,如果有则把worker存储起来,并且从taskid列表中移除掉那些分配出去的task,没有则直接退出了。

使用旧分配/rebalance

  回到getAvailableWorkers方法内,看下面这段代码。

    //如果配置指定要使用旧的分配,则从旧的分配中选出合适的worker。
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                    context.getOldWorkers());
        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                && context.isReassign() == false) {
            //如果是rebalance,且可以使用原来的worker,将原来使用的worker存储起来。
            int cnt = 0;
            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
                if (cnt < workersNum) {
                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                    resFreeWorker.setPort(worker.getPort());
                    resFreeWorker.setHostname(worker.getHostname());
                    resFreeWorker.setNodeId(worker.getNodeId());
                    assignedWorkers.add(resFreeWorker);
                    cnt++;
                } else {
                    break;
                }
            }
        }
        // 计算TM bolt的个数
        int workersForSingleTM = 0;
        if (context.getAssignSingleWorkerForTM()) {
            for (Integer taskId : needAssign) {
                String componentName = context.getTaskToComponent().get(taskId);
                if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
                    workersForSingleTM++;
                }
            }
        }
        int restWokerNum = workersNum - assignedWorkers.size();
        if (restWokerNum < 0)
            throw new FailedAssignTopologyException(
                    "Too much workers are needed for user define or old assignments. workersNum="
                            + workersNum + ", assignedWokersNum="
                            + assignedWorkers.size());

  笔者一开始觉得上述的代码可能是在判断restWokerNum < 0是很可能会成立而导致抛出异常的,因为如果用户一开始就指定了worker分配信息,然后rebalance情况下,不断去添加旧的worker到assignedWorkers内,这样就会导致assignedWorkers的大小比实际需要的worker数目workersNum大。但是还没来得及用实际集群去测试,只是在github问了官方的人,如果有更新解决方案会后续再这里说明。

分配剩下的worker
    //restWokerNum是剩下需要的worker的数目,直接添加ResourceWorkerSlot实例对象。
    for (int i = 0; i < restWokerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }
    //这里是获取那些专门指定运行拓扑的supervisor节点。
    List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors));
    } else {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;

  上述代码中的isolationSupervisors存放的是那些指定给这个拓扑的supervisor节点的id。如果有指定,则在这些特定的节点上分配,如果没有指定,那么,就在全局内分配。所以实际剩下的分配任务的是putAllWorkerToSupervisor这个方法,getResAvailSupervisors这个方法负责剔除那些无法分配worker的supervisor节点,因为节点上分配的worker已经满了。下面来介绍putAllWorkerToSupervisor这个方法的作用。
  putAllWorkerToSupervisor需要两个参数,第一个是已经分配的worker,包含那些还没有设定运行在那个节点的worker(上面直接新建的那些worker),第二个参数是目前可用的supervisor节点。下面是这个方法的代码

private void putAllWorkerToSupervisor( List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) {
                    putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }
    supervisors = getResAvailSupervisors(supervisors);
    Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

@Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            // TODO Auto-generated method stub
            return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }
    });
    putWorkerToSupervisor(assignedWorkers, supervisors);
}

  进入方法的第一步,首先要做的事情,就是对于那些已经分配好节点的worker,从supervisor节点上给该worker分配一个合适的端口。putWorkerToSupervisor这方法主要的操作是从supervisor节点上获取一个可用的端口,然后设置worker的端口,并将该端口从supervisor节点的可用端口列表中移除。代码结构非常简单,如下:

private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) {
    int port = worker.getPort();
    if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
        port = supervisor.getAvailableWorkerPorts().iterator().next();
    }
    worker.setPort(port);
    supervisor.getAvailableWorkerPorts().remove(port);
    worker.setNodeId(supervisor.getSupervisorId());
}

  设置好了一部分已经分配好的worker之后,继续分配那些没有指定supervisor的worker。根据supervisor中可用端口逆序,从大到小排。然后调用putWorkerToSupervisor这个方法。
  putWorkerToSupervisor方法内部首先统计所有已经使用的端口,然后计算出一个理论的负载平均值{(所有使用掉的+将要分配的)/supervisor的个数,就会得到分配后,集群的一个理论上的负载值theoryAveragePorts,可以平摊到每个supervisor身上}。然后通过遍历需要分配worker的list,进行第一次分配,可以将worker依次分配到那些负载值(跟理论值的计算方式一样)小于理论平均负载的supervisor上。而超过负载的,则放进到负载列表中。经过一轮分配之后,如果还存在没有分配的worker(源码这里先进行排序再进行判断,很明显造成排序时间浪费的可能性)。根据supervisor中可用端口逆序,从大到小排序。再不断将worker分配进去。
  到这里,worker的分配就顺利结束了,总结一下,首先是根据拓扑信息初始化上下文信息,然后计算出实际使用的worker数目,如果这些worker有指定运行在某个supervisor节点上,那么就在节点上分配合适的worker。如果没有指定,那么就根据节点的负载情况,尽量平均的分配到每个supervisor节点上。如果大家的负载都比较大的情况下,再分配到哪些具有比较多的可用端口的节点,完成分配。

task分配

  getAvailableWorkers方法完成了worker的分配,以及如果用户指定了特定的worker上运行指定的task,剩下的taskid将会在接下来的方法中说明如何去分配。主要在TaskScheduler的构造函数中,这里需要三个参数,第一个是拓扑的上下文信息defaultContext,第二个是需要分配的task的列表needAssignTasks,以及上文中获取到的合适的worker列表availableWorkers。(ps:记住,前文如果没有指定特定的worker资源分配的信息,则没有taskid被分配到worker中去,也就是worker内部仅有supervisorid,内存,cpu,端口等信息,不存在tasks信息)。接下来看看TaskScheduler的构造函数。

    public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
        this.tasks = tasks;
        LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
        this.context = context;
        this.taskContext =
                new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
        this.componentSelector = new ComponentNumSelector(taskContext);
        this.inputComponentSelector = new InputComponentNumSelector(taskContext);
        this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
        if (tasks.size() == 0)
            return;
        if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
            // warning ! it doesn't consider HA TM now!!
            if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
                assignForTopologyMaster();
            }
        }

        int taskNum = tasks.size();
        Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
        Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
        for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
            if (worker.getValue() > 0) {
                taskNum += worker.getValue();
                preAssignWorkers.add(worker.getKey());
            }
        }
        setTaskNum(taskNum, workerNum);

        // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
        // Remove the workers which have been assigned with enough workers.
        for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }
        setTaskNum(taskNum, workerNum);

        // For Scale-out case, the old assignment should be kept.
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
            keepAssignment(taskNum, context.getOldAssignment().getWorkers());
        }
    }
初始化

  在这个构造函数中,首先是构造一个task分配的上下文信息。这个对象主要需要维护的几个重要信息是

  • taskToComponent:一个Map,Key表示taskid,Value表示所对应的组件id。

  • supervisorToWorker:也是一个Map,Key表示这个拓扑分配的supervisorid,Value表示节点上分配到的worker列表。

  • relationship:维护这个拓扑的一个结构信息,依然是个Map,Key表示组件bolt/spout的组件id,Value表示的是,如果Key对应组件是一个bolt,则Value存下是所有输入到组件的对应组件的id。如果Key对应组件是一个spout,则Value存下是这个组件所有输出到的组件id。举个例子,integer(spout)输出到sliding(bolt),sliding(bolt)输出到printer(bolt)。则relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。

  • workerToTaskNum:Map,Key表示一个worker,Value表示实际在这个worker上运行的task的总数目。

  • workerToComponentNum:Map,Key表示一个worker,Value表示一个Map,存下的是组件id,以及对应的数目。

  紧接着初始化三个selector,第一个是ComponentNumSelector(内部定义了一二WorkerComparator,负责对worker进行比对,对比worker内某个组件的task数目。以及对比每个supervisor上所有worker内某个组件的总task和),第二个是InputComponentNumSelector(内部也是定义了两个比对函数,一个是获取worker内某个组件的全部输入的task个数,以及在整个supervisor上的全部输入task个数),第三个是TotalTaskNumSelector(worker内全部task的个数,和supervisor上全部task的个数)。这三个selector的目的都是为了后续合理的将task分配到这些worker上做的准备。

分配TM bolt

  如果集群资源足够,用户定义TM需要单独分配到一个独立的worker上,则需要调用assignForTopologyMaster进行单独分配。

private void assignForTopologyMaster() {
        int taskId = context.getTopologyMasterTaskId();
        ResourceWorkerSlot workerAssigned = null;
        int workerNumOfSuperv = 0;
        for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
            List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
            if (workers != null && workers.size() > workerNumOfSuperv) {
                for (ResourceWorkerSlot worker : workers) {
                    Set<Integer> tasks = worker.getTasks();
                    if (tasks == null || tasks.size() == 0) {
                        workerAssigned = worker;
                        workerNumOfSuperv = workers.size();
                        break;
                    }
                }
            }
        }

        if (workerAssigned == null)
            throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
        updateAssignedTasksOfWorker(taskId, workerAssigned);
        taskContext.getWorkerToTaskNum().remove(workerAssigned);
        assignments.add(workerAssigned);
        tasks.remove(taskId);
        workerNum--;
        LOG.info("assignForTopologyMaster, assignments=" + assignments);
    }

  这个方法首先是找出某个最合适的worker,这个worker符合两个条件,一是没有分配其他的task,第二,worker所在的supervisor相对分配了最多的worker,第二点的目的是保证负载均衡。如果找不到合适的worker,那么就抛出异常。如果能找到的话,就把负责TM的task分配给这个worker。updateAssignedTasksOfWorker这个方法的目的就是更新新的分配情况。

task分配

  接下来获取全部的task数目,以及已经分配出去的worker列表preAssignWorkers。根据获得的总task数目来计算每个worker上平均的task数目avgTaskNum,以及剩下多少还没有分配出去的task(总task%总worker,求得余数leftTaskNum)。然后遍历preAssignWorkers,调用方法removeWorkerFromSrcPool来判断一个worker是否分配了足够的task,并且移除那些已经合理分配的task和worker。

for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }

  removeWorkerFromSrcPool这个方法挺有趣的,第一次看的时候有点懵逼,但是其实仔细看下就很明确了。下面我简单讲解下:

private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();

        if (leftTaskNum <= 0) {
            if (taskNum >= avgTaskNum) {
                taskContext.getWorkerToTaskNum().remove(worker);
                assignments.add(worker);
                ret.add(worker);
            }
        } else {
            if (taskNum > avgTaskNum ) {
                taskContext.getWorkerToTaskNum().remove(worker);
                leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
                assignments.add(worker);
                ret.add(worker);
            }
            if (leftTaskNum <= 0) {
                List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
                for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
                    if (avgTaskNum != 0 && entry.getValue() == avgTaskNum)
                        needDelete.add(entry.getKey());
                }
                for (ResourceWorkerSlot workerToDelete : needDelete) {
                    taskContext.getWorkerToTaskNum().remove(workerToDelete);
                    assignments.add(workerToDelete);
                    ret.add(workerToDelete);
                }
            }
        }

        return ret;
    }

  ret保存的是需要返回给调用者需要移除的worker集合。看这个方法,首先判断,在剩余数小于等于0的情况,如果当前worker内的task数目大于等于平均数,说明这个worker的确分配了合理的task。(原因是,如果leftTaskNum小于等于0,是不是就看成,平均数会比正常情况下加1。举个例子,有3个盒子,10个球放进去,那么,平均数为3的情况下,余数为1,如果平均数为4,那么余数就是-2了)。如果leftTaskNum大于0,判断就复杂一点,首先如果数目taskNum大于平均的avgTaskNum,说明这个worker多分配了一些task,那么这些多分配的就必须从leftTaskNum减去。甚至可能taskNum的数目大于avgTaskNum+leftTaskNum的数目,那么直接导致leftTaskNum小于等于0。在leftTaskNum小于等于0的情况下,找出分配上下文中worker分配的task数目刚好是平均数的worker,存在needDelete列表中。然后遍历这个列表,把这些worker从加到需要移除的集合ret中,并返回。(因为如果有某个worker分配的数目多于avgTaskNum+leftTaskNum的数目,那么那些分配数是平均数的worker肯定是合理的,剩下那些分配小于平均数的才是需要调整的)。
  在执行完上述的操作之后,更新下目前的平均数avgTaskNum和分配剩余的task数目leftTaskNum。(此刻还有一些task尚未实际分配),完成分配的调度是在assign方法中。在这个方法内,如果已经没有需要分配的task,则将原来已经分配好的返回就行了。如果还存在需要分配的task,遍历这个需要分配的task列表,如果task对应的组件属于系统组件(组件id为__acker或者__topology_master的组件),则存下来,如果是一般的task,则调用chooseWorker方法选择一个合适的worker,然后将task分配到worker上。(当然这里还需要做一些额外的操作,比如清除那些已经合理的分配的worker,通过调用removeWorkerFromSrcPool这个方法去清除)。而chooseWorker这个方法利用的就是前文提到的三个selector来选择最佳的supervisor,选择最佳的worker(需要考虑这个task接收的input,需要考虑supervisor节点的负载情况和worker内的负载情况)。分配完普通的task之后,在分配系统组件,分配方式也是一样的。
  至此,task的分配也完成,总结一下,除开那些已经指定的分配外,比较重要的是,定义合理的selector(综合考虑节点负载,worker负载,已经input输入,考虑本地化)。分配的同时不断去检测是否已经有worker已经合理分配了,就不要在继续分配到那个worker上。

HeartBeat操作

  上述完成task和worker的分配之后,回到mkAssignment方法。剩下的操作就是设置task的HB起始时间和超时时间。这些比较简单就不再细说了。

结束语

  解读拓扑分配的过程可以让我们更加清楚,我们写的一个逻辑拓扑,实际上是如何变成一个可以实际运行在集群的拓扑。以及拓扑如何保证负载均衡等问题。笔者后续还会更新JStorm几个比较重要的特性的源码分析。包括如何实现反压机制,如何实现nimbus和supervisor容错,supervisor启动的时候需要执行那些操作。

    原文作者:HashMap源码分析
    原文地址: https://segmentfault.com/a/1190000009083097
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞