Java_多线程(创建/同步锁/等待唤醒机制/线程池/AsyncTask)

Java_多线程(创建/同步锁/等待唤醒机制/线程池/AsyncTask)

本文由 Luzhuo 编写,转发请保留该信息.
原文: http://blog.csdn.net/Rozol/article/details/77344792

本文讲解了: 线程的创建 / 同步锁机制 / 等待唤醒机制 / 线程池 / Android的AsyncTask源码分析

线程的创建

创建子线程大约可分为4中方式

public class Test {
    public static void main(String[] args) {
        // 方式一: 继承Thread类
        run1();

        // 方式二: 重写Thread的run方法
        run2();

        // 方式三: 实现Runnable接口
        run3();

        // 方式四: 实现Runnable接口2
        run4();
    }


    /** * 方式一: 继承Thread类 */
    private static void run1() {
        MyThreadClass thread1_1 = new MyThreadClass();
        MyThreadClass thread1_2 = new MyThreadClass("thread 1_2");

        thread1_1.setName("thread 1_1");

        thread1_1.start();
        thread1_2.start();      
    }

    /** * 方式二: 重写Thread的run方法 */
    private static void run2() {
        new Thread("thread 2"){
            public void run() {
                for (int x = 0; x < 100; x++) {
                    System.out.println(Thread.currentThread().getName() + ": " + x);
                }
            };
        }.start();      
    }

    /** * 方式二: 实现Runnable接口 */
    private static void run3() {
        Thread thread2_1 = new Thread(new MyRunnable());
        thread2_1.setName("thread 3_1");
        thread2_1.start();

        Thread thread2_2 = new Thread(new MyRunnable(), "thread 3_2");
        thread2_2.start();      
    }

    /** * 方式三: 实现Runnable接口2 */
    private static void run4() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int x = 0; x < 100; x++) {
                    System.out.println(Thread.currentThread().getName() + ": " + x);
                }
            }
        }, "thread 4").start();     
    }
}
public class MyThreadClass extends Thread{

    public MyThreadClass(){};

    public MyThreadClass(String name){
        super(name);
    };

    @Override
    public void run(){ //继承Thread类并重写run()方法
        for(int x = 0; x < 100; x++){
            //public final String getName(); //返回该线程的名称
            System.out.println(getName() + ": " + x);
        }
    }
}
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int x = 0; x < 100; x++) {
            //public static Thread currenthread(); //返回对当前正在执行的线程对象的引用
            System.out.println(Thread.currentThread().getName() + ": " + x);
        }
    }
}

线程的同步锁

同步锁实现方式

/** * 同步锁的几种实现放方式 * 区别: * Synchronized: 采用CPU悲观锁机制(JVM执行), 线程是独占的, 当很多线程进程锁时会引起CPU频繁切换而影响性能 * Lock: java写的乐观锁, 每次不加锁假设没有冲突去执行, 如果发生冲突则重试 * @author Luzhuo */
public class Test {
    public static void main(String[] args) {
        // 方式1: 同步代码块
        SynchronizedBlock block = new SynchronizedBlock();

        Thread thread1_1 = new Thread(block, "1号窗口");
        Thread thread1_2 = new Thread(block, "2号窗口");

        thread1_1.start();
        thread1_2.start();

        // 方式2: 同步方法
        SynchronizedMethod method = new SynchronizedMethod();

        Thread thread2_1 = new Thread(method, "1号窗口");
        Thread thread2_2 = new Thread(method, "2号窗口");

        thread2_1.start();
        thread2_2.start();

        // 方式3: 静态同步方法
        SynchronizedStaticMethod staticMethod = new SynchronizedStaticMethod();

        Thread thread3_1 = new Thread(staticMethod, "1号窗口");
        Thread thread3_2 = new Thread(staticMethod, "2号窗口");

        thread3_1.start();
        thread3_2.start();

        // 方式4: Lock锁
        LockBlock lockBlock = new LockBlock();

        Thread thread4_1 = new Thread(lockBlock, "1号窗口");
        Thread thread4_2 = new Thread(lockBlock, "2号窗口");

        thread4_1.start();
        thread4_2.start();

        // ==========================================================

        // 数据安全访问的几种方式
        DataSecurity data = new DataSecurity();

        Thread thread5_1 = new Thread(data, "thread 1");
        Thread thread5_2 = new Thread(data, "thread 2");

        thread5_1.start();
        thread5_2.start();
    }
}
/** * 同步代码块 * @author Luzhuo */
public class SynchronizedBlock implements Runnable{
    private static int x = 100; // 票的数量
    private Object obj = new Object();

    @Override
    public void run() {
        while (true) {
            syncBlock();
        }
    }

    /** * 方式1: 同步代码块, 锁对象:任意对象 */
    public void syncBlock() {
        // synchronized代码同步锁
        synchronized (obj) { // 锁对象是new Object(); 
            if (x > 0) {
                try {
                    // public static void sleep(long millis); // 以指定毫秒数内暂停线程
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
            }
        }
    }
}
/** * 同步方法 * @author Luzhuo */
public class SynchronizedMethod implements Runnable{
    private static int x = 100; // 票的数量

    @Override
    public void run() {
        while(true){
            syncMethod();
        }
    }

    /** * 方式2: 同步方法: 锁对象:this */
    private synchronized void syncMethod() {  //锁对象是 this
        if (x > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
        }
    }
}
/** * 静态同步方法 * @author Luzhuo * */
public class SynchronizedStaticMethod implements Runnable{
    private static int x = 100; // 票的数量

    @Override
    public void run() {
        while(true){
            syncStaticMethod();
        }
    }

    /** * 方式3: 静态同步方法: 锁对象: 类.class 字节码文件对象 */
    private static synchronized void syncStaticMethod(){ //锁对象是 Ticket.class
        if (x > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
        }
    }
}
/** * Lock是Java5之后加入的 * @author Luzhuo */
public class LockBlock implements Runnable{
    private int x = 100; // 票的数量

    @Override
    public void run() {
        while (true) {
            lockBlock();
        }
    }

    // 定义一个锁对象
    private final Lock lock = new ReentrantLock();

    /** * 方式4: Lock锁代码块 */
    public void lockBlock() {
        // void lock(); //获取锁
        lock.lock();

        try{

            if (x > 0) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在出售第 " + (x--) + " 张票");
            }

        }finally {
            // void unlock(); //释放锁
            lock.unlock();
        }
    }
}

死锁

/** * 死锁的案例 * 死锁的产生: 线程1拿着objA的锁去获取objB的锁, 线程2拿着objB的锁去获取objA的锁, 两者互不相让就产生了死锁 * @author Luzhuo */
public class 死锁{
    public static void main(String[] args) {
        死锁.DieLock d1 = new 死锁.DieLock(true);
        死锁.DieLock d2 = new 死锁.DieLock(false);
        d1.start();
        d2.start();
    }

    public static class DieLock extends Thread{
        private boolean flag;

        public DieLock(boolean flag){
            this.flag = flag;
        }

        public void run() {
            if(flag){
                while(true){
                    synchronized (Mylock.objA) {
                        System.out.println("if objA");
                        synchronized (Mylock.objB) {
                            System.out.println("if objB");
                        }
                    }
                }
            }else{
                while(true){
                    synchronized (Mylock.objB) {
                        System.out.println("else objB");
                        synchronized (Mylock.objA) {
                            System.out.println("else objA");
                        }
                    }
                }
            }
            // 执行结果:
            // if objA -> if objA -> if objA -> else objB -> 死锁
            // else objB -> else objB -> else objB -> if objA -> 死锁
            // if objA -> else objB -> 死锁
        }
    }

    public static class Mylock {
        public static final Object objA = new Object();
        public static final Object objB = new Object();
    }
}

线程安全的数据访问

第3个和第4个的数据访问线程是安全的

/** * 多线程下数据安全访问的几种方式(第3,4个线程安全, 第1,2个线程不安全) * 以下方式中, 只有 synchronized 能保证数据访问安全 * @author Luzhuo */
public class DataSecurity implements Runnable{
    private static int num1 = 100;
    /** * 数据访问不安全 * 由于子线程修改数据后, 可能不去及时更新主线程数据, 而去继续执行其他操作 * @return */
    public int getNum1(){
        return num1--;
    }


    private static volatile int num2 = 100;
    /** * 数据访问不安全; * volatile[ˈvɑ:lətl] 原解释: 保证修改的值会立即被更新到主内存,当其他线程读取时,会去主内存中读取最新值 * 实际情况: 由于线程1修改数据时, 若线程2拿到的是旧数据, 那么线程2修改的数据将无效, 所以无法保证数据安全 */
    public int getNum2(){
        return num2--;
    }

    private static int num3 = 100;
    /** * 数据访问安全 * 锁的机制保证了该数据只有一个线程在修改 * @return */
    public synchronized int getNum3(){
        num3--;
        return num3;
    }


    private AtomicInteger num4 = new AtomicInteger(100);
    /** * 数据访问安全 * 使用Java自定的线程安全封装类 * @return */
    public int getNum4(){
        return num4.getAndDecrement();
    }

    @Override
    public void run() {
        int number = Integer.MAX_VALUE;

        while(number > 0){
            try {

                Thread.sleep(10);
// number = getNum1(); // 原始方式
// number = getNum2(); // volatile 关键词
// number = getNum3(); // synchronized 锁机制
                number = getNum4(); // Java线程安全封装类
                System.out.println(Thread.currentThread().getName() + ": " + number);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

线程的等待唤醒机制

等待唤醒机制

/** * 多线程的等待唤醒机制 * wait: 当前线程等待,会释放锁; notify/notifyAll: 唤醒其他线程 * @author Luzhuo */
public class 等待唤醒机制 {
    public static void main(String[] args) {
        Bean bean = new Bean();

        SetThread st = new SetThread(bean);
        GetThread gt = new GetThread(bean);
        Thread t1 = new Thread(st, "setData");
        Thread t2 = new Thread(gt, "getData");

        t1.start();
        t2.start();
    }

    /** * Bean * @author Luzhuo */
    public static class Bean {
        public int number = 0;
        public boolean flag = false;
    }

    public static class SetThread implements Runnable {
        private Bean mBean;
        private int count = 0;

        public SetThread(Bean bean) {
            this.mBean = bean;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (mBean) {
                    try{
                        Thread.sleep(10);

                        if (mBean.flag) { // bean = true 则等待
                            // public final void wait(); // 当前线程等待
                            mBean.wait(); // wait和notify必须在同步代码块中使用, 因为在执行这两个方法之前要先获得锁
                        }

                        // 设置值
                        mBean.number = count;
                        System.out.println(Thread.currentThread().getName() + ": " + count);
                        count++;

                        mBean.flag = true;
                        // public final void notify(); // 唤醒对象监视器上等待的单个线程
                        mBean.notify(); // 唤醒一个等待该锁的线程, 然后继续执行完锁定区, 再释放锁

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static class GetThread implements Runnable{
        private Bean mBean;

        public GetThread(Bean bean){
            this.mBean = bean;
        }

        @Override
        public void run(){
            while(true){
                synchronized (mBean) {
                    try{
                        Thread.sleep(10);

                        if(!mBean.flag) { // flag = false
                            mBean.wait(); // 等待会释放锁对象
                        }

                        System.out.println(Thread.currentThread().getName() + ": " + mBean.number);
                        mBean.flag = false;
                        mBean.notify();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

等待唤醒机制的案例: 生成者消费者

使用Synchronized的实现方式

/** * 案例: 生产者-消费者(等待唤醒机制 使用Synchronized的案例) * 一边生产一边消费 * @author Luzhuo */
public class 生产者消费者Synchronized {
    public static void main(String[] args) {
        Resource resorce = new Resource(); // 资源
        new Thread(new Producer(resorce)).start(); // 生产者
        new Thread(new Consumer(resorce)).start();; // 消费者
    }

    /** * 资源 */
    private static class Resource {
        private String[] datas = new String[1];

        /** * 将资源存储到容器中 */
        public synchronized void save(String data){
            try{
                Thread.sleep(10);

                while(datas[0] != null){
                        // 让当前的线程等待, 会释放锁 (必须同一把锁上的)
                        wait(); // // wait 必须存在于同步中
                }

                datas[0] = data;   
                System.out.println(Thread.currentThread().getName() + "Resource:save:" + datas[0]);
                // 唤醒其他等待的线程(随机一个)执行 (必须同一把锁上的)
                // notify();
                // 唤醒所有线程
                notifyAll();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /** * 需要提供从容器中取出商品的方法 */
        public synchronized void get(){
            try{
                Thread.sleep(10);

                //消费者线程进来之后,需要先判断有没有商品,没有就等待,有就消费
                while(datas[0] == null){
                        wait();
                }

                System.out.println(Thread.currentThread().getName() + "Resource:get:" + datas[0]);
                datas[0] = null;
                notifyAll();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** * 生产者 */
    private static class Producer implements Runnable {    //给生产者生产商品进行编号
        private int num = 1;
        private Resource resource ;
        Producer(Resource resource){
            this.resource = resource;
        }

        public void run(){
            while(true){ resource.save("data:" + num++); }
        }
    }

    /** * 消费者 */
    private static class Consumer implements Runnable{
        private Resource resource ;
        Consumer(Resource resource){
            this.resource = resource;
        }

        public void run(){
            while(true){ resource.get(); }
        }
    }
}

使用Lock的实现方式

/** * 案例: 生产者-消费者(等待唤醒机制 使用Lock的案例) * 一边生产一边消费 * @author Luzhuo */
public class 生产者消费者Lock {
    public static void main(String[] args) {
        Resource resorce = new Resource(); // 资源
        new Thread(new Producer(resorce)).start(); // 生产者
        new Thread(new Consumer(resorce)).start();; // 消费者
    }

    private static class Resource {
        private String[] datas = new String[1];
        // jdk 1.5 之后的锁, 替代 synchronized 代码块 // jdk1.5之后的监视器, Condition 替代 Object 的等待和唤醒机制 (1个Lock下拥有多个Condition对象)
        private final Lock lock = new ReentrantLock();
        // 创建锁下的监视器对象 (生产者)
        Condition pro = lock.newCondition();
        // 创建锁下的监视器对象 (消费者)
        Condition con = lock.newCondition();

        public void save(String data){
            lock.lock(); // 获取锁

            try{
                Thread.sleep(10);

                while(datas[0] != null){
                    // 等待-生产者
                    pro.await();
                }

                datas[0] = data;   
                System.out.println(Thread.currentThread().getName() + "Resource:save:" + datas[0]);
                // 唤醒-消费者
                con.signal();

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // 释放锁
            }
        }

        //需要提供从容器中取出商品的方法
        public void get(){
            lock.lock();

            try{
                Thread.sleep(10);

                while(datas[0] == null){
                    con.await(); // 等待-消费者
                }

                System.out.println(Thread.currentThread().getName() + "Resource:get:" + datas[0]);
                datas[0] = null;

                pro.signal(); // 唤醒-生产者

            }catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                lock.unlock();
            }

        }
    }

    /** * 生产者 */
    private static class Producer implements Runnable {    //给生产者生产商品进行编号
        private int num = 1;
        private Resource resource ;
        Producer(Resource resource){
            this.resource = resource;
        }

        public void run(){
            while(true){ resource.save("data:" + num++); }
        }
    }

    /** * 消费者 */
    private static class Consumer implements Runnable{
        private Resource resource ;
        Consumer(Resource resource){
            this.resource = resource;
        }

        public void run(){
            while(true){ resource.get(); }
        }
    }
}

线程池

原理

线程池的原理类

/** * 线程池原理 * 最多开启maxCount个线程, 未处理的任务都放到LinkedBlockingQueue集合里, 当线程执行完会从LinkedBlockingQueue集合里取出新任务继续执行 * @author Luzhuo */
public class ThreadPool {
    int maxCount = 3; // 最多开启多少个线程
    private final AtomicInteger count = new AtomicInteger(0); // 当前开的线程数 AtomicInteger:线程同步的Integer
    private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<Runnable>(); // LinkedBlockingQueue:链式阻塞队列(线程安全); 在Java源码的线程池中, 使用的是SynchronousQueue来存储任务

    /** * 执行线程 * @param runnable */
    public void execute(Runnable runnable){
        runnables.add(runnable);
        if(count.incrementAndGet() <= maxCount){
            createThread();
        }
    }

    /** * 创建线程 */
    private void createThread(){
        new Thread(){
            @Override
            public void run() {
                while(true){
                    if(runnables.size() > 0){
                        Runnable remove = runnables.remove(); // 取出一个异步任务
                        if(remove != null){
                            remove.run();
                        }
                    }else return;
                }
            }
        }.start();
    }
}

使用

public class Test {
    public static void main(String[] args) {
        // 自定义线程池ThreadPool
        threadPool();
    }

    /** * 自定义线程池ThreadPool */
    private static void threadPool() {
        ThreadPool threadPool = new ThreadPool();
        for (int i = 0; i < 10; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(10);
                            System.out.println(Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }
}

原理图大致如下:
《Java_多线程(创建/同步锁/等待唤醒机制/线程池/AsyncTask)》

ThreadPoolExecutor

ThreadPoolExecutor是Java自带的线程池类

/** * Java自带线程池的使用 * @author Luzhuo */
public class Test {
    public static void main(String[] args) {
        // 线程池无限大(核心: 0, 最大: Integer.MAX_VALUE)
        ExecutorService threadpool = Executors.newCachedThreadPool();

        // 限制线程池大小(x, x), 超过线程池数量限制的任务将加入等待队列
        threadpool = Executors.newFixedThreadPool(3); // 获取逻辑处理器数: Runtime.getRuntime().availableProcessors();

        // 单线程池(1, 1)
        threadpool = Executors.newSingleThreadExecutor(); 

        for(int i = 0; i < 10; i++){ // 执行10个任务
            run(threadpool);
        }

        // 周期性线程池(x, Integer.MAX_VALUE)
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        // 延迟执行
        for(int i = 0; i < 10; i++){
            scheduledThreadPool.schedule(new Runnable() {  
                public void run() {  
                    try {
                        System.out.println(Thread.currentThread().getName());

                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }                   
                }  
             }, 3, TimeUnit.SECONDS);  // 延迟3秒后去执行
        }

        // 周期执行
        for(int i = 0; i < 10; i++){
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {  
                public void run() {  
                    try {
                        System.out.println(Thread.currentThread().getName());

                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }    
                }
            }, 1, 3, TimeUnit.SECONDS); // 延迟1秒后去执行, 每3秒为一个周期去执行
        }
    }

    private static void run(ExecutorService executorService){
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10);

                    System.out.println(Thread.currentThread().getName());

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

AsyncTask源码分析

AsyncTask是Android里的一个异步框架, 本质上是个线程池
AsyncTask由ThreadPoolExecutor和Handler组成, ThreadPoolExecutor执行任务, Handler将执行完的任务切换到主线程

AsyncTask源码

这是AsyncTask整理后的源码

public abstract class AsyncTask<Params, Progress, Result> {
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /** * An {@link Executor} that can be used to execute tasks in parallel. */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static final int MESSAGE_POST_RESULT = 0x1;
    private static final int MESSAGE_POST_PROGRESS = 0x2;

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    private static InternalHandler sHandler;

    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;

    private volatile Status mStatus = Status.PENDING;

    private final AtomicBoolean mCancelled = new AtomicBoolean();
    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

    public enum Status {
        PENDING,
        RUNNING,
        FINISHED,
    }

    private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }

    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }


    @WorkerThread
    protected abstract Result doInBackground(Params... params);

    @MainThread
    protected void onPreExecute() {
    }

    @MainThread
    protected void onPostExecute(Result result) {
    }

    @MainThread
    protected void onProgressUpdate(Progress... values) {
    }

    @MainThread
    protected void onCancelled(Result result) {
        onCancelled();
    }    

    @MainThread
    protected void onCancelled() {
    }


    @MainThread
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

    @MainThread
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

    @MainThread
    public static void execute(Runnable runnable) {
        sDefaultExecutor.execute(runnable);
    }

    private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

    @SuppressWarnings({"RawUseOfParameterizedType"})
    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }
}

这是被封装后的Runnable源码

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    protected void done() { }

    public void run() {
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
}

AsyncTask的使用

这是AsyncTask的使用

public class MainActivity extends AppCompatActivity {

    private String uri = "";
    private TextView textview;

    private void initView() {
        textview = (TextView) findViewById(R.id.textview);
    }

    public void onClick(View v){
        // 线程池
        //new DataTask().execute(params);//如果操作线程过多,等待 (串行)
        new DataTask().executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, uri);//不用等待 (并行)
    }

    /** * 线程池 * @author Luzhuo * @param String 传入的参数 * @param Integer 进度 * @param Bitmap 任务完毕的返回 */
    class DataTask extends AsyncTask<String, Integer, Bitmap>{
        @Override
        protected void onPreExecute() {
            super.onPreExecute();
            // 1.处理任务前,ThreadUI调用
            // 初始化操作(主线程)
            textview.setText("任务即将处理");
        }

        @Override
        protected Bitmap doInBackground(String... params) {
            // 2.运行在辅助线程中,负责耗时任务,可调用publishProgress(values)更新任务进度
            // 耗时操作(辅线程), 然后将结果传给 onPostExecute(Bitmap)
            SystemClock.sleep(1000);
            publishProgress(1); // 更新进度[0,100]
            SystemClock.sleep(1000);
            return BitmapFactory.decodeFile("");
        }

        @Override
        protected void onPostExecute(Bitmap result) {
            super.onPostExecute(result);
            // 3.ThreadUI调用,结果通过该方法传递给UI
            // 接收 doInBackground() 的运算结果(主线程)
            textview.setText("任务完成处理");
        }

        @Override
        protected void onProgressUpdate(Integer... values) {
            super.onProgressUpdate(values);
            // 当调用publishProgress(values)方法时执行该方法,ThreadUI执行,展示界面
            // 接收 publishProgress(values) 的调用(主线程)
            textview.setText("任务正在处理");
            Log.e("onProgressUpdate", values[0].toString());
        }

        @Override
        protected void onCancelled() {
            super.onCancelled();
            // 当ThreadUI中调用cancel()方法时调用,取消线程操作
            // DataTask.onCancelled() (主线程)
        }
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        initView();
    }
}

源码分析

  • 我们来分析下他的源码, 首先我们new DataTask()创建AsyncTask对象, 看创建对象时该类做什么:

    public abstract class AsyncTask<Params, Progress, Result> {
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        // ↓↓↓
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
        // ↓↓↓
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        public static final Executor THREAD_POOL_EXECUTOR;
    
        // ↓↓↓
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
    
        private static final int MESSAGE_POST_RESULT = 0x1;
        private static final int MESSAGE_POST_PROGRESS = 0x2;
    
        private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
        private static InternalHandler sHandler;
    
        private final WorkerRunnable<Params, Result> mWorker;
        private final FutureTask<Result> mFuture;
    
        private volatile Status mStatus = Status.PENDING;
    
        private final AtomicBoolean mCancelled = new AtomicBoolean();
        private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
    
        public enum Status {
            PENDING,
            RUNNING,
            FINISHED,
        }
    
        // ↓↓↓
        private static Handler getHandler() {
            synchronized (AsyncTask.class) {
                if (sHandler == null) {
                    sHandler = new InternalHandler();
                }
                return sHandler;
            }
        }
    
        // ↓↓↓
        public AsyncTask() {
            // ↓↓↓
            mWorker = new WorkerRunnable<Params, Result>() {
                public Result call() throws Exception {
                    mTaskInvoked.set(true);
    
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    Result result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                    return postResult(result);
                }
            };
            // ↓↓↓
            mFuture = new FutureTask<Result>(mWorker) {
                @Override
                protected void done() {
                    try {
                        postResultIfNotInvoked(get());
                    } catch (InterruptedException e) {
                        android.util.Log.w(LOG_TAG, e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException("An error occurred while executing doInBackground()",
                                e.getCause());
                    } catch (CancellationException e) {
                        postResultIfNotInvoked(null);
                    }
                }
            };
        }
    
        @WorkerThread
        protected abstract Result doInBackground(Params... params);
    
        @MainThread
        protected void onPreExecute() {
        }
    
        @MainThread
        protected void onPostExecute(Result result) {
        }
    
        @MainThread
        protected void onProgressUpdate(Progress... values) {
        }
    
        @MainThread
        protected void onCancelled(Result result) {
            onCancelled();
        }    
    
        @MainThread
        protected void onCancelled() {
        }
    
        private static class InternalHandler extends Handler {
            public InternalHandler() {
                super(Looper.getMainLooper());
            }
    
            @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
            @Override
            public void handleMessage(Message msg) {
                AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
                switch (msg.what) {
                    case MESSAGE_POST_RESULT:
                        // There is only one result
                        result.mTask.finish(result.mData[0]);
                        break;
                    case MESSAGE_POST_PROGRESS:
                        result.mTask.onProgressUpdate(result.mData);
                        break;
                }
            }
        }
    
        private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
            Params[] mParams;
        }
    }
  • 可见主要是创建了线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,sPoolWorkQueue, sThreadFactory);和HandlersHandler = new InternalHandler();,以及一些常量和变量.

  • 另外, AsyncTask初始化的线程池使用的任务队列是private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);, 而非Java默认的new SynchronousQueue<Runnable>()

  • 现在要调用.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, uri);去真正的执行任务了, 看看源码是怎么执行的

    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        mStatus = Status.RUNNING;
    
        onPreExecute();
    
        mWorker.mParams = params;
        exec.execute(mFuture);
    
        return this;
    }
    • AsyncTask会在执行任务直线,在主线程调用onPreExecute()方法,这是一个空方法,有子类重写, 然后才去调用线程池执行任务exec.execute(mFuture), 线程池里的线程会调用run()方法执行任务

      public AsyncTask() {
          mWorker = new WorkerRunnable<Params, Result>() {
              public Result call() throws Exception {
                  mTaskInvoked.set(true);
      
                  Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                  //noinspection unchecked
                  Result result = doInBackground(mParams);
                  Binder.flushPendingCommands();
                  return postResult(result);
              }
          };
      
          mFuture = new FutureTask<Result>(mWorker) {
              @Override
              protected void done() {
                  try {
                      postResultIfNotInvoked(get());
                  } catch (InterruptedException e) {
                      android.util.Log.w(LOG_TAG, e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("An error occurred while executing doInBackground()",
                              e.getCause());
                  } catch (CancellationException e) {
                      postResultIfNotInvoked(null);
                  }
              }
          };
      }
      
      public FutureTask(Callable<V> callable) {
          if (callable == null)
              throw new NullPointerException();
          this.callable = callable;
          this.state = NEW;       // ensure visibility of callable
      }
      • mFuture是被封装后的Runnable类, 线程池执行任务会调用run()方法

        public void run() {
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
      • 可见run()方法里调用的是mWorkercall()方法, 并且在call()回调方法里会执行Result result = doInBackground(mParams);方法,这个方法是抽象的空方法,必须被子类重写,并且这个方法是在线程池的子线程里执行的

        private Result postResult(Result result) {
            @SuppressWarnings("unchecked")
            Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                    new AsyncTaskResult<Result>(this, result));
            message.sendToTarget();
            return result;
        }
      • 子线程执行完该任务之后,会调用return postResult(result)代码, 也就是使用Handler发送执行结果

        private static class InternalHandler extends Handler {
            public InternalHandler() {
                super(Looper.getMainLooper());
            }
        
            @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
            @Override
            public void handleMessage(Message msg) {
                AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
                switch (msg.what) {
                    case MESSAGE_POST_RESULT:
                        // There is only one result
                        result.mTask.finish(result.mData[0]);
                        break;
                    case MESSAGE_POST_PROGRESS:
                        result.mTask.onProgressUpdate(result.mData);
                        break;
                }
            }
        }
      • Handler会根据消息是MESSAGE_POST_RESULT还是MESSAGE_POST_PROGRESS, 去回调相应的方法, Handler在这里主要起到切换线程的作用

      • 如果是RESULT消息

        private void finish(Result result) {
            if (isCancelled()) {
                onCancelled(result);
            } else {
                onPostExecute(result);
            }
            mStatus = Status.FINISHED;
        }
        
        protected void onCancelled(Result result) {
            onCancelled();
        } 
        
        protected void onCancelled() {
        }
        
        protected void onPostExecute(Result result) {
        }
      • onCancelledonPostExecute都是被子类重写的空方法, 默认回调onPostExecute

        public final boolean cancel(boolean mayInterruptIfRunning) {
            mCancelled.set(true);
            return mFuture.cancel(mayInterruptIfRunning);
        }
        
        public final boolean isCancelled() {
            return mCancelled.get();
        }
      • 如果用户调用了cancel方法进去取消任务操作, 则该任务会被打上标记mCancelled=true, 才会去回调onCancelled()方法

      • 如果这是PROGRESS消息, 就直接回调onProgressUpdate()方法,这个方法也是空方法,需要被子类重写

其他

public class Test {
    public static void main(String[] args) throws InterruptedException {
        // 1. 线程中发生异常
        new Thread(new ThreadException()).start();

        // 2. 守护线程
        Thread threadDaemon = new Thread(new ThreadDaemon());
        threadDaemon.setDaemon(true); // 将该线程标记为守护线程 (非活动状态时标记)
        threadDaemon.start();

        // 3. 线程的优先级: 在创建线程时可设置线程优先级, 优先级越高,CPU处理这个线程的几率越高
        Thread threadPriority = new Thread(new ThreadDaemon());
        threadPriority.setPriority(Thread.MAX_PRIORITY); // 优先级 [1-10] 一般为: 1 / 5(默认) / 10
        threadPriority.start();
    }
}

/** * 线程异常, 任意线程异常,该线程结束,对其他任何线程无影响 */
public class ThreadException implements Runnable{
    @SuppressWarnings("unused")
    @Override
    public void run() {
        System.out.println("ThreadException");
        int num = 1/0;
    }
}

/** * 守护线程 <br> * 正常开启的线程都属于前台线程 <br> * 被标记守护线程的才是守护线程 <br> * 当所有的前台线程都结束时,程序才算结束 <br> * 守护线程执行效果与前台线程一样, 当所有的前台线程结束, 守护线程随前台线程结束而结束 <br> */
public class ThreadDaemon extends Thread{
    @Override
    public void run() {
        while(true){
            System.out.println("Thread_Daemon");
        }
    }
}
    原文作者:java锁
    原文地址: https://blog.csdn.net/Rozol/article/details/77344792
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞