1、相关类
Executors ExecutorService Callable ThreadPool Future
2、相关接口
Executor
Executor接口的使用:
public class TestExecutor implements Executor{ @Override public void execute(Runnable command){ //调用execute方法常常传入runnable接口对象,开启线程 } }
ExecutorService接口的使用:(继承Executor接口)
/**
*submit方法(执行runnble、callable对象的线程)
*实现类:各种线程池
*/
Callable接口 && Runnable接口 callable调用call方法 runnable调用run方法 都可以被线程调用,但callable的call方法具有返回值(泛型) Executors类(操作Executor的工具类)
ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
ThreadPool线程池类(装着线程的容器)
线程池创建的固定线程,线程任务执行完后线程
不会消失,处于等待任务的状态(idel)。 线程任务大于线程池容量时,多出来的任务放在
等待队列中(内部使用BlockingQueue实现)
public class TestThreadPool{ 2 public static void main(String[] args){ 3 ExecutorService service = Executors.newFixedThreadPool(5);//1.创建5个线程的线程池容器 4 5 for(int i=0;i<6;i++){//2.放6个任务,线程池一次只能放5个,所以第6个任务需要重复使用旧的线程 6 service.execute(() -> { 7 System.out.println(Thread.getCurrentThread().getName());//3.打印出当前线程名 8 }); 9 } 10 service.shutdown();//执行完当前任务则关闭线程 11 service.shutdownNow();//无论是否执行完都关闭线程 12 } 13 }
Future接口(线程未来产生的返回值)
public class TestFuture{ 2 public static void main(String[] args){ 3 //FutureTask实现类Runnable和Future接口 4 FutureTask<Integer> task = new FutureTask<>( 5 Thread.sleep(500);//阻塞等待500毫秒 6 return 1000; 7 ); 8 9 //new的方式启动线程任务 10 new Thread(task).start(); 11 System.out.println(task.get());//阻塞等待500毫秒后得到返回值 12 /////////////////////////////////////////////////////////////////// 13 ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池 14 Future<Integer> future = service.submit(()->{//相当于运行类callable接口的call方法,返回1 15 Thread.sleep(500); 16 return 1; 17 }); 18 System.out.println(future.get());//阻塞等待500毫秒后得到返回值 19 } 20 }
WorkStealingPool偷任务线程池
底层采用ForkJoinPool实现(开启的是Deamon守护线程,主线程退出则线程退出)
public class WorkStealingPoolTest { 2 public static void main(String[] args) throws IOException { 3 //根据CPU核数启动相应个数的线程(4核cpu---4个线程) 4 ExecutorService service = Executors.newWorkStealingPool(); 5 System.out.println(Runtime.getRuntime().availableProcessors()); 6 7 service.execute(new R(1000));//线程1执行任务1----1秒 8 service.execute(new R(2000));//线程2执行任务2----2秒 9 service.execute(new R(2000));//线程3执行任务3----2秒 10 service.execute(new R(2000));//线程4执行任务4----2秒 11 service.execute(new R(2000));//任务5阻塞,当线程1执行完后把任务5偷过来执行 12 13 //由于产生的是守护线程,主线程不阻塞的话,看不到输出 14 System.in.read();//将主线程阻塞 15 } 16 17 static class R implements Runnable { 18 19 int time; 20 21 R(int t) { 22 this.time = t; 23 } 24 25 @Override 26 public void run() { 27 28 try { 29 TimeUnit.MILLISECONDS.sleep(time); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 //打印线程名---ForkJoinPool 34 System.out.println(time + " " + Thread.currentThread().getName()); 35 36 } 37 38 } 39 }
ForkJoinPool(分支合并线程池)
思想:分治,把大任务拆分成小任务并行计算,计算完成后将结果合并
守护线程
public class ForkJoinPoolTest{ 2 3 public static void main(String[] args) throws Exception { 4 ForkJoinPool pool = new ForkJoinPool(); 5 MyTask task = new MyTask(inits, 0, inits.;ength-1); 6 ForkJoinTask<int[]> taskResult = pool.submit(task); 7 try { 8 taskResult.get();//阻塞等待所有线程结果计算完成 9 } catch (InterruptedException | ExecutionException e) { 10 e.printStackTrace(System.out); 11 } 12 } 13 14 /** 15 * 单个排序的子任务 16 */ 17 static class MyTask extends RecursiveTask<int[]> { 18 19 private int[] source; 20 private int start; 21 private int end; 22 23 public MyTask(int[] source,int start, int end ) { 24 this.source = source; 25 this.start = start; 26 this.end = end; 27 } 28 29 30 @Override 31 protected int[] compute() { 32 //长度小于50,进行计算 33 if(source.length <= 50) { 34 long sum = 0L; 35 for(int i=start; i<end; i++) sum += nums[i]; 36 return sum; 37 } 38 //长度大于50,继续划分子任务 39 int middle = start + (end-start)/2; 40 41 AddTask subTask1 = new MyTask(source,start,middle); 42 AddTask subTask2 = new MyTask(source,middle,end); 43 subTask1.fork();//递归创建子任务线程 44 subTask2.fork(); 45 46 //计算完成后将两个子任务的结果合并 47 return subTask1.join() + subTask2.join(); 48 } 49 } 50 }
各种线程池的底层实现:
一、基本线程池:
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
SingleThreadPool
二、底层创建线程池都是使用ThreadPoolExecutor类实现的,而放置任务、执行任务使用了生产者消费者模型(阻塞队列的方式)
三、源码分析
ThreadPoolExecutor的API:
ThreadPoolExecutor(int corePoolSize,//核心线程数(最小) int maximumPoolSize,//最大线程数 long keepAliveTime, //线程运行时间 TimeUnit unit, //时间单位 BlockingQueue<Runnable> workQueue)//底层采用哪种阻塞队列来放线程任务
各种线程池的底层实现:
//FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,//初始线程数自定义 nThreads,//最大线程数自定义 0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失 new LinkedBlockingQueue<Runnable>());//链表阻塞队列 }
//CachedThreadPool(采用同步阻塞队列装任务,队列中有任务则启动新线程执行,没任务就阻塞) public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,//初始为0个线程 Integer.MAX_VALUE,//可以启动无限多线程 60L, TimeUnit.SECONDS,//60秒空闲则结束 new SynchronousQueue<Runnable>());//同步阻塞队列,有任务马上开新线程执行(容量用于为0) }
//SingleThreadPool return new ThreadPoolExecutor(1,//初始线程数为1 1,//最大线程数为1 0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失 new LinkedBlockingQueue<Runnable>());//链表阻塞队列 }
//ScheduledThreadPool public newScheduledThreadPool(int corePoolSize){ super(corePoolSize,//初始线程数自定义 Integer.MAX_VALUE,//无限多线程数 0, NANOSECONDS,//一旦启动线程池,线程永远不消失 new DelayedWorkQueue<Runnable>());//延时阻塞队列,隔一段时间执行一次任务 }