转载 http://www.cnblogs.com/skywang12345/p/3509954.html http://www.cnblogs.com/skywang12345/p/3512947.html
Executors创建线程池
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
Executors创建线程池的函数列表
public class Executors { //返回用于创建新线程的默认线程工厂。 static ThreadFactory defaultThreadFactory() //创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 static ExecutorService newCachedThreadPool() //和上面方法一样,只是使用了外部提供的 ThreadFactory 创建新线程。 static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 static ExecutorService newFixedThreadPool(int nThreads) //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。 static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) //创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 static ExecutorService newSingleThreadExecutor() //创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。 static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 static ScheduledExecutorService newSingleThreadScheduledExecutor() //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) }
1. newFixedThreadPool()
newFixedThreadPool()在Executors.java中定义,源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
说明:newFixedThreadPool(int nThreads)中执行ThreadPoolExecutor()来构造线程池,它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,有两个参数是失效的keepAliveTime和maximumPoolSize,因为只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用 ,只有阻塞队列满时才会根据创建的线程数和maximumPoolSize做比较,但是这里创建的是无边界阻塞队列,所以阻塞队列理论上永远不会满(最大为Integer.MAX_VALUE);
2.newSingleThreadExecutor()
newSingleThreadExecutor()在Executors.java中定义,源码如下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
说明:newSingleThreadExecutor()执行FinalizableDelegatedExecutorService(new ThreadPoolExecutor())来构造线程池,初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即拒绝策略失效
3.newScheduledThreadPool()
newScheduledThreadPool()在Executors.java中定义,源码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
说明:newSingleThreadExecutor()执行ScheduledThreadPoolExecutor(int corePoolSize)来构造调度型线程池,corePoolSize由外部定义,线程池的线程可以按schedule依次delay执行,或周期执行。
4. newCachedThreadPool()
newCachedThreadPool()在Executors.java中定义,源码如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
说明:newCachedThreadPool()执行ThreadPoolExecutor()来构造线程池,它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime时,会将线程会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。
执行过程与前两种稍微不同:
(1) 主线程调用SynchronousQueue的offer()方法放入task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给空闲线程. 否则执行(2)
(2) 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务.
(3) 执行完任务的线程倘若在60s内仍空闲, 则会被终止. 因此长时间空闲的CachedThreadPool不会持有任何线程资源.
一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。
拒绝策略介绍
线程池的拒绝策略,是指当任务添加到线程池时被拒绝,而采取的处理措施。
当任务添加到线程池时之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。
线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy。
AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 CallerRunsPolicy -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。 DiscardPolicy -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
线程池默认的处理策略是AbortPolicy!
下面通过示例,分别演示线程池的4种拒绝策略。
1. DiscardPolicy 示例
import java.lang.reflect.Field; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; public class DiscardPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); // 设置线程池的拒绝策略为"丢弃" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 新建10个任务,并将它们添加到线程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 关闭线程池 pool.shutdown(); } } class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } }
运行结果:
task-0 is running. task-1 is running.
结果说明:线程池pool的”最大池大小”和”核心池大小”都为1(THREADS_SIZE),这意味着”线程池能同时运行的任务数量最大只能是1″。
线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
2. DiscardOldestPolicy 示例
import java.lang.reflect.Field; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; public class DiscardOldestPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); // 设置线程池的拒绝策略为"DiscardOldestPolicy" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 新建10个任务,并将它们添加到线程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 关闭线程池 pool.shutdown(); } } class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } } }
运行结果:
task-0 is running. task-9 is running.
结果说明:将”线程池的拒绝策略”由DiscardPolicy修改为DiscardOldestPolicy之后,当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务,然后将被拒绝的任务添加到末尾。
3. AbortPolicy 示例
import java.lang.reflect.Field; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.RejectedExecutionException; public class AbortPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); // 设置线程池的拒绝策略为"抛出异常" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); try { // 新建10个任务,并将它们添加到线程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } } catch (RejectedExecutionException e) { e.printStackTrace(); // 关闭线程池 pool.shutdown(); } } } class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } } }
某一次运行结果:
java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at AbortPolicyDemo.main(AbortPolicyDemo.java:27) task-0 is running. task-1 is running.
结果说明:将”线程池的拒绝策略”由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。
4. CallerRunsPolicy 示例
import java.lang.reflect.Field; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; public class CallerRunsPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); // 设置线程池的拒绝策略为"CallerRunsPolicy" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 新建10个任务,并将它们添加到线程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 关闭线程池 pool.shutdown(); } } class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } }
某一次运行结果:
task-2 is running. task-3 is running. task-4 is running. task-5 is running. task-6 is running. task-7 is running. task-8 is running. task-9 is running. task-0 is running. task-1 is running.
结果说明:将”线程池的拒绝策略”由DiscardPolicy修改为CallerRunsPolicy之后,当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到”线程池正在运行的线程”中运行。
Executor执行Runnable任务
通过Executors的以上四个静态工厂方法获得 ExecutorService实例,而后调用该实例的execute(Runnable command)方法即可。一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行。下面是Executor执行Runnable任务的示例代码:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolDemo2 { public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); // ExecutorService executorService = Executors.newFixedThreadPool(5); // ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++){ executorService.execute(new TestRunnable()); System.out.println("************* a" + i + " *************"); } executorService.shutdown(); } } class TestRunnable implements Runnable{ public void run(){ System.out.println(Thread.currentThread().getName() + "线程被调用了。"); } }
某次执行后的结果如下:
************* a0 *************
************* a1 *************
************* a2 *************
pool-1-thread-1线程被调用了。 ************* a3 ************* ************* a4 ************* pool-1-thread-2线程被调用了。 pool-1-thread-4线程被调用了。 pool-1-thread-3线程被调用了。 pool-1-thread-5线程被调用了。
从结果中可以看出,pool-1-thread-1和pool-1-thread-2均被调用了两次,这是随机的,execute会首先在线程池中选择一个已有空闲线程来执行任务,如果线程池中没有空闲线程,它便会创建一个新的线程来执行任务。
Executor执行Callable任务
在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable<T> task) 方法来执行,并且返回一个 <T>Future<T>,是表示任务等待完成的 Future。
Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。
当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。
下面给出一个Executor执行Callable任务的示例代码:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class ThreadPoolDemo3 { public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<>(); //创建10个任务并执行 for (int i = 0; i < 10; i++){ //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 Future<String> future = executorService.submit(new TaskWithResult(i)); //将任务执行结果存储到List中 resultList.add(future); } //遍历任务的结果 for (Future<String> fs : resultList){ try{ while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成 System.out.println(fs.get()); //打印各个线程(任务)执行的结果 }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); }finally{ //启动一次顺序关闭,执行以前提交的任务,但不接受新任务 executorService.shutdown(); } } } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id){ this.id = id; } /** * 任务的具体过程,一旦任务传给ExecutorService的submit方法, * 则该方法自动在一个线程上执行 */ public String call() throws Exception { System.out.println("call()方法被自动调用!!! " + Thread.currentThread().getName()); //该返回结果将被Future的get方法得到 return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName(); } }
某次执行结果如下:
call()方法被自动调用!!! pool-1-thread-2
call()方法被自动调用!!! pool-1-thread-1 call()方法被自动调用!!! pool-1-thread-2 call()方法被自动调用!!! pool-1-thread-2 call()方法被自动调用!!! pool-1-thread-2 call()方法被自动调用!!! pool-1-thread-2 call()方法被自动调用,任务返回的结果是:0 pool-1-thread-1 call()方法被自动调用!!! pool-1-thread-4 call()方法被自动调用!!! pool-1-thread-3 call()方法被自动调用!!! pool-1-thread-5 call()方法被自动调用,任务返回的结果是:1 pool-1-thread-2 call()方法被自动调用,任务返回的结果是:2 pool-1-thread-3 call()方法被自动调用,任务返回的结果是:3 pool-1-thread-4 call()方法被自动调用,任务返回的结果是:4 pool-1-thread-2 call()方法被自动调用,任务返回的结果是:5 pool-1-thread-5 call()方法被自动调用!!! pool-1-thread-6 call()方法被自动调用,任务返回的结果是:6 pool-1-thread-2 call()方法被自动调用,任务返回的结果是:7 pool-1-thread-2 call()方法被自动调用,任务返回的结果是:8 pool-1-thread-2 call()方法被自动调用,任务返回的结果是:9 pool-1-thread-6
从结果中可以同样可以看出,submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用isDone()方法判断Future是否完成了返回。
坚持修行,别让才华配不上梦想,还辜负了所有的苦难。