最近工作接触到事件驱动模型,Hadoop中YARN采用的就是事件驱动并发模型, 这里作为
学习,记录一下整个流程
前提是需要了解hadoop的事件库
流程
创建一个AsyncDispatcher对象
向AsyncDispatcher中注册枚举类型和事件的处理器(EventHandler)
然后启动,启动后会创建一个eventHandlingThread线程
eventHandlingThread线程执行的流程:从等待队列中取出事件,从事件对象中获取事件类型,然后根据事件类型获取事件处理器,然后进行事件的处理
提交任务的流程:
将任务放入等待的队列,就可以离开了
源码分析
初始化AsyncDispatcher
public AsyncDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}
可以看到AsyncDispatcher内部拥有两个关键的变量
LinkedBlockingQueue队列用来存放事件
eventDispatchers Map对象 用来存放 枚举类型和事件的处理器
进行register()
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
可以看到register就用到了初始化时候的eventDispatchers变量,
用来存储eventType,和EventHandler,因此我们在开发的时候需要定义这两个类。
启动serviceStart()
可以看到会创建一个独立的线程
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}
线程的作用是不断轮训,检查eventQueue是否有新的事件进来,如果有就进行dispatch(event)
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {//服务是否进行stop操作,如果执行了stop操作,会进行判断队列是否为空,及时通知停止操作
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
GenericEventHandler上面处理事件的流程已经清楚了,那是在哪里把事件放入队列的?
AysncDispatcher是异步调度器,不会立即处理我们加入的事件,而是放入队列
比如:getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnRuntimeException(e);
}
};
}
停止
关闭的时候会一秒轮训一次,判断是否还有事件没有处理完成,如果还存在事件就等待
事件执行完毕。
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;//服务是否进行stop操作
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
// stop all the components
super.serviceStop();
}
最后来关注几个变量
private volatile boolean drained = true;
drained = eventQueue.isEmpty();代表队列是否是空的
private Object waitForDrained = new Object();//定义一个对象锁
如果队列为空则唤醒waitForDrained等待的地方
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
停止服务的时候如果队列不为空,则会等待
synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
}
}
}
private volatile boolean blockNewEvents = false;//服务是否进行stop操作,如果进行了stop则置为true
代码案例
https://github.com/lizu18xz/study