线程池无论是在Java开发还是Android开发中,都是一个很常见也很重要的知识点,在面试中也常常被考官问到,那么本博文就带大家一探线程池的究竟。
为什么要使用线程池
在我们日常开发中,当遇到异步或者线程问题时,我们首先想到的是使用Thread或者Runnable来处理,在Android中我们还会结合Handler来实现线程的切换,Thread的使用也比较简单,在处理一些简单的异步任务时也是我们的不二之选。
但是,如果出现需要处理大量异步任务的情况时,使用new Thread的方式就会给我们带来很多问题:
- 频繁的创建线程,线程执行完之后又被回收,又会导致频繁的GC,无法重用线程。
- 如果线程的创建耗时或者需要消耗资源,频繁的创建和回收会对系统造成负担。
- 线程缺乏统一管理,各线程之间互相竞争,降低程序的运行效率。
如果我们使用线程池,那么就能有效的解决以上的问题,线程池主要有以下优点:
- 可以重用已经创建好的线程,避免频繁创建进而导致的频繁GC。
- 可以有效控制和管理线程并发,提高性能。
- 可以有效控制线程的执行,比如:定时执行,取消执行等等。
如何创建、使用线程池
相信大家已经对如何创建、使用线程池了如指掌了,我们举几个简单的例子,以方便后续的博文讲解。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
一探究竟
在平时的开发中,我们通常通过如下的代码构造一个固定数量线程的线程池:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
我们来看下固定线程数的线程池是如何构造出来的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
原来我们常用的线程池都是通过对ThreadPoolExecutor进行不同配置来实现的,那么我们就从这个ThreadPoolExecutor来开始分析吧!
1.ThreadPoolExecutor
ThreadPoolExecutor有四个构造函数,我们来看一些最复杂的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
这个构造函数有7个参数,而其他构造函数也是复用了该构造函数,我们来分析下这几个参数的意义:
- corePoolSize
线程池中核心线程数。
线程池在新建线程时,如果当前线程池总数小于核心线程数(corePoolSize),那么新建的线程是核心线程;否则,新建的线程则为非核心线程。
除非调用了allowCoreThreadTimeOut方法设置了空闲等待时间,否则这些线程一直存在于线程池中,即使他们处于空闲状态。
- maximumPoolSize
线程池中允许存在的最大线程数量。
maximumPoolSize = 核心线程数(corePoolSize)+非核心线程数。
- keepAliveTime
非核心线程空闲等待时间(超时时长),当系统中非核心线程闲置时间超过keepAliveTime后,非核心线程就会被回收。
如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。
- unit
keepAliveTime的单位,有纳秒、微秒、毫秒、秒、分、时、天等。
TimeUnit是一个枚举类型,包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
- workQueue
线程池中待执行任务队列。该队列用于存储由execute提交的Runnable任务。
常用的任务队列主要有以下几种:
SynchronousQueue
这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。LinkedBlockingQueue
这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize.ArrayBlockingQueue
可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。DelayQueue
队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
- threadFactory
为线程池创建新线程的工厂类,这个我们一般使用默认即可。
- handler
拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到 最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会 抛出一个RejectedExecutionException。
这就是是ThreadPoolExecutor的构造方法参数的解释,我们的线程提交到线程池之后又是按照什么样的策略去运行呢?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get(); // 获取线程池中的线程数目
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 启动一个核心线程,true代表的是核心线程
return;
c = ctl.get();
}
// 向队列中添加任务,注意offer返回结果(队列已满时返回false,add直接抛出异常)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
从上面的代码中我们可以发现,当一个任务被加入线程池时,ThreadPoolExecutor执行策略如下:
如果当前线程池中的线程数少于核心线程数(corePoolSize),那么就启动一个新的线程(核心线程)执行任务。
线程池中的线程数已经达到核心线程数(corePoolSize),而且workQueue未满,则将任务放入workQueue中等待执行。
如果线程池中的线程数已经达到核心线程数(corePoolSize)但未超过最大线程数(maximumPoolSize),而且workQueue已满,则开启一个非核心线程来执行任务。
如果线程池中的线程数已经超过最大线程数(maximumPoolSize),则拒绝执行该任务。
2. FixedThreadPool
FixedThreadPool是一个核心线程数量固定的线程池,创建以及构造如下:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
从构造上分析,我们得出FixedThreadPool有以下特点:
所有线程均为核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。
不存在线程超时的情况。(keepAliveTime==0)
线程队列(LinkedBlockingQueue)的默认大小为Integer.MAX_VALUE(2的31次方减1),因此我们可以往队列里添加多个任务。
我们来验证下分析是否正确:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 30; i++) {
final int finalI = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());
}
};
fixedThreadPool.execute(runnable);
}
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1
The runnable num is :1 and thread name :pool-1-thread-2
The runnable num is :2 and thread name :pool-1-thread-3
The runnable num is :3 and thread name :pool-1-thread-1
The runnable num is :4 and thread name :pool-1-thread-2
The runnable num is :5 and thread name :pool-1-thread-3
The runnable num is :6 and thread name :pool-1-thread-1
The runnable num is :7 and thread name :pool-1-thread-2
The runnable num is :8 and thread name :pool-1-thread-3
The runnable num is :9 and thread name :pool-1-thread-1
The runnable num is :10 and thread name :pool-1-thread-2
The runnable num is :11 and thread name :pool-1-thread-3
The runnable num is :13 and thread name :pool-1-thread-2
The runnable num is :12 and thread name :pool-1-thread-1
The runnable num is :14 and thread name :pool-1-thread-3
运行结果与我们的分析一致,先往核心线程中添加三个任务,剩余任务进入到workQueue中等待,当有空闲的核心线程时就执行任务队列中的任务。
3. SingleThreadExecutor
SingleThreadExecutor与FixedThreadPool类似,不同的是SingleThreadExecutor线程池中只有一个固定的核心线程(可以将其看成是一个特殊的FixedThreadPool)。创建及构造如下:
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从构造上分析,我们得出SingleThreadExecutor有以下特点:
只存在一个核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。
不存在线程超时的情况。(keepAliveTime==0)
线程队列的默认大小为Integer.MAX_VALUE(2的31次方减1)。
任务按照FIFO原则执行,不存在线程同步的问题(只有一个线程啊)。
我们来验证下我们的分析是否正确:
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int finalI = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());
}
};
singleThreadPool.execute(runnable);
}
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1
The runnable num is :1 and thread name :pool-1-thread-1
The runnable num is :2 and thread name :pool-1-thread-1
The runnable num is :3 and thread name :pool-1-thread-1
The runnable num is :4 and thread name :pool-1-thread-1
4. CachedThreadPool
CachedThreadPool的创建及构造如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从构造上分析,我们得出SingleThreadExecutor有以下特点:
1.不存在核心线程,只有非核心线程,而且线程数目非常大(Integer.MAX_VALUE)。
存在线程超时机制,超时时间为60s。(keepAliveTime==60s)
使用SynchronousQueue来管理任务队列。
从上面的特点我们可以分析出CachedThreadPool特别适合执行大量并发任务的场景,因为线程数目比较大,所以每当我们添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务,如果没有空闲线程,则创建新线程来执行任务。
我们来验证下我们的分析是否正确:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int finalI = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());
}
};
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(runnable);
}
运行结果如下:
The runnable num is :0 and thread name :pool-1-thread-1
The runnable num is :1 and thread name :pool-1-thread-2
The runnable num is :2 and thread name :pool-1-thread-3
The runnable num is :3 and thread name :pool-1-thread-2
The runnable num is :4 and thread name :pool-1-thread-3
The runnable num is :5 and thread name :pool-1-thread-1
The runnable num is :6 and thread name :pool-1-thread-2
The runnable num is :7 and thread name :pool-1-thread-1
The runnable num is :8 and thread name :pool-1-thread-3
The runnable num is :9 and thread name :pool-1-thread-2
5. ScheduledThreadPool
ScheduledThreadPool具备定时执行任务的特点,ScheduledThreadPool的创建及构造如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPool具有如下特点:
核心线程数量固定,而且非核心线程数目非常大(Integer.MAX_VALUE)。
存在线程超时机制,超时时间为10s。(keepAliveTime==10s)
使用DelayedWorkQueue来管理任务队列。
我们看下ScheduledThreadPool几个常用的API:
- 延迟执行任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
- 延迟定时执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
首次启动任务的时间间隔为initialDelay+period,然后每隔period执行一次任务。
- 延迟执行定时执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
首次启动任务的时间间隔为initialDelay,然后每隔delay执行一次任务。
Android中的线程池
在我们日常的Android开发中,我们经常使用的AsyncTask的实现就采用了线程池,我们来看下AsyncTask是如何实现的。
首先来看AsyncTask的一些属性声明:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
// 核心线程数最少为2个,最多为4个。为了不影响系统运行,一般取CPU-1个。
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
// 最多存在的线程数为CPU*2-1
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
// 线程超时时间为30s
private static final int KEEP_ALIVE_SECONDS = 30;
// 线程生成工厂(没什么卵用,只是为了标识AsyncTask)
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
// 使用LinkedBlockingQueue作为任务管理队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
// 根据上面的参数构造线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
// 允许核心线程超时存在
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
当我们调用了AsyncTask的execute方法后,执行了那些操作呢?
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
// 调用了该接口执行任务,exec为SerialExecutor类的实例
exec.execute(mFuture);
return this;
}
接着来看SerialExecutor类。
private static class SerialExecutor implements Executor {
// 将其当做栈使用即可
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
// 执行任务的接口
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
// 一个任务执行完,才会执行下一个任务(AsyncTask是可能会顺序任务的)
scheduleNext();
}
}
});
// 首个任务执行时,mActive为空调用scheduleNext进行执行
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 使用了我们创建的线程池完成了任务的执行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
通过上面的简单分析,相信大家对AsyncTask的线程池机制有了一个更清晰的认识,其实很多我们看起来很牛逼的东西实现原理也很简单,只要我们多学习多看源码,就能了解其机制。
结束语
线程池在我们的实际开发中重要性不言而喻,在面试中也会被经常问到,希望大家通过本博文对线程池有一个清晰明了的认识,并在实际开发中使用它,谢谢。