JUC线程框架深度解析 — 07、线程池

 一堆线程一起完成一件事情就是线程池。

 
《JUC线程框架深度解析 — 07、线程池》

【 线程池的核心组成 】

《JUC线程框架深度解析 — 07、线程池》

【 线程池分类 】
➢ java.util.concurrent.Executors类可以创建线程池
➣ 创建无大小限制的线程池 : public static ExecutorService newCacheThreadPool();
➢ 创建固定大小的线程池 : public static ExecutorService newFixedThreadPool(int nThreads);
➣ 单线程池 : public static ScheduledExecutorService newSingleThreadScheduledExecutor();

➣ 创建定时调度池 : public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize).

【 创建四种线程池 】
下面将具体演示四中线程池的创建以及其自身的使用特点。当Executors创建完成了线程池之后可以返回“ExecutorService”接口对象,而这个接口对象里面有两个方法来接收线程的执行:
★ 接收Callable: public <T> Future<T> submit(Callable<T> task);
★ 接收Runnable: public Future<?> submit(Runnable task);

范例:创建无限量线程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 现在创建了一个线程池
        ExecutorService service = 
                  Executors.newCachedThreadPool();
        for (int x = 0; x < 10; x++) {
            service.submit(() -> { // 线程池会负责启动
                System.out.println(Thread.currentThread()
                        .getName() + "执行操作。");
            });
        }
        service.shutdown(); // 线程池执行完毕后需要关闭
    }
}

范例:创建有限量的线程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 线程池只能够装下3个人
        ExecutorService service = 
                   Executors.newFixedThreadPool(3) ;
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            service.submit(() -> {
                System.out.println(Thread.currentThread()
                   .getName() + "执行操作。");
            });
        } 
        service.shutdown(); // 线程池执行完毕后需要关闭
    }
}

《JUC线程框架深度解析 — 07、线程池》

范例:创建单线程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService service = 
                   Executors.newSingleThreadExecutor() ;
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            service.submit(() -> {
                System.out.println(Thread.currentThread()
                        .getName() + "执行操作。");
            });
        }
        service.shutdown(); // 线程池执行完毕后需要关闭
    }
}

     除了以上的三中线程池之外还可以创建一个定时调度池,这个调度池主要以间隔调度为主。如果要创建调度池则肯定使用ScheduledExecutorService接口完成,在该接口之中包含有如下的两个方法:

延迟启动:public ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit);

间隔调度:ScheduledFuture<?>scheduleAtFixedRate(Runnable command, 

            long initialDelay, long period, TimeUnit unit);

范例:创建调度池

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 创建一个调度池
        ScheduledExecutorService service =
                Executors.newScheduledThreadPool(1) ;
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            service.schedule(() -> {
                System.out.println(Thread.currentThread()
                        .getName() + "执行操作。");
            },2,TimeUnit.SECONDS);
        }
        service.shutdown(); // 线程池执行完毕后需要关闭
    }
}

范例:观察间隔调度

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 创建一个调度池
        ScheduledExecutorService service =
                Executors.newScheduledThreadPool(1) ;  
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            service.scheduleAtFixedRate(() -> {
                System.out.println(Thread.currentThread()
                        .getName() + "执行操作。");
            },2,2,TimeUnit.SECONDS);
        }
        // service.shutdown(); // 线程池执行完毕后需要关闭
    }
}

【 ExecutorService线程池处理方法 】

整个线程池的处理里面都是以ExecutorService接口的方法为核心展开的,所以如何要想去理解线程池,

还需要对这个接口的方法做一个小小的说明。

1、在Exector接口里面定义有execute()方法:public void execute(Runnable command)

    *这个方法接收的是Runnable,因为Runnable没有返回值,所以该方法的返回值是void。

范例:使用execute()来代替之前的submit()。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 现在创建了一个线程池
        ExecutorService service = 
Executors.newCachedThreadPool();
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            service.execute(() -> {
                System.out.println(
Thread.currentThread().getName() 
+ "执行操作。");
            });
        }
        service.shutdown();
    }
}

《JUC线程框架深度解析 — 07、线程池》

2、在ExecutorService接口里面的确提供有接收Runnable接口对象的方法,
但是这个方法为了统一使用的都是submit();

     submit()重载了许多次,可以接收Runnable: public Future<?> submit(Runnable task)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 现在创建了一个线程池
        ExecutorService service = 
                    Executors.newCachedThreadPool();
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            Future<?> future = service.submit(() -> {
                System.out.println(Thread.currentThread()
                  .getName() + "执行操作。");
            });
            // Runnable接口没有返回值,所以永恒为null
            System.out.println(future.get());
        }
        service.shutdown();
    }
}

3、submit()方法是可以接收Callable接口对象的;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 现在创建了一个线程池
        ExecutorService service = 
                Executors.newCachedThreadPool();
        for (int x = 0; x < 10; x++) {
            // 线程池会负责启动
            Future<?> future = service.submit(() -> {
                System.out.println(Thread.currentThread()
                        .getName() + "执行操作。");
            });
            // Runnable接口没有返回值,所以永恒为null
            System.out.println(future.get());
        }
        service.shutdown();
    }
}

现在已经清楚了execute()和submit()两个方法的作用,但是需要来观察一下它的源代码:

《JUC线程框架深度解析 — 07、线程池》

《JUC线程框架深度解析 — 07、线程池》

在这个方法里面主要区分三个概念:
task:是具体的线程执行任务,线程在追加到线程池的时候没有进行启动;
worker:任务的执行需要worker来支持的,可以运行的worker受到”corePoolSize”限制。

reject:如果现在线程池已经满了或者关闭了,那么就会出现拒绝新线程加入的可能性。

4、public<T>T invokeAny(Collection<? extends Callable<T>> tasks) throws 
         InterruptedException,ExecutionException
   Future线程模型设计的优势在于:可以进行数据的异步返回,但是之前编写的过程严格来讲并不好,
相当于启动了一个线程就获得了一个返回值,于是为了方面这些线程池中线程对象的管理,
可以使用此方法进行统一返回。

范例:使用invokeAny()方法

import java.util.HashSet;    import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 所有任务
        Set<Callable<String>> tasks 
                        = new HashSet<Callable<String>>() ;
        // 追加线程执行任务
        for (int x = 0; x < 10; x++) {
            int temp = x ;
            tasks.add(() -> {
                // 线程池会负责启动
                return Thread.currentThread().getName() 
                         + "执行操作。 x = " + temp ;
            });
        }
        // 现在创建了一个线程池
        ExecutorService service =
                      Executors.newCachedThreadPool();
        // 执行任务
        String invokeAny = service.invokeAny(tasks) ;
        System.out.println("返回结果:" + invokeAny);
        service.shutdown();
    }
}

返回结果:pool-1-thread-1执行操作。 x = 2
  使用invokeAny()只会返回一个任务的执行操作。
5、public<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
        throws InterruptedException

   此时会返回多个任务的执行结果,以List集合的形式返回。

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 所有任务
        Set<Callable<String>> tasks =
                new HashSet<Callable<String>>() ;
        // 追加线程执行任务
        for (int x = 0; x < 10; x++) {
            int temp = x ;
            // 线程池会负责启动
            tasks.add(() -> {
                return Thread.currentThread().getName() + "执行操作。 x = " + temp ;
            });
        }
        // 现在创建了一个线程池
        ExecutorService service = Executors.newCachedThreadPool();
        // 执行任务
        List<Future<String>> invokeAll = 
         service.invokeAll(tasks) ;
        for (Future<String> fut : invokeAll) {
            System.out.println("返回结果:" + fut.get());
        }
        service.shutdown();
    }
}

清除invokeAny()、invokeAll()实际上就可以更加清楚任务(task)和工作者(Worker)之间的关系。

【 CompletionService线程池异步交互 】
➣ 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit()执行的任务。
  使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果。
➣ CompletionService依赖于一个单独的Executor来实际执行任务,在这种情况下,
  CompletionService只管理一个内部完成队列,在CompletionService接口里面提供有如下两个方法:
     ➣ 设置Callable : public Future<V> submit(Callable<V> task) ;
     ➣ 设置Runnable : public Future<V> submit(Runnable task, V result) ;

CompletionService是一个接口,如果要想使用这个接口可以采用
ExecutorCompletionService子类。
   • 构造方法:public ExecutorCompletionService(Executor executor) ;
CompletionService来控制所有线程池的操作以及数据返回,则应该使用这个类来进行线程池的提交处理。
• 提交线程:public Future<V> submit(Callable<V> task) ;
• 获取返回内容:public Future<V> take() throws InterruptedException ;

范例:使用CompletionService工具类

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        // 现在创建了一个线程池
        ExecutorService service =
                Executors.newCachedThreadPool();
        CompletionService<String> completions =
                new ExecutorCompletionService<String>(service) ;
        for (int x = 0 ; x < 10 ; x ++) {
            int temp = x ;
            completions.submit(()->{
                return Thread.currentThread()
                        .getName() + " - x = " + temp ;
            }) ;
        }
        for (int x = 0 ; x < 10 ; x ++) {
            System.out.println(completions.take().get());
        }
        service.shutdown();
    }
}

       CompletionService操作接口的主要目的是告诉大家可以去隐藏ExecutorService接口执行线程池的处理了,不再需要关注那些invokeAny()、invokeAll()的执行方法了。

【 ThreadPoolExecutor线程池执行者 】

线程池的创建主要依靠的事一个类完成的;ThreadPoolExecutor。下面以无限量线程池为例做一个说明;

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                 60L, TimeUnit.SECONDS,
                                 new SynchronousQueue<Runnable>() );
}
public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads, 
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>() );
}

如果掌握了ThreadPoolExecutor类的使用,那么你就可以自己来定义线程池了,首先来观察这个类的构造方法;

《JUC线程框架深度解析 — 07、线程池》


范例:可以创建属于自己的线程池

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Runnable> queue = new 
                         ArrayBlockingQueue<Runnable>(2) ;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,5,6L,TimeUnit.SECONDS,queue) ;
        for (int x = 0 ; x < 2 ; x ++) {
            int temp = x ;
            pool.execute(()->{
                System.out.println("【"
                        +Thread.currentThread().getName()
                        +"】任务开始执行 - " + temp);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("【"
                        +Thread.currentThread().getName()
                        +"】任务结束执行 - " + temp);
            });
        }
    }
}

【pool-1-thread-1】任务开始执行 – 0

【pool-1-thread-2】任务开始执行 – 1

【pool-1-thread-1】任务结束执行 – 0

【pool-1-thread-2】任务结束执行 – 1

     在线程池之中存在拒绝策略的概念,所谓的拒绝策略指的是线程池满了之后的其它等待线程的处理状态,
在ThreadPoolExecutor类里面提供有一些”RejectedExecutionHandler”子类,如果现在被拒绝了会出现”拒绝异常”
(默认AbortPolicy)。对于给出的几种拒绝策略如下:
   * AbortPolicy(默认实现):当任务添加到线程池中被拒绝时,会抛出拒绝异常”RejectedExecutionException”;
   * DiscardPolicy:当将任务添加到线程池中被拒绝的时候,线程池将直接丢弃该拒绝的任务;
   * DiscardOldestPolicy:当被拒绝的时候,线程池会放弃等待队列中时间最长的未被处理的任务,
        让后将拒绝的任务添加到队列之中;
   * CallerRunsPolicy:当任务被拒绝时,会在线程池当前正在运行的Thread线程之中处理该任务(加塞儿)。

范例:修改一下拒绝策略

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Runnable> queue =
                new ArrayBlockingQueue<Runnable>(2) ;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,2,6L, TimeUnit.SECONDS,
                queue,Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()) ;
        for (int x = 0 ; x < 5 ; x ++) {
            int temp = x ;
            pool.execute(()->{
                System.out.println("【BEFORE - "
                        +Thread.currentThread().getName()
                        +"】任务开始执行 - " + temp);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("【AFTER - "
                        +Thread.currentThread().getName()
                        +"】任务结束执行 - " + temp);
            });
        }   }  }

《JUC线程框架深度解析 — 07、线程池》

        由于只设置了一个CorePoolSize,所以当多余的任务出现之后将才哦难过设置的默认的拒绝策略
”new ThreadPoolExecutor.AbortPolicy()”,则会出现”RejectedExecutionException”,

如果出现拒绝之后可以丢弃被拒绝的任务,那么也可以将拒绝策略修改为:

ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 6L, TimeUnit.SECONDS, 
queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() );

对于拒绝策略最简单的解释就是:线程池中corePoolSize满了,再追加的任务的处理方案。

    原文作者:JUC
    原文地址: https://blog.csdn.net/androidsj/article/details/80347574
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞