线程池-你可能需要知道这些

线程池无论是在Java开发还是Android开发中,都是一个很常见也很重要的知识点,在面试中也常常被考官问到,那么本博文就带大家一探线程池的究竟。

为什么要使用线程池

在我们日常开发中,当遇到异步或者线程问题时,我们首先想到的是使用Thread或者Runnable来处理,在Android中我们还会结合Handler来实现线程的切换,Thread的使用也比较简单,在处理一些简单的异步任务时也是我们的不二之选。

但是,如果出现需要处理大量异步任务的情况时,使用new Thread的方式就会给我们带来很多问题:

  1. 频繁的创建线程,线程执行完之后又被回收,又会导致频繁的GC,无法重用线程。
  2. 如果线程的创建耗时或者需要消耗资源,频繁的创建和回收会对系统造成负担。
  3. 线程缺乏统一管理,各线程之间互相竞争,降低程序的运行效率。

如果我们使用线程池,那么就能有效的解决以上的问题,线程池主要有以下优点:

  1. 可以重用已经创建好的线程,避免频繁创建进而导致的频繁GC。
  2. 可以有效控制和管理线程并发,提高性能。
  3. 可以有效控制线程的执行,比如:定时执行,取消执行等等。

如何创建、使用线程池

相信大家已经对如何创建、使用线程池了如指掌了,我们举几个简单的例子,以方便后续的博文讲解。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

一探究竟

在平时的开发中,我们通常通过如下的代码构造一个固定数量线程的线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

我们来看下固定线程数的线程池是如何构造出来的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
    }

原来我们常用的线程池都是通过对ThreadPoolExecutor进行不同配置来实现的,那么我们就从这个ThreadPoolExecutor来开始分析吧!

1.ThreadPoolExecutor

ThreadPoolExecutor有四个构造函数,我们来看一些最复杂的构造函数:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

这个构造函数有7个参数,而其他构造函数也是复用了该构造函数,我们来分析下这几个参数的意义:

  1. corePoolSize

线程池中核心线程数。

线程池在新建线程时,如果当前线程池总数小于核心线程数(corePoolSize),那么新建的线程是核心线程;否则,新建的线程则为非核心线程。

除非调用了allowCoreThreadTimeOut方法设置了空闲等待时间,否则这些线程一直存在于线程池中,即使他们处于空闲状态。

  1. maximumPoolSize

线程池中允许存在的最大线程数量。

maximumPoolSize = 核心线程数(corePoolSize)+非核心线程数。

  1. keepAliveTime

非核心线程空闲等待时间(超时时长),当系统中非核心线程闲置时间超过keepAliveTime后,非核心线程就会被回收。

如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。

  1. unit

keepAliveTime的单位,有纳秒、微秒、毫秒、秒、分、时、天等。

TimeUnit是一个枚举类型,包括:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000

MICROSECONDS : 1微秒 = 1毫秒 / 1000

MILLISECONDS : 1毫秒 = 1秒 /1000

SECONDS : 秒

MINUTES : 分

HOURS : 小时

DAYS : 天

  1. workQueue

线程池中待执行任务队列。该队列用于存储由execute提交的Runnable任务。
常用的任务队列主要有以下几种:

  1. SynchronousQueue
    这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。

  2. LinkedBlockingQueue
    这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize.

  3. ArrayBlockingQueue
    可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。

  4. DelayQueue
    队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

  1. threadFactory

为线程池创建新线程的工厂类,这个我们一般使用默认即可。

  1. handler

拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到 最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会 抛出一个RejectedExecutionException。

这就是是ThreadPoolExecutor的构造方法参数的解释,我们的线程提交到线程池之后又是按照什么样的策略去运行呢?

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get(); // 获取线程池中的线程数目
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) // 启动一个核心线程,true代表的是核心线程
                return;
            c = ctl.get();
        }

        // 向队列中添加任务,注意offer返回结果(队列已满时返回false,add直接抛出异常)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

从上面的代码中我们可以发现,当一个任务被加入线程池时,ThreadPoolExecutor执行策略如下:

  1. 如果当前线程池中的线程数少于核心线程数(corePoolSize),那么就启动一个新的线程(核心线程)执行任务。

  2. 线程池中的线程数已经达到核心线程数(corePoolSize),而且workQueue未满,则将任务放入workQueue中等待执行。

  3. 如果线程池中的线程数已经达到核心线程数(corePoolSize)但未超过最大线程数(maximumPoolSize),而且workQueue已满,则开启一个非核心线程来执行任务。

  4. 如果线程池中的线程数已经超过最大线程数(maximumPoolSize),则拒绝执行该任务。

2. FixedThreadPool

FixedThreadPool是一个核心线程数量固定的线程池,创建以及构造如下:

 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

从构造上分析,我们得出FixedThreadPool有以下特点:

  1. 所有线程均为核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。

  2. 不存在线程超时的情况。(keepAliveTime==0)

  3. 线程队列(LinkedBlockingQueue)的默认大小为Integer.MAX_VALUE(2的31次方减1),因此我们可以往队列里添加多个任务。

我们来验证下分析是否正确:

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 30; i++) {
            final int finalI = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());

                }
            };
            fixedThreadPool.execute(runnable);
        }

运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1
The runnable num is :1 and  thread name :pool-1-thread-2
The runnable num is :2 and  thread name :pool-1-thread-3
The runnable num is :3 and  thread name :pool-1-thread-1
The runnable num is :4 and  thread name :pool-1-thread-2
The runnable num is :5 and  thread name :pool-1-thread-3
The runnable num is :6 and  thread name :pool-1-thread-1
The runnable num is :7 and  thread name :pool-1-thread-2
The runnable num is :8 and  thread name :pool-1-thread-3
The runnable num is :9 and  thread name :pool-1-thread-1
The runnable num is :10 and  thread name :pool-1-thread-2
The runnable num is :11 and  thread name :pool-1-thread-3
The runnable num is :13 and  thread name :pool-1-thread-2
The runnable num is :12 and  thread name :pool-1-thread-1
The runnable num is :14 and  thread name :pool-1-thread-3

运行结果与我们的分析一致,先往核心线程中添加三个任务,剩余任务进入到workQueue中等待,当有空闲的核心线程时就执行任务队列中的任务。

3. SingleThreadExecutor

SingleThreadExecutor与FixedThreadPool类似,不同的是SingleThreadExecutor线程池中只有一个固定的核心线程(可以将其看成是一个特殊的FixedThreadPool)。创建及构造如下:

 ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
  }

从构造上分析,我们得出SingleThreadExecutor有以下特点:

  1. 只存在一个核心线程,不存在非核心线程。(corePoolSize和maximumPoolSize相等,均为nThreads)。

  2. 不存在线程超时的情况。(keepAliveTime==0)

  3. 线程队列的默认大小为Integer.MAX_VALUE(2的31次方减1)。

  4. 任务按照FIFO原则执行,不存在线程同步的问题(只有一个线程啊)。

我们来验证下我们的分析是否正确:

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int finalI = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());

                }
            };
            singleThreadPool.execute(runnable);
        }


运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1
The runnable num is :1 and  thread name :pool-1-thread-1
The runnable num is :2 and  thread name :pool-1-thread-1
The runnable num is :3 and  thread name :pool-1-thread-1
The runnable num is :4 and  thread name :pool-1-thread-1

4. CachedThreadPool

CachedThreadPool的创建及构造如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

从构造上分析,我们得出SingleThreadExecutor有以下特点:

1.不存在核心线程,只有非核心线程,而且线程数目非常大(Integer.MAX_VALUE)。

  1. 存在线程超时机制,超时时间为60s。(keepAliveTime==60s)

  2. 使用SynchronousQueue来管理任务队列。

从上面的特点我们可以分析出CachedThreadPool特别适合执行大量并发任务的场景,因为线程数目比较大,所以每当我们添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务,如果没有空闲线程,则创建新线程来执行任务。

我们来验证下我们的分析是否正确:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("The runnable num is :" + finalI + " and " + " thread name :" + Thread.currentThread().getName());

                }
            };
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(runnable);
        }

运行结果如下:

The runnable num is :0 and  thread name :pool-1-thread-1
The runnable num is :1 and  thread name :pool-1-thread-2
The runnable num is :2 and  thread name :pool-1-thread-3
The runnable num is :3 and  thread name :pool-1-thread-2
The runnable num is :4 and  thread name :pool-1-thread-3
The runnable num is :5 and  thread name :pool-1-thread-1
The runnable num is :6 and  thread name :pool-1-thread-2
The runnable num is :7 and  thread name :pool-1-thread-1
The runnable num is :8 and  thread name :pool-1-thread-3
The runnable num is :9 and  thread name :pool-1-thread-2

5. ScheduledThreadPool

ScheduledThreadPool具备定时执行任务的特点,ScheduledThreadPool的创建及构造如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
      return new ScheduledThreadPoolExecutor(corePoolSize);
}
    
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue());
}

ScheduledThreadPool具有如下特点:

  1. 核心线程数量固定,而且非核心线程数目非常大(Integer.MAX_VALUE)。

  2. 存在线程超时机制,超时时间为10s。(keepAliveTime==10s)

  3. 使用DelayedWorkQueue来管理任务队列。

我们看下ScheduledThreadPool几个常用的API:

  1. 延迟执行任务
   public ScheduledFuture<?> schedule(Runnable command,
                                      long delay, TimeUnit unit);
  1. 延迟定时执行任务
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                 long initialDelay,
                                                 long period,
                                                 TimeUnit unit);

首次启动任务的时间间隔为initialDelay+period,然后每隔period执行一次任务。

  1. 延迟执行定时执行任务
 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

首次启动任务的时间间隔为initialDelay,然后每隔delay执行一次任务。

Android中的线程池

在我们日常的Android开发中,我们经常使用的AsyncTask的实现就采用了线程池,我们来看下AsyncTask是如何实现的。

首先来看AsyncTask的一些属性声明:

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // We want at least 2 threads and at most 4 threads in the core pool,
    // preferring to have 1 less than the CPU count to avoid saturating
    // the CPU with background work
    // 核心线程数最少为2个,最多为4个。为了不影响系统运行,一般取CPU-1个。
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    // 最多存在的线程数为CPU*2-1
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    // 线程超时时间为30s
    private static final int KEEP_ALIVE_SECONDS = 30;

    // 线程生成工厂(没什么卵用,只是为了标识AsyncTask)
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    // 使用LinkedBlockingQueue作为任务管理队列
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        // 根据上面的参数构造线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        // 允许核心线程超时存在
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

当我们调用了AsyncTask的execute方法后,执行了那些操作呢?

 @MainThread
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

@MainThread
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        // 调用了该接口执行任务,exec为SerialExecutor类的实例
        exec.execute(mFuture);

        return this;
    }

接着来看SerialExecutor类。

    private static class SerialExecutor implements Executor {
        // 将其当做栈使用即可
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        // 执行任务的接口
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        // 一个任务执行完,才会执行下一个任务(AsyncTask是可能会顺序任务的)
                        scheduleNext();
                    }
                }
            });
            // 首个任务执行时,mActive为空调用scheduleNext进行执行
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            // 使用了我们创建的线程池完成了任务的执行
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

通过上面的简单分析,相信大家对AsyncTask的线程池机制有了一个更清晰的认识,其实很多我们看起来很牛逼的东西实现原理也很简单,只要我们多学习多看源码,就能了解其机制。

结束语

线程池在我们的实际开发中重要性不言而喻,在面试中也会被经常问到,希望大家通过本博文对线程池有一个清晰明了的认识,并在实际开发中使用它,谢谢。

    原文作者:24K男
    原文地址: https://www.jianshu.com/p/542498e27a23
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞