Hadoop中央异步调度器AsyncDispatcher

最近工作接触到事件驱动模型,Hadoop中YARN采用的就是事件驱动并发模型, 这里作为
学习,记录一下整个流程
前提是需要了解hadoop的事件库

流程

创建一个AsyncDispatcher对象
向AsyncDispatcher中注册枚举类型和事件的处理器(EventHandler)
然后启动,启动后会创建一个eventHandlingThread线程
eventHandlingThread线程执行的流程:从等待队列中取出事件,从事件对象中获取事件类型,然后根据事件类型获取事件处理器,然后进行事件的处理
提交任务的流程:
将任务放入等待的队列,就可以离开了

源码分析

初始化AsyncDispatcher

public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
}

public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}

可以看到AsyncDispatcher内部拥有两个关键的变量
LinkedBlockingQueue队列用来存放事件
eventDispatchers Map对象 用来存放 枚举类型和事件的处理器

进行register()

public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }
  可以看到register就用到了初始化时候的eventDispatchers变量,
  用来存储eventType,和EventHandler,因此我们在开发的时候需要定义这两个类。

启动serviceStart()

可以看到会创建一个独立的线程
protected void serviceStart() throws Exception {
    //start all the components
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
}

线程的作用是不断轮训,检查eventQueue是否有新的事件进来,如果有就进行dispatch(event)
Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {//服务是否进行stop操作,如果执行了stop操作,会进行判断队列是否为空,及时通知停止操作
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

 protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
}

GenericEventHandler上面处理事件的流程已经清楚了,那是在哪里把事件放入队列的?

AysncDispatcher是异步调度器,不会立即处理我们加入的事件,而是放入队列

比如:getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));

 class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }
      drained = false;

      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnRuntimeException(e);
      }
    };
}

停止

关闭的时候会一秒轮训一次,判断是否还有事件没有处理完成,如果还存在事件就等待
事件执行完毕。
protected void serviceStop() throws Exception {
    if (drainEventsOnStop) {
      blockNewEvents = true;//服务是否进行stop操作
      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
      synchronized (waitForDrained) {
        while (!drained && eventHandlingThread.isAlive()) {
          waitForDrained.wait(1000);
          LOG.info("Waiting for AsyncDispatcher to drain.");
        }
      }
    }
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.serviceStop();
  }

最后来关注几个变量

private volatile boolean drained = true;
 
drained = eventQueue.isEmpty();代表队列是否是空的
private Object waitForDrained = new Object();//定义一个对象锁

如果队列为空则唤醒waitForDrained等待的地方
 synchronized (waitForDrained) {
              if (drained) { 
                waitForDrained.notify();
              }
 }

停止服务的时候如果队列不为空,则会等待
synchronized (waitForDrained) {
        while (!drained && eventHandlingThread.isAlive()) {
          waitForDrained.wait(1000);
          LOG.info("Waiting for AsyncDispatcher to drain.");
        }
      }
  }

private volatile boolean blockNewEvents = false;//服务是否进行stop操作,如果进行了stop则置为true

代码案例

https://github.com/lizu18xz/study
点赞