java.util.concurrent.Executor
1 /** 2 * Executes the given command at some time in the future. The command 3 * may execute in a new thread, in a pooled thread, or in the calling 4 * thread, at the discretion of the {@code Executor} implementation. 5 * execute方法屏蔽了command如何被执行的具体机制, 6 * 比如command可以在当前线程中串行执行、可以在线程池中执行、可以总是创建一个新的线程来执行等等。 7 * //串行执行 8 * class DirectExecutor implements Executor { 9 * public void execute(Runnable r) { 10 * r.run(); 11 * } 12 * } 13 * //新建线程执行 14 * class ThreadPerTaskExecutor implements Executor { 15 * public void execute(Runnable r) { 16 * new Thread(r).start(); 17 * } 18 * } 19 */ 20 public interface Executor { 21 void execute(Runnable command); 22 }
java.util.concurrent.ExecutorService
1 /** 2 * An {@link Executor} that provides methods to manage termination and 3 * methods that can produce a {@link Future} for tracking progress of 4 * one or more asynchronous tasks. 5 * ExecutorService是Executor的一种增强,提供了一些管理方法, 6 * 包括终止Executor(可能是多个线程)、获得代表任务执行进度的Future。 7 */ 8 public interface ExecutorService extends Executor { 9 10 void execute(Runnable command); 11 12 Future<?> submit(Runnable task); 13 14 <T> Future<T> submit(Callable<T> task); 15 16 void shutdown(); 17 //返回未得到执行的任务 18 List<Runnable> shutdownNow(); 19 20 //...... 21 }
java.util.concurrent.ThreadPoolExecutor
1 //jdk中ExecutorService的一个标准实现就是ThreadPoolExecutor 2 public class ThreadPoolExecutor extends AbstractExecutorService { 3 /** 4 * @param corePoolSize the number of threads to keep in the pool, even 5 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 6 * @param maximumPoolSize the maximum number of threads to allow in the 7 * pool 8 * @param keepAliveTime when the number of threads is greater than 9 * the core, this is the maximum time that excess idle threads 10 * will wait for new tasks before terminating. 11 * @param unit the time unit for the {@code keepAliveTime} argument 12 * @param workQueue the queue to use for holding tasks before they are 13 * executed. This queue will hold only the {@code Runnable} 14 * tasks submitted by the {@code execute} method. 15 * @param threadFactory the factory to use when the executor 16 * creates a new thread 17 * 可以认为corePoolSize是最小线程数,除非设置了allowCoreThreadTimeOut; 18 * workQueue即任务队列。 19 */ 20 public ThreadPoolExecutor(int corePoolSize, 21 int maximumPoolSize, 22 long keepAliveTime, 23 TimeUnit unit, 24 BlockingQueue<Runnable> workQueue, 25 ThreadFactory threadFactory) { 26 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 27 threadFactory, defaultHandler); 28 } 29 30 public void execute(Runnable command) { 31 //任务不能为null 32 if (command == null) 33 throw new NullPointerException(); 34 //ctl即control state,前3位表示runState,后29位表示workerCount 35 int c = ctl.get(); 36 //如果线程数未达到corePoolSize则增加线程(不允许大于corePoolSize),并将command作为该线程的第一个任务执行(不入队) 37 if (workerCountOf(c) < corePoolSize) { 38 if (addWorker(command, true)) 39 return; 40 c = ctl.get(); 41 } 42 //入队 43 if (isRunning(c) && workQueue.offer(command)) { 44 int recheck = ctl.get(); 45 if (! isRunning(recheck) && remove(command)) 46 reject(command); 47 //此时可能线程数减少至0,仍然需要增加线程(否则就没有线程执行command了) 48 //允许大于corePoolSize,corePoolSize是允许为0的,maximumPoolSize必须大于0 49 else if (workerCountOf(recheck) == 0) 50 addWorker(null, false); 51 } 52 else if (!addWorker(command, false))//尝试以maximumPoolSize为上限添加worker,比如cachedThreadPool,其workQueue是一个SynchronousQueue(同步队列,该队列并没有真正的空间来存放插入的元素),当core数量的线程都有任务执行时,就会执行这里。 53 reject(command); 54 } 55 56 57 public Future<?> submit(Runnable task) { 58 if (task == null) throw new NullPointerException(); 59 //将task包装为一个FutureTask实例,重点来看这个FutureTask 60 RunnableFuture<Void> ftask = newTaskFor(task, null); 61 //仍然是执行execute 62 execute(ftask); 63 return ftask; 64 } 65 }
ThreadPoolExecutor的核心部件就是workQueue和workers,workQueue用于存放提交的任务,workers用于存放工作线程,workers模型是固定的。我们使用Executors可以创建cached、fixed等线程池,cached线程池的思路就是每个提交的任务都会有独立的线程来执行,而不会进入队列等待被执行(cached线程池所持有的workQueue是一个没有实际容量的队列),当线程数大于max阈值时,就会reject(默认策略是抛运行时异常,再对异常做出相应的处理)。fixed线程池由于线程数量固定,故需要将暂时无法处理的任务存入workQueue,如果workQueue也满了,则reject。
java.util.concurrent.FutureTask
1 //通过FutureTask的run、set、cancel三个方法可以一窥FutureTask的原理 2 public class FutureTask<V> implements RunnableFuture<V> { 3 //...... 4 5 public void run() { 6 if (state != NEW || 7 !UNSAFE.compareAndSwapObject(this, runnerOffset, 8 null, Thread.currentThread())) 9 return; 10 try { 11 Callable<V> c = callable; 12 //由于状态限制,一个FutureTask只能运行一次,这与Runnable不同,Thread是只能start一次 13 if (c != null && state == NEW) { 14 V result; 15 boolean ran; 16 try { 17 //执行自定义逻辑 18 result = c.call(); 19 ran = true; 20 } catch (Throwable ex) { 21 result = null; 22 ran = false; 23 setException(ex); 24 } 25 if (ran) 26 //设置返回值,设置后就可以通过get得到返回值了 27 set(result); 28 } 29 } finally { 30 // runner must be non-null until state is settled to 31 // prevent concurrent calls to run() 32 runner = null; 33 // state must be re-read after nulling runner to prevent 34 // leaked interrupts 35 int s = state; 36 if (s >= INTERRUPTING) 37 handlePossibleCancellationInterrupt(s); 38 } 39 } 40 41 protected void set(V v) { 42 //只有在NEW状态才会设置返回值 43 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 44 outcome = v; 45 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 46 //唤醒阻塞在Future上的线程 47 finishCompletion(); 48 } 49 } 50 51 public boolean cancel(boolean mayInterruptIfRunning) { 52 if (!(state == NEW && 53 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 54 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 55 return false; 56 try { // in case call to interrupt throws exception 57 if (mayInterruptIfRunning) { 58 try { 59 Thread t = runner; 60 if (t != null) 61 //如果mayInterruptIfRunning为true,则中断当前任务。 62 //注意,中断并不意味任务一定会就此结束。 63 t.interrupt(); 64 } finally { // final state 65 //设置状态为INTERRUPTED(被中断) 66 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 67 } 68 } 69 } finally { 70 //唤醒阻塞在Future上的线程 71 //执行到这里并不意味着任务中断结束,任务仍然可能在执行,只不过不会设置返回值了, 72 //即便任务仍在执行,此后get也不会阻塞了,get返回null。 73 finishCompletion(); 74 } 75 return true; 76 } 77 //...... 78 }
java.util.concurrent.Executors
1 //Executors提供了一系列ThreadPoolExecutor实现的工厂方法 2 public class Executors { 3 //...... 4 public static ExecutorService newCachedThreadPool() { 5 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 6 60L, TimeUnit.SECONDS, 7 new SynchronousQueue<Runnable>()); 8 } 9 //...... 10 }