Java: Understanding the ExecutorService shutdown methods

Main methods covered

  • void shutdown();
  • List<Runnable> shutdownNow();
  • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

Thread pool states

    /*
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
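The internal runState is not directly accessible, but part of the lifecycle can be observed through the public isShutdown() and isTerminated() methods. The following is a minimal sketch; the class name LifecycleDemo and the method observe() are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    /** Records isShutdown()/isTerminated() at three points of the lifecycle. */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();                 // keep the pool busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        StringBuilder sb = new StringBuilder();
        // RUNNING: neither flag is set
        sb.append(pool.isShutdown()).append('/').append(pool.isTerminated());
        pool.shutdown();
        // SHUTDOWN: shut down, but the pending task keeps the pool from terminating
        sb.append(' ').append(pool.isShutdown()).append('/').append(pool.isTerminated());
        release.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // TERMINATED: both flags are set
        sb.append(' ').append(pool.isShutdown()).append('/').append(pool.isTerminated());
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

STOP and TIDYING have no dedicated public query method, so this sketch cannot tell them apart from SHUTDOWN.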

void shutdown()

Javadoc:

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    void shutdown();

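This method stops the ExecutorService from accepting new tasks, but tasks that were already submitted continue to run.

This method does not wait for previously submitted tasks to complete execution.

This sentence means that the method returns immediately, and its return does not imply that the previously submitted tasks have finished. If you need a blocking call, use awaitTermination.

Internally, the method sets the run state to SHUTDOWN and interrupts all idle worker threads, so that they wake up and re-check the pool state instead of waiting for new tasks.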
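Since shutdown() does not block, it is usually paired with awaitTermination. Below is a sketch of a two-phase shutdown helper in the spirit of the usage example in the ExecutorService Javadoc; the names GracefulShutdown, shutdownAndAwait and demo are hypothetical, and the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    /** Returns true if the pool terminated within the allowed time. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutMillis) {
        pool.shutdown();                          // reject new tasks; returns immediately
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;                      // all submitted tasks finished in time
            }
            pool.shutdownNow();                   // escalate: interrupt running tasks
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();                   // the caller was interrupted: give up waiting
            Thread.currentThread().interrupt();   // preserve the interrupt status
            return false;
        }
    }

    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return shutdownAndAwait(pool, 5_000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```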

List<Runnable> shutdownNow()

Javadoc:

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

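This method attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns the list of tasks that were waiting to be executed.

Like shutdown(), this method returns immediately; it does not wait for all tasks to terminate.

Because termination is implemented via interrupt, the method cannot stop a task that does not respond correctly to interruption. Tasks submitted to the executor therefore need to handle interrupts appropriately.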
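The returned list can be inspected to see which tasks never started. The sketch below assumes a single-thread pool so that the second and third tasks are guaranteed to sit in the queue; the class and method names are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ShutdownNowReturnsQueued {
    /** Returns how many tasks shutdownNow() reports as never started. */
    static int pendingAfterShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);             // occupies the only worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> System.out.println("queued task 1"));
        pool.execute(() -> System.out.println("queued task 2"));
        try {
            started.await();                      // make sure the first task is running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Runnable> pending = pool.shutdownNow();
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdownNow());
    }
}
```

Note that when tasks are handed in with submit() rather than execute(), the returned Runnable objects are the wrappers created by the executor, not the original task objects.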

boolean awaitTermination(long timeout, TimeUnit unit)

Javadoc:

    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

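This method blocks until all tasks have completed (after a shutdown request), the timeout elapses, or the current thread is interrupted, whichever happens first. It returns true if the executor terminated before the timeout, and false otherwise.

Note that if shutdown is not called before awaitTermination, then awaitTermination keeps waiting even if all tasks finish before the timeout, and it ends up returning false, because without a shutdown call the executor's state never becomes TERMINATED.
Example: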

    public static void testAwaitTerminationWithoutShutdown(){
        Runnable runnable = () -> {
            System.out.println("I'm a very quick task");
        };

        executorService.submit(runnable);

        try {
            executorService.awaitTermination(3000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

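The code above blocks for 3000 milliseconds and awaitTermination returns false, even though the only task finishes almost instantly: without a call to shutdown, the state of executorService cannot become TERMINATED.

Examples

Trying to submit a task after shutdown: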
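To make the return value visible, here is a small sketch that captures the result of awaitTermination once without and once with a preceding shutdown(). The class name AwaitTerminationDemo and the shortened 300 ms timeout are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitTerminationDemo {
    /** Returns {resultWithoutShutdown, resultWithShutdown}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> System.out.println("I'm a very quick task"));
        boolean withoutShutdown = false;
        boolean withShutdown = false;
        try {
            // No shutdown yet: waits for the full timeout, then reports false
            withoutShutdown = pool.awaitTermination(300, TimeUnit.MILLISECONDS);
            pool.shutdown();
            // After shutdown the pool can reach TERMINATED, so this reports true
            withShutdown = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {withoutShutdown, withShutdown};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);
    }
}
```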

    public static void testShutdown(){
        Runnable runnable = () -> {
            try {
                System.out.println("going to sleep for 1s");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        scheduledExecutorService.submit(runnable);
        scheduledExecutorService.shutdown();
        try {
            scheduledExecutorService.submit(runnable);
        } catch (RejectedExecutionException e){
            System.out.println("cannot add task after shutdown");
        }
    }

Output (the order may vary):

cannot add task after shutdown
going to sleep for 1s

The output shows that a RejectedExecutionException was indeed thrown, and also that shutdown returned right away.

An example where shutdownNow() succeeds:

    public static void shutdownNowNormally() throws InterruptedException {

        Runnable task = () -> {
            try {
                System.out.println(String.format("now is %s, I'm going to sleep for 10s", getCurrentTime()));
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                System.out.println(String.format("someone asked me to terminate at: %s", getCurrentTime()));
            }
        };

        scheduledExecutorService.submit(task);

        Thread.sleep(1000);

        scheduledExecutorService.shutdownNow();

    }

Output:

now is 13:47:30, I'm going to sleep for 10s
someone asked me to terminate at: 13:47:31

An example where shutdownNow() fails:
Because shutdownNow() ultimately relies on interrupt to stop the worker threads, it cannot terminate the executor if a task does not react to the interrupt.

    public static void shutdownNowNotWorking(){
        Runnable task = () ->{
            while (true){
                try {
                    System.out.println("I'm gonna sleep for 1s");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
//                    e.printStackTrace();
                    System.out.println(String.format("I'll ignore this InterruptedException. Now is : %s", getCurrentTime()));
                }
            }
        };

        scheduledExecutorService.submit(task);

        scheduledExecutorService.shutdownNow();
    }

Output:

I'm gonna sleep for 1s
I'll ignore this InterruptedException. Now is : 13:53:12
I'm gonna sleep for 1s
I'm gonna sleep for 1s
I'm gonna sleep for 1s
I'm gonna sleep for 1s
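For contrast, a loop can be written so that it does respond to interruption. The sketch below is one possible approach, not the only one: it restores the interrupt flag in the catch block and uses it as the loop condition. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InterruptAwareTask {
    /** Returns true if the pool terminates after shutdownNow(). */
    static boolean stopsAfterShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; set it again so the loop condition sees it
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("interrupted, leaving the loop");
        });
        try {
            started.await();                      // wait until the task is running
            pool.shutdownNow();                   // interrupts the worker thread
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterShutdownNow());
    }
}
```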

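Source of void shutdown()

Implementation in ThreadPoolExecutor:
It sets the state and interrupts all idle worker threads (so they stop waiting on the task queue and re-check the pool state). Previously submitted tasks are still executed.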

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

The interruptIdleWorkers method (the no-argument overload called by shutdown() delegates to interruptIdleWorkers(false)):

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

Source of List<Runnable> shutdownNow()

Source in ThreadPoolExecutor:

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

The interruptWorkers method it calls:

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

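As you can see, a worker thread is interrupted even if it has already picked up a task and is running it. In that case the task itself must respond to the interrupt; otherwise even shutdownNow cannot terminate the executorService.

Source of runWorker()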

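An idle worker is one that is evaluating the loop condition while (task != null || (task = getTask()) != null), that is, a worker that has not yet obtained a task.

interruptIdleWorkers() targets workers in this state. If getTask() returns null, the worker thread ends. The getTask() source shows that, at shutdown, the poll() or take() call on workQueue (a BlockingQueue) responds to the interrupt, so getTask() goes around its loop again, sees the shutdown state and, if the queue is empty, returns null, which makes the worker exit. shutdown therefore has no effect on worker threads that have already entered the while body.

shutdown calls interruptIdleWorkers() only once, so idle workers exit once nothing is left in the queue, while workers that are still busy are unaffected. If tasks remain in the queue, these workers can still take and complete them, because shutdown() only changes the state to SHUTDOWN.

shutdownNow() sets the state to STOP and calls interruptWorkers(). So even if a worker has already reached task.run(), the task can still be stopped provided its run method responds appropriately to interruption; otherwise shutdownNow() cannot terminate it either. Together with getTask(), this also shows that tasks already buffered in the queue will no longer be executed (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())).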

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

The getTask() method it calls:

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
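The effect of the state check at the top of getTask() can be observed from the outside by counting how many tasks run to the end of their body under shutdown() versus shutdownNow(). This is only a sketch with made-up names (QueueDrainComparison, completed), assuming a single-thread pool with one running task and three queued ones.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDrainComparison {
    /** Returns how many of the four tasks reached the end of their body. */
    static int completed(boolean useShutdownNow) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger done = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();                  // blocks until released or interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.incrementAndGet();
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(done::incrementAndGet);  // these wait in the work queue
        }
        try {
            started.await();
            if (useShutdownNow) {
                pool.shutdownNow();               // STOP: queue is drained, worker interrupted
            } else {
                pool.shutdown();                  // SHUTDOWN: queued tasks are still processed
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdown:    " + completed(false));
        System.out.println("shutdownNow: " + completed(true));
    }
}
```

With shutdown() all four tasks complete; with shutdownNow() only the task that was already running reaches its end, because the three queued tasks are removed from the queue.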

The processWorkerExit method:
Note that processWorkerExit calls tryTerminate(). Every time a worker ends, termination is attempted, so calling shutdown alone is enough for the pool to terminate once all tasks have completed.

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

Source of tryTerminate():

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

A key point is the call interruptIdleWorkers(ONLY_ONE); here is the explanation of this parameter:

    /*
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */

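If this parameter is true, at most one idle worker is interrupted per call. Since every worker calls processWorkerExit when it exits, and processWorkerExit in turn calls tryTerminate(), the word "propagate" in the comment makes sense. The phrase "in case all threads are currently waiting" is still not entirely clear to me: it seems to guard against the situation where all threads are waiting at that moment, but the purpose is not obvious. One plausible reading is that if every remaining worker is blocked in workQueue.take() or poll(), nothing else would wake them up, so interrupting one of them lets it exit, call tryTerminate() again, and wake the next.

Summary

For an ExecutorService to shut down properly, the tasks themselves must react to the interrupted status; otherwise the ExecutorService may fail to shut down.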
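The end of this chain, where tryTerminate() moves the pool to TIDYING and runs the terminated() hook, can be observed by overriding that hook. In the sketch below all four workers are started up front so that they are waiting for tasks when shutdown() is called; the class name TerminatedHookDemo and the pool size are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    /** Returns true if all waiting workers exited and the terminated() hook ran. */
    static boolean run() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookRan.set(true);                // invoked from tryTerminate()
                super.terminated();
            }
        };
        pool.prestartAllCoreThreads();            // four workers, none of them has a task
        pool.shutdown();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return terminated && hookRan.get() && pool.getPoolSize() == 0;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This does not isolate the ONLY_ONE propagation itself, since shutdown() already interrupts every idle worker; it only shows that a pool whose workers are all waiting still reaches TERMINATED.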

    Original author: xiaofudeng
    Original URL: https://www.jianshu.com/p/4de5dca3b187