在线JDK地址:
中文: http://tool.oschina.net/apidocs/apidoc?api=jdk-zh
英文: http://tool.oschina.net/apidocs/apidoc?api=jdk_7u4
在线源码地址:java.util.concurrent.Executors java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.Executors
Executors中 newFixedThreadPool 方法
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
Executors中 newSingleThreadExecutor 方法
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
Executors中 newCachedThreadPool 方法
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
Executors中 newScheduledThreadPool 方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor 构造方法如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * Core pool size, updated only while holding mainLock, but * volatile to allow concurrent readability even during updates. */ private volatile int corePoolSize; /** * Maximum pool size, updated only while holding mainLock but * volatile to allow concurrent readability even during updates. */ private volatile int maximumPoolSize; /** * The queue used for holding tasks and handing off to worker * threads. Note that when using this queue, we do not require * that workQueue.poll() returning null necessarily means that * workQueue.isEmpty(), so must sometimes check both. This * accommodates special-purpose queues such as DelayQueues for * which poll() is allowed to return null even if it may later * return non-null when delays expire. */ private final BlockingQueue<Runnable> workQueue; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * Factory for new threads. All threads are created using this * factory (via method addThread). All callers must be prepared * for addThread to fail by returning null, which may reflect a * system or user's policy limiting the number of threads. Even * though it is not treated as an error, failure to create threads * may result in new tasks being rejected or existing ones * remaining stuck in the queue. On the other hand, no special * precautions exist to handle OutOfMemoryErrors that might be * thrown while trying to create threads, since there is generally * no recourse from within this class. */ private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler;
ThreadPoolExecutor 中 execute 方法如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; } private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } finally { if (!workerStarted) workers.remove(w); } } return t; } /** * Lock held on updates to poolSize, corePoolSize, * maximumPoolSize, runState, and workers set. */ private final ReentrantLock mainLock = new ReentrantLock();
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
ThreadPoolExecutor 中 其他方法:
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * The queue used for holding tasks and handing off to worker * threads. Note that when using this queue, we do not require * that workQueue.poll() returning null necessarily means that * workQueue.isEmpty(), so must sometimes check both. This * accommodates special-purpose queues such as DelayQueues for * which poll() is allowed to return null even if it may later * return non-null when delays expire. */ private final BlockingQueue<Runnable> workQueue; /** * Counter for completed tasks. Updated only on termination of * worker threads. */ private long completedTaskCount; public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) { if (w.isActive()) ++n; } return n; } finally { mainLock.unlock(); } } public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isActive()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } }