Java_多线程(创建/同步锁/等待唤醒机制/线程池/AsyncTask)
本文由 Luzhuo 编写,转发请保留该信息.
原文: http://blog.csdn.net/Rozol/article/details/77344792
本文讲解了: 线程的创建 / 同步锁机制 / 等待唤醒机制 / 线程池 / Android的AsyncTask源码分析
线程的创建
创建子线程大约可分为4中方式
public class Test {
public static void main(String[] args) {
// 方式一: 继承Thread类
run1();
// 方式二: 重写Thread的run方法
run2();
// 方式三: 实现Runnable接口
run3();
// 方式四: 实现Runnable接口2
run4();
}
/** * 方式一: 继承Thread类 */
private static void run1() {
MyThreadClass thread1_1 = new MyThreadClass();
MyThreadClass thread1_2 = new MyThreadClass("thread 1_2");
thread1_1.setName("thread 1_1");
thread1_1.start();
thread1_2.start();
}
/** * 方式二: 重写Thread的run方法 */
private static void run2() {
new Thread("thread 2"){
public void run() {
for (int x = 0; x < 100; x++) {
System.out.println(Thread.currentThread().getName() + ": " + x);
}
};
}.start();
}
/** * 方式二: 实现Runnable接口 */
private static void run3() {
Thread thread2_1 = new Thread(new MyRunnable());
thread2_1.setName("thread 3_1");
thread2_1.start();
Thread thread2_2 = new Thread(new MyRunnable(), "thread 3_2");
thread2_2.start();
}
/** * 方式三: 实现Runnable接口2 */
private static void run4() {
new Thread(new Runnable() {
@Override
public void run() {
for (int x = 0; x < 100; x++) {
System.out.println(Thread.currentThread().getName() + ": " + x);
}
}
}, "thread 4").start();
}
}
public class MyThreadClass extends Thread{
public MyThreadClass(){};
public MyThreadClass(String name){
super(name);
};
@Override
public void run(){ //继承Thread类并重写run()方法
for(int x = 0; x < 100; x++){
//public final String getName(); //返回该线程的名称
System.out.println(getName() + ": " + x);
}
}
}
public class MyRunnable implements Runnable{
@Override
public void run() {
for (int x = 0; x < 100; x++) {
//public static Thread currenthread(); //返回对当前正在执行的线程对象的引用
System.out.println(Thread.currentThread().getName() + ": " + x);
}
}
}
线程的同步锁
同步锁实现方式
/** * 同步锁的几种实现放方式 * 区别: * Synchronized: 采用CPU悲观锁机制(JVM执行), 线程是独占的, 当很多线程进程锁时会引起CPU频繁切换而影响性能 * Lock: java写的乐观锁, 每次不加锁假设没有冲突去执行, 如果发生冲突则重试 * @author Luzhuo */
public class Test {
public static void main(String[] args) {
// 方式1: 同步代码块
SynchronizedBlock block = new SynchronizedBlock();
Thread thread1_1 = new Thread(block, "1号窗口");
Thread thread1_2 = new Thread(block, "2号窗口");
thread1_1.start();
thread1_2.start();
// 方式2: 同步方法
SynchronizedMethod method = new SynchronizedMethod();
Thread thread2_1 = new Thread(method, "1号窗口");
Thread thread2_2 = new Thread(method, "2号窗口");
thread2_1.start();
thread2_2.start();
// 方式3: 静态同步方法
SynchronizedStaticMethod staticMethod = new SynchronizedStaticMethod();
Thread thread3_1 = new Thread(staticMethod, "1号窗口");
Thread thread3_2 = new Thread(staticMethod, "2号窗口");
thread3_1.start();
thread3_2.start();
// 方式4: Lock锁
LockBlock lockBlock = new LockBlock();
Thread thread4_1 = new Thread(lockBlock, "1号窗口");
Thread thread4_2 = new Thread(lockBlock, "2号窗口");
thread4_1.start();
thread4_2.start();
// ==========================================================
// 数据安全访问的几种方式
DataSecurity data = new DataSecurity();
Thread thread5_1 = new Thread(data, "thread 1");
Thread thread5_2 = new Thread(data, "thread 2");
thread5_1.start();
thread5_2.start();
}
}
/** * 同步代码块 * @author Luzhuo */
public class SynchronizedBlock implements Runnable{
private static int x = 100; // 票的数量
private Object obj = new Object();
@Override
public void run() {
while (true) {
syncBlock();
}
}
/** * 方式1: 同步代码块, 锁对象:任意对象 */
public void syncBlock() {
// synchronized代码同步锁
synchronized (obj) { // 锁对象是new Object();
if (x > 0) {
try {
// public static void sleep(long millis); // 以指定毫秒数内暂停线程
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
}
}
}
}
/** * 同步方法 * @author Luzhuo */
public class SynchronizedMethod implements Runnable{
private static int x = 100; // 票的数量
@Override
public void run() {
while(true){
syncMethod();
}
}
/** * 方式2: 同步方法: 锁对象:this */
private synchronized void syncMethod() { //锁对象是 this
if (x > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
}
}
}
/** * 静态同步方法 * @author Luzhuo * */
public class SynchronizedStaticMethod implements Runnable{
private static int x = 100; // 票的数量
@Override
public void run() {
while(true){
syncStaticMethod();
}
}
/** * 方式3: 静态同步方法: 锁对象: 类.class 字节码文件对象 */
private static synchronized void syncStaticMethod(){ //锁对象是 Ticket.class
if (x > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
}
}
}
/** * Lock是Java5之后加入的 * @author Luzhuo */
public class LockBlock implements Runnable{
private int x = 100; // 票的数量
@Override
public void run() {
while (true) {
lockBlock();
}
}
// 定义一个锁对象
private final Lock lock = new ReentrantLock();
/** * 方式4: Lock锁代码块 */
public void lockBlock() {
// void lock(); //获取锁
lock.lock();
try{
if (x > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
}
}finally {
// void unlock(); //释放锁
lock.unlock();
}
}
}
死锁
/** * 死锁的案例 * 死锁的产生: 线程1拿着objA的锁去获取objB的锁, 线程2拿着objB的锁去获取objA的锁, 两者互不相让就产生了死锁 * @author Luzhuo */
public class 死锁{
public static void main(String[] args) {
死锁.DieLock d1 = new 死锁.DieLock(true);
死锁.DieLock d2 = new 死锁.DieLock(false);
d1.start();
d2.start();
}
public static class DieLock extends Thread{
private boolean flag;
public DieLock(boolean flag){
this.flag = flag;
}
public void run() {
if(flag){
while(true){
synchronized (Mylock.objA) {
System.out.println("if objA");
synchronized (Mylock.objB) {
System.out.println("if objB");
}
}
}
}else{
while(true){
synchronized (Mylock.objB) {
System.out.println("else objB");
synchronized (Mylock.objA) {
System.out.println("else objA");
}
}
}
}
// 执行结果:
// if objA -> if objA -> if objA -> else objB -> 死锁
// else objB -> else objB -> else objB -> if objA -> 死锁
// if objA -> else objB -> 死锁
}
}
public static class Mylock {
public static final Object objA = new Object();
public static final Object objB = new Object();
}
}
线程安全的数据访问
第3个和第4个的数据访问线程是安全的
/** * 多线程下数据安全访问的几种方式(第3,4个线程安全, 第1,2个线程不安全) * 以下方式中, 只有 synchronized 能保证数据访问安全 * @author Luzhuo */
public class DataSecurity implements Runnable{
private static int num1 = 100;
/** * 数据访问不安全 * 由于子线程修改数据后, 可能不去及时更新主线程数据, 而去继续执行其他操作 * @return */
public int getNum1(){
return num1--;
}
private static volatile int num2 = 100;
/** * 数据访问不安全; * volatile[ˈvɑ:lətl] 原解释: 保证修改的值会立即被更新到主内存,当其他线程读取时,会去主内存中读取最新值 * 实际情况: 由于线程1修改数据时, 若线程2拿到的是旧数据, 那么线程2修改的数据将无效, 所以无法保证数据安全 */
public int getNum2(){
return num2--;
}
private static int num3 = 100;
/** * 数据访问安全 * 锁的机制保证了该数据只有一个线程在修改 * @return */
public synchronized int getNum3(){
num3--;
return num3;
}
private AtomicInteger num4 = new AtomicInteger(100);
/** * 数据访问安全 * 使用Java自定的线程安全封装类 * @return */
public int getNum4(){
return num4.getAndDecrement();
}
@Override
public void run() {
int number = Integer.MAX_VALUE;
while(number > 0){
try {
Thread.sleep(10);
// number = getNum1(); // 原始方式
// number = getNum2(); // volatile 关键词
// number = getNum3(); // synchronized 锁机制
number = getNum4(); // Java线程安全封装类
System.out.println(Thread.currentThread().getName() + ": " + number);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
线程的等待唤醒机制
等待唤醒机制
/** * 多线程的等待唤醒机制 * wait: 当前线程等待,会释放锁; notify/notifyAll: 唤醒其他线程 * @author Luzhuo */
public class 等待唤醒机制 {
public static void main(String[] args) {
Bean bean = new Bean();
SetThread st = new SetThread(bean);
GetThread gt = new GetThread(bean);
Thread t1 = new Thread(st, "setData");
Thread t2 = new Thread(gt, "getData");
t1.start();
t2.start();
}
/** * Bean * @author Luzhuo */
public static class Bean {
public int number = 0;
public boolean flag = false;
}
public static class SetThread implements Runnable {
private Bean mBean;
private int count = 0;
public SetThread(Bean bean) {
this.mBean = bean;
}
@Override
public void run() {
while (true) {
synchronized (mBean) {
try{
Thread.sleep(10);
if (mBean.flag) { // bean = true 则等待
// public final void wait(); // 当前线程等待
mBean.wait(); // wait和notify必须在同步代码块中使用, 因为在执行这两个方法之前要先获得锁
}
// 设置值
mBean.number = count;
System.out.println(Thread.currentThread().getName() + ": " + count);
count++;
mBean.flag = true;
// public final void notify(); // 唤醒对象监视器上等待的单个线程
mBean.notify(); // 唤醒一个等待该锁的线程, 然后继续执行完锁定区, 再释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static class GetThread implements Runnable{
private Bean mBean;
public GetThread(Bean bean){
this.mBean = bean;
}
@Override
public void run(){
while(true){
synchronized (mBean) {
try{
Thread.sleep(10);
if(!mBean.flag) { // flag = false
mBean.wait(); // 等待会释放锁对象
}
System.out.println(Thread.currentThread().getName() + ": " + mBean.number);
mBean.flag = false;
mBean.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
等待唤醒机制的案例: 生成者消费者
使用Synchronized的实现方式
/** * 案例: 生产者-消费者(等待唤醒机制 使用Synchronized的案例) * 一边生产一边消费 * @author Luzhuo */
public class 生产者消费者Synchronized {
public static void main(String[] args) {
Resource resorce = new Resource(); // 资源
new Thread(new Producer(resorce)).start(); // 生产者
new Thread(new Consumer(resorce)).start();; // 消费者
}
/** * 资源 */
private static class Resource {
private String[] datas = new String[1];
/** * 将资源存储到容器中 */
public synchronized void save(String data){
try{
Thread.sleep(10);
while(datas[0] != null){
// 让当前的线程等待, 会释放锁 (必须同一把锁上的)
wait(); // // wait 必须存在于同步中
}
datas[0] = data;
System.out.println(Thread.currentThread().getName() + "Resource:save:" + datas[0]);
// 唤醒其他等待的线程(随机一个)执行 (必须同一把锁上的)
// notify();
// 唤醒所有线程
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** * 需要提供从容器中取出商品的方法 */
public synchronized void get(){
try{
Thread.sleep(10);
//消费者线程进来之后,需要先判断有没有商品,没有就等待,有就消费
while(datas[0] == null){
wait();
}
System.out.println(Thread.currentThread().getName() + "Resource:get:" + datas[0]);
datas[0] = null;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/** * 生产者 */
private static class Producer implements Runnable { //给生产者生产商品进行编号
private int num = 1;
private Resource resource ;
Producer(Resource resource){
this.resource = resource;
}
public void run(){
while(true){ resource.save("data:" + num++); }
}
}
/** * 消费者 */
private static class Consumer implements Runnable{
private Resource resource ;
Consumer(Resource resource){
this.resource = resource;
}
public void run(){
while(true){ resource.get(); }
}
}
}
使用Lock的实现方式
/** * 案例: 生产者-消费者(等待唤醒机制 使用Lock的案例) * 一边生产一边消费 * @author Luzhuo */
public class 生产者消费者Lock {
public static void main(String[] args) {
Resource resorce = new Resource(); // 资源
new Thread(new Producer(resorce)).start(); // 生产者
new Thread(new Consumer(resorce)).start();; // 消费者
}
private static class Resource {
private String[] datas = new String[1];
// jdk 1.5 之后的锁, 替代 synchronized 代码块 // jdk1.5之后的监视器, Condition 替代 Object 的等待和唤醒机制 (1个Lock下拥有多个Condition对象)
private final Lock lock = new ReentrantLock();
// 创建锁下的监视器对象 (生产者)
Condition pro = lock.newCondition();
// 创建锁下的监视器对象 (消费者)
Condition con = lock.newCondition();
public void save(String data){
lock.lock(); // 获取锁
try{
Thread.sleep(10);
while(datas[0] != null){
// 等待-生产者
pro.await();
}
datas[0] = data;
System.out.println(Thread.currentThread().getName() + "Resource:save:" + datas[0]);
// 唤醒-消费者
con.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
//需要提供从容器中取出商品的方法
public void get(){
lock.lock();
try{
Thread.sleep(10);
while(datas[0] == null){
con.await(); // 等待-消费者
}
System.out.println(Thread.currentThread().getName() + "Resource:get:" + datas[0]);
datas[0] = null;
pro.signal(); // 唤醒-生产者
}catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
/** * 生产者 */
private static class Producer implements Runnable { //给生产者生产商品进行编号
private int num = 1;
private Resource resource ;
Producer(Resource resource){
this.resource = resource;
}
public void run(){
while(true){ resource.save("data:" + num++); }
}
}
/** * 消费者 */
private static class Consumer implements Runnable{
private Resource resource ;
Consumer(Resource resource){
this.resource = resource;
}
public void run(){
while(true){ resource.get(); }
}
}
}
线程池
原理
线程池的原理类
/** * 线程池原理 * 最多开启maxCount个线程, 未处理的任务都放到LinkedBlockingQueue集合里, 当线程执行完会从LinkedBlockingQueue集合里取出新任务继续执行 * @author Luzhuo */
public class ThreadPool {
int maxCount = 3; // 最多开启多少个线程
private final AtomicInteger count = new AtomicInteger(0); // 当前开的线程数 AtomicInteger:线程同步的Integer
private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<Runnable>(); // LinkedBlockingQueue:链式阻塞队列(线程安全); 在Java源码的线程池中, 使用的是SynchronousQueue来存储任务
/** * 执行线程 * @param runnable */
public void execute(Runnable runnable){
runnables.add(runnable);
if(count.incrementAndGet() <= maxCount){
createThread();
}
}
/** * 创建线程 */
private void createThread(){
new Thread(){
@Override
public void run() {
while(true){
if(runnables.size() > 0){
Runnable remove = runnables.remove(); // 取出一个异步任务
if(remove != null){
remove.run();
}
}else return;
}
}
}.start();
}
}
使用
public class Test {
public static void main(String[] args) {
// 自定义线程池ThreadPool
threadPool();
}
/** * 自定义线程池ThreadPool */
private static void threadPool() {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 10; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
while(true){
try {
Thread.sleep(10);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
}
原理图大致如下:
ThreadPoolExecutor
ThreadPoolExecutor是Java自带的线程池类
/** * Java自带线程池的使用 * @author Luzhuo */
public class Test {
public static void main(String[] args) {
// 线程池无限大(核心: 0, 最大: Integer.MAX_VALUE)
ExecutorService threadpool = Executors.newCachedThreadPool();
// 限制线程池大小(x, x), 超过线程池数量限制的任务将加入等待队列
threadpool = Executors.newFixedThreadPool(3); // 获取逻辑处理器数: Runtime.getRuntime().availableProcessors();
// 单线程池(1, 1)
threadpool = Executors.newSingleThreadExecutor();
for(int i = 0; i < 10; i++){ // 执行10个任务
run(threadpool);
}
// 周期性线程池(x, Integer.MAX_VALUE)
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 延迟执行
for(int i = 0; i < 10; i++){
scheduledThreadPool.schedule(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 3, TimeUnit.SECONDS); // 延迟3秒后去执行
}
// 周期执行
for(int i = 0; i < 10; i++){
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, 3, TimeUnit.SECONDS); // 延迟1秒后去执行, 每3秒为一个周期去执行
}
}
private static void run(ExecutorService executorService){
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
AsyncTask源码分析
AsyncTask是Android里的一个异步框架, 本质上是个线程池
AsyncTask由ThreadPoolExecutor和Handler组成, ThreadPoolExecutor执行任务, Handler将执行完的任务切换到主线程
AsyncTask源码
这是AsyncTask整理后的源码
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/** * An {@link Executor} that can be used to execute tasks in parallel. */
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static final int MESSAGE_POST_RESULT = 0x1;
private static final int MESSAGE_POST_PROGRESS = 0x2;
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static InternalHandler sHandler;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
private volatile Status mStatus = Status.PENDING;
private final AtomicBoolean mCancelled = new AtomicBoolean();
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
public enum Status {
PENDING,
RUNNING,
FINISHED,
}
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
@WorkerThread
protected abstract Result doInBackground(Params... params);
@MainThread
protected void onPreExecute() {
}
@MainThread
protected void onPostExecute(Result result) {
}
@MainThread
protected void onProgressUpdate(Progress... values) {
}
@MainThread
protected void onCancelled(Result result) {
onCancelled();
}
@MainThread
protected void onCancelled() {
}
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
这是被封装后的Runnable源码
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
protected void done() { }
public void run() {
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long RUNNER;
private static final long WAITERS;
static {
try {
STATE = U.objectFieldOffset
(FutureTask.class.getDeclaredField("state"));
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
WAITERS = U.objectFieldOffset
(FutureTask.class.getDeclaredField("waiters"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
}
AsyncTask的使用
这是AsyncTask的使用
public class MainActivity extends AppCompatActivity {
private String uri = "";
private TextView textview;
private void initView() {
textview = (TextView) findViewById(R.id.textview);
}
public void onClick(View v){
// 线程池
//new DataTask().execute(params);//如果操作线程过多,等待 (串行)
new DataTask().executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, uri);//不用等待 (并行)
}
/** * 线程池 * @author Luzhuo * @param String 传入的参数 * @param Integer 进度 * @param Bitmap 任务完毕的返回 */
class DataTask extends AsyncTask<String, Integer, Bitmap>{
@Override
protected void onPreExecute() {
super.onPreExecute();
// 1.处理任务前,ThreadUI调用
// 初始化操作(主线程)
textview.setText("任务即将处理");
}
@Override
protected Bitmap doInBackground(String... params) {
// 2.运行在辅助线程中,负责耗时任务,可调用publishProgress(values)更新任务进度
// 耗时操作(辅线程), 然后将结果传给 onPostExecute(Bitmap)
SystemClock.sleep(1000);
publishProgress(1); // 更新进度[0,100]
SystemClock.sleep(1000);
return BitmapFactory.decodeFile("");
}
@Override
protected void onPostExecute(Bitmap result) {
super.onPostExecute(result);
// 3.ThreadUI调用,结果通过该方法传递给UI
// 接收 doInBackground() 的运算结果(主线程)
textview.setText("任务完成处理");
}
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
// 当调用publishProgress(values)方法时执行该方法,ThreadUI执行,展示界面
// 接收 publishProgress(values) 的调用(主线程)
textview.setText("任务正在处理");
Log.e("onProgressUpdate", values[0].toString());
}
@Override
protected void onCancelled() {
super.onCancelled();
// 当ThreadUI中调用cancel()方法时调用,取消线程操作
// DataTask.onCancelled() (主线程)
}
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initView();
}
}
源码分析
我们来分析下他的源码, 首先我们
new DataTask()
创建AsyncTask对象, 看创建对象时该类做什么:public abstract class AsyncTask<Params, Progress, Result> { private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE_SECONDS = 30; // ↓↓↓ private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; // ↓↓↓ private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); public static final Executor THREAD_POOL_EXECUTOR; // ↓↓↓ static { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; } private static final int MESSAGE_POST_RESULT = 0x1; private static final int MESSAGE_POST_PROGRESS = 0x2; private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; private static InternalHandler sHandler; private final WorkerRunnable<Params, Result> mWorker; private final FutureTask<Result> mFuture; private volatile Status mStatus = Status.PENDING; private final AtomicBoolean mCancelled = new AtomicBoolean(); private final AtomicBoolean mTaskInvoked = new AtomicBoolean(); public enum Status { PENDING, RUNNING, FINISHED, } // ↓↓↓ private static Handler getHandler() { synchronized (AsyncTask.class) { if (sHandler == null) { sHandler = new InternalHandler(); } return sHandler; } } // ↓↓↓ public AsyncTask() { // ↓↓↓ mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; // ↓↓↓ mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; } @WorkerThread protected abstract Result doInBackground(Params... params); @MainThread protected void onPreExecute() { } @MainThread protected void onPostExecute(Result result) { } @MainThread protected void onProgressUpdate(Progress... values) { } @MainThread protected void onCancelled(Result result) { onCancelled(); } @MainThread protected void onCancelled() { } private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } } private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; } }
可见主要是创建了线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,sPoolWorkQueue, sThreadFactory);
和HandlersHandler = new InternalHandler();
,以及一些常量和变量.另外, AsyncTask初始化的线程池使用的任务队列是
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
, 而非Java默认的new SynchronousQueue<Runnable>()
现在要调用
.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, uri);
去真正的执行任务了, 看看源码是怎么执行的public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; exec.execute(mFuture); return this; }
AsyncTask会在执行任务直线,在主线程调用
onPreExecute()
方法,这是一个空方法,有子类重写, 然后才去调用线程池执行任务exec.execute(mFuture)
, 线程池里的线程会调用run()方法执行任务public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; } public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
mFuture是被封装后的Runnable类, 线程池执行任务会调用run()方法
public void run() { try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
可见run()方法里调用的是
mWorker
的call()
方法, 并且在call()回调方法里会执行Result result = doInBackground(mParams);
方法,这个方法是抽象的空方法,必须被子类重写,并且这个方法是在线程池的子线程里执行的private Result postResult(Result result) { @SuppressWarnings("unchecked") Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; }
子线程执行完该任务之后,会调用
return postResult(result)
代码, 也就是使用Handler发送执行结果private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } }
Handler会根据消息是
MESSAGE_POST_RESULT
还是MESSAGE_POST_PROGRESS
, 去回调相应的方法, Handler在这里主要起到切换线程的作用如果是RESULT消息
private void finish(Result result) { if (isCancelled()) { onCancelled(result); } else { onPostExecute(result); } mStatus = Status.FINISHED; } protected void onCancelled(Result result) { onCancelled(); } protected void onCancelled() { } protected void onPostExecute(Result result) { }
onCancelled
和onPostExecute
都是被子类重写的空方法, 默认回调onPostExecute
public final boolean cancel(boolean mayInterruptIfRunning) { mCancelled.set(true); return mFuture.cancel(mayInterruptIfRunning); } public final boolean isCancelled() { return mCancelled.get(); }
如果用户调用了cancel方法进去取消任务操作, 则该任务会被打上标记
mCancelled=true
, 才会去回调onCancelled()方法- 如果这是
PROGRESS
消息, 就直接回调onProgressUpdate()
方法,这个方法也是空方法,需要被子类重写
其他
public class Test {
public static void main(String[] args) throws InterruptedException {
// 1. 线程中发生异常
new Thread(new ThreadException()).start();
// 2. 守护线程
Thread threadDaemon = new Thread(new ThreadDaemon());
threadDaemon.setDaemon(true); // 将该线程标记为守护线程 (非活动状态时标记)
threadDaemon.start();
// 3. 线程的优先级: 在创建线程时可设置线程优先级, 优先级越高,CPU处理这个线程的几率越高
Thread threadPriority = new Thread(new ThreadDaemon());
threadPriority.setPriority(Thread.MAX_PRIORITY); // 优先级 [1-10] 一般为: 1 / 5(默认) / 10
threadPriority.start();
}
}
/** * 线程异常, 任意线程异常,该线程结束,对其他任何线程无影响 */
public class ThreadException implements Runnable{
@SuppressWarnings("unused")
@Override
public void run() {
System.out.println("ThreadException");
int num = 1/0;
}
}
/** * 守护线程 <br> * 正常开启的线程都属于前台线程 <br> * 被标记守护线程的才是守护线程 <br> * 当所有的前台线程都结束时,程序才算结束 <br> * 守护线程执行效果与前台线程一样, 当所有的前台线程结束, 守护线程随前台线程结束而结束 <br> */
public class ThreadDaemon extends Thread{
@Override
public void run() {
while(true){
System.out.println("Thread_Daemon");
}
}
}