开发自己的项目有一段时间了,因为是个长时间跑的服务器端程序,所以异常处理显得尤为重要。
对于异常的抓取和日志(狭义上的日志)的分析一点都不能落下。
我们使用了Java自带的Executor模块,我只是稍微看了下Executors当中三个线程池的实现(策略为:Fixed, Cached, Schedule),其实光看名字就可以了解各自的一些策略信息。OK,这一次我需要一种策略合并Fixed和Cached的两种特点的自定义Executor。其实很简单,给Cached设置一个上线就是了。注意他们的同步队列使用的不同,用LinkedBlockingQueue是个不错的选择,至于BlockingQueue的实现可以自行谷歌(以后再记吧)。
先看写的简略的代码
package com.zjseek.recharge.core;
import com.zjseek.recharge.exception.SKErrorCode;
import com.zjseek.recharge.exception.SKOrderState;
import com.zjseek.recharge.model.OrderModel;
import com.zjseek.recharge.service.OrderService;
import org.apache.log4j.Logger;
import java.sql.Timestamp;
import java.util.concurrent.*;
/**
* Created by geminiwen on 14-6-28.
*/
public class OrderExceptionThreadExecutor extends ThreadPoolExecutor {
private Logger logger = Logger.getLogger(OrderExceptionThreadExecutor.class);
private OrderService orderService;
public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
init();
}
public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
init();
}
public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
init();
}
public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
init();
}
private void init() {
this.orderService = new OrderService();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Future<?> f = (Future<?>) r;
try {
f.get();
} catch (InterruptedException e) {
logger.error("线程池中发现异常,被中断", e);
} catch (ExecutionException e) {
logger.error("线程池中发现异常,被中断", e);
}
}
}
我这是一个订单处理流程,主要用到了一个protected方法,就是afterExecute。一看这个函数的样子,想当然的以为如果线程池中出了问题,异常自然回在第二个参数t中传过来。
也许的确是这样的,但是这里有一个区别。
我们知道ExecutorServcie中执行一个Runnable有两个方法,两个分别是
public void execute(Runnable command);
public <T> Future<T> submit(Runnable task, T result);
别看接受的参数差不多,其实submit最后是调用的execute的,而且在调用execute前,对task进行了一次封装,变成了RunnableFuture(它是接口,继承了Runnable和Future实际是一个实现类FutureTask)。
OK,对于实际操作Runnable的不同,暂时说到这,看下execute方法做了什么事
execute方法对进来的Runnable又包装成了worker然后进入runWorker
runWorker方法中有这么几行
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
好了,到了最关键的afterExecute这个步骤,我满心以为这里所有的异常都会通过thrown传递进来,看来我还是太年轻了,之前我们分析过,这个Runnable已经被submit封装成了FutureTask,那么这个task.run()除了我们自己定义的run任务之外,到底还干了啥呢?
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
OK,这段源码摘自FutureTask中的run方法,实际我们自己定义的任务已经变成了Callable:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
从它的构造函数就可以看出来。
然后我们在上面实际运行task的地方其实是c.call()这一句。
result = c.call();
我们写的任务全部在这句代码里面执行完毕了,看看外面都wrap了啥? OK 我们所有的Throwable全部已经被setException吃掉了,怎么还会抛出到外面那层的execute中呢?
所以我之前实验的时候,在submit中提交任务无论任务怎么抛异常,在afterExecute中的第二个参数是取不到的,原因就在这。
再回头看看针对submit改造的函数
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Future<?> f = (Future<?>) r;
try {
f.get();
} catch (InterruptedException e) {
logger.error("线程池中发现异常,被中断", e);
} catch (ExecutionException e) {
logger.error("线程池中发现异常,被中断", e);
}
}
当然,这里已经默认r是实现Future接口了。通过FutureTask的get方法,能把刚刚setException中的异常给抛出来,这样我们就能真的拿到这些异常了。
结论
如果我们关心线程池执行的结果,则需要使用submit来提交task,那么在afterExecute中对异常的处理也需要通过Future接口调用get方法去取结果,才能拿到异常,如果我们不关心这个任务的结果,可以直接使用ExecutorService中的execute方法(实际是继承Executor接口)来直接去执行任务,这样的话,我们的Runnable没有经过多余的封装,在runWorker中得到的异常也直接能在afterExecute中捕捉。
好了,以上就是对线程池异常捕捉的一个记录。想想应该不难,今天也是偶然机会看到的。今天在开发中碰到PHP锁的问题,头疼死了。