一,引言
1.遇到的问题
因为项目的特殊性,需要用户在保存数据到本地数据库后,刷新数据时后台同步上传本地数据的数据,为了增加上传图片和数据的效率,使用了线程池管理。
如果数据库操作不会造成主线程的卡顿,那么不用异步线程也行,我这里的数据库数据量太大,已经影响UI卡顿了。
2.处理方案
2.1 AsyncTask.THREAD_POOL_EXECUTOR
查询和数据操作都使用了
AsyncTask.THREAD_POOL_EXECUTOR
来进行数据的查询和异步操作。
2.1.1 为什么使用它
AsyncTask.THREAD_POOL_EXECUTOR是AsyncTask在3.0版本以前并发操作所使用线程池,在3.0以后可以调用++executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR)++ 来实现并发调用,用++execute++方法实际上调用的还是++executeOnExecutor++方法,只是使用的是只有一个线程的线程池。
/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
在SDK25的源码中,可以看到设置其他线程池为默认线程池的方法已经被标注为隐藏了,所以只能使用默认的串行线程池。
在3.0以后的版本使用AsyncTas的execute方法,都是串行的,当多个execute被执行时,会造成线程等待的效果。如果想要线程立即执行,还得使用executeOnExecutor来实现,线程池可以使用四大线程池如:Executors.newCachedThreadPool(),也可以使用AsyncTask.THREAD_POOL_EXECUTOR。
2.1.2 缺点
但使用AsyncTask.THREAD_POOL_EXECUTOR线程池来实现上传操作确实不可行的。
为了上传效率,上传的每个表,每条数据,每个图片都是使用了新的线程,这将会导致线程池堵塞,使与主线程交互有关的查询等操作耗时过长,影响用户体验。
所以上传的操作还是要自己再创建单独的线程池来进行管理,避免交互线程的堵塞。
2.2 自定义线程池
2.2.1 线程池的创建
线程池的创建一共有6个参数。
- CorePoolSize 这个参数是核心线程的数量,默认情况下是一直存活的。
如果把allowCoreThreadTimeOut设置为true,那么核心线程也会受keepAliveTime来决定超时时常。
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
- maximumPoolSize 线程池的最大容量,当活动线程到达这个值之后,后面新增的任务会堵塞,等有空余才添加进去。
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
- keepAliveTime 非核心线程闲置时的超时时长,超过这个时常非核心线程就会回收。如果设置了++allowCoreThreadTimeOut++为true,核心线程也会被限制。
private static final int KEEP_ALIVE_SECONDS = 30;
- unit keepAliveTime参数的时间单位,是个枚举。
TimeUnit.SECONDS
- workQueue 它是堵塞队列,用来实现数据共享。当队列没有数据时,消费者所有线程的自动挂起,直到有数据放入;当队列满时,生产者所有线程都自动挂起,直到队列有空位。++put/take++方法实现了上述功能,而++offer/poll++可以设置时常来实现线程挂起,超过时常返回布尔值。
常用的堵塞队列有ArrayBlockingQueue和LinkedBlockingQueue,他们最大的不用是前者put/take是用的一个锁,放与取无法实现并行;后者放一个锁,取一个锁,可以实现put/take的并行。
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
- RejectedExecutionHandler 拒绝的策略。在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,这时会用到拒绝策略:
- AbortPolicy :不执行,会抛出 RejectedExecutionException 异常。默认的策略。
- CallerRunsPolicy :由调用者(调用线程池的主线程)执行。
- DiscardOldestPolicy :抛弃等待队列中最老的。
- DiscardPolicy: 不做任何处理,即抛弃当前任务。
- threadFactory 为线程池中创建新线程,它是个接口,使用时需要实现newThread方法来创建。如果不指定它,将会使用默认的ThreadFactory:DefaultThreadFactory。
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "SynData #" + mCount.getAndIncrement());
}
};
2.2.2 线程池的执行逻辑
先说下execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1 当前线程数量小于核心线程数据时,创建一个新线程来运行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2 将任务加入等待队列,当等待队列还有空位插入成功,走入if
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3 当等待队列已满,则创建非核心线程来运行任务
else if (!addWorker(command, false))
reject(command);
}
上面是执行execute时,任务的处理,1是创建核心线程运行任务,3是创建非核心线程运行任务。那队列的任务如何运行呢?
addWorker中是创建线程来执行任务,封装在Worker中执行任务。
/**
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//实际上线程执行的任务,是下面那个方法的runWorker
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
如何执行等待队列里的任务,关键就在runWorker中了,可以看到线程实际都是runWorker方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask就是从队列中取出任务,这里循环到等待任务执行完成。
while (task != null || (task = getTask()) != null) {
//执行任务,省略
...
}
completedAbruptly = false;
} finally {
//当等待队列中的任务执行完,就移除这个Worker
processWorkerExit(w, completedAbruptly);
}
}
上面实现了循环执行等待队列,等待队列执行完就执行processWorkerExit来移除Worker。
等等,核心线程也会被销毁吗? 这就要看++getTask++方法了。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
...
//超时策略就在这里了
//当线程大于corePoolSize或允许核心线程超时时,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//用来退出循环,回收线程,退出循环的条件,下面分析
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//当timed为true时,根据keepAliveTime来设置线程挂起时间,超过挂起时间则返回空
//如果timed为false,则挂起线程,等待队列插入数据
try {
//通过poll和take方法来实现线程回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以看出,等待队列的++poll++和++take++方法对于线程回收起到了至关重要的作用。
推测出可回收线程的条件:
- 当运行线程大于线程池容量时,一定会回收。
- 当运行线程小于线程池容量,大于核心线程数时,根据设置的非核心线程超时时间来获取任务,任务为空则回收。
- 当运行线程小于核心线程数大于1。当设置核心线程可回收,且根据超任务获取为空,则回收;核心线程不可回收时则挂起线程。
- 当运行线程等于1。设置核心线程可回收,且根据超时取出的任务为空,下次循环如果队列还是为空,则回收;核心线程不可回收则挂起线程。
总的来说,当运行线程小于核心线程数量,根据核心线程是否回收来决定核心线程是否要回收。
至此,对于线程池如何处理任务应该有了一定的了解。
2.3 线程池的一些方法
同步任务不允许并发执行,只能串行,这就需要在执行同步任务时判断同步任务是否正在执行。这里使用THREAD_POOL_EXECUTOR.getActiveCount() == 0来判断。
getActiveCount是用来获取线程池中运行线程的数量。
2.4 使用AtomicInteger
使用AtomicInteger来实现不同线程并发下的线程安全。
2.5 volatile
volatile是个不被推荐使用的变量,因为它的特性:具有 synchronized 的可见性特性,但是不具备原子特性。没有原子特性,无法满足读取-修改-写入操作序列组成的组合操作,使数据在这个过程中无法保持不变。使得 volatile 变量不能像 synchronized 那样普遍适用于实现线程安全。
正是由于其特性,在满足使用场景的情况下,volatile的使用更加简单,性能也更加由于锁。它的读操作几乎和非 volatile一样,写操作为了保证可见性需要实现内存界定使得开销比读大很多,但总开销比起锁还是更低。
差点忘了使用场景:
- 对变量的写操作不依赖于当前值。
- 该变量没有包含在具有其他变量的不变式中。
第一条很好理解,就是因为不具备原子特性,写过程不能保证值不变化,如果写要不依赖当前值。第二条也是因为原子特性,当前值如果变化了,不变式就失去了正确性。
总的来说,操作不依赖当前值就可以了。但它比起锁来说太容易出错了。
3.小结
对参数设置如果有一定的了解,对于使用上其实就没什么问题了。