一堆线程一起完成一件事情就是线程池。
【 线程池的核心组成 】
【 线程池分类 】
➢ java.util.concurrent.Executors类可以创建线程池
➣ 创建无大小限制的线程池 : public static ExecutorService newCacheThreadPool();
➢ 创建固定大小的线程池 : public static ExecutorService newFixedThreadPool(int nThreads);
➣ 单线程池 : public static ScheduledExecutorService newSingleThreadScheduledExecutor();
➣ 创建定时调度池 : public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize).
【 创建四种线程池 】
下面将具体演示四中线程池的创建以及其自身的使用特点。当Executors创建完成了线程池之后可以返回“ExecutorService”接口对象,而这个接口对象里面有两个方法来接收线程的执行:
★ 接收Callable: public <T> Future<T> submit(Callable<T> task);
★ 接收Runnable: public Future<?> submit(Runnable task);
范例:创建无限量线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
for (int x = 0; x < 10; x++) {
service.submit(() -> { // 线程池会负责启动
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
});
}
service.shutdown(); // 线程池执行完毕后需要关闭
}
}
范例:创建有限量的线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 线程池只能够装下3个人
ExecutorService service =
Executors.newFixedThreadPool(3) ;
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
service.submit(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
});
}
service.shutdown(); // 线程池执行完毕后需要关闭
}
}
范例:创建单线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
ExecutorService service =
Executors.newSingleThreadExecutor() ;
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
service.submit(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
});
}
service.shutdown(); // 线程池执行完毕后需要关闭
}
}
除了以上的三中线程池之外还可以创建一个定时调度池,这个调度池主要以间隔调度为主。如果要创建调度池则肯定使用ScheduledExecutorService接口完成,在该接口之中包含有如下的两个方法:
延迟启动:public ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit);
间隔调度:ScheduledFuture<?>scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit);
范例:创建调度池
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 创建一个调度池
ScheduledExecutorService service =
Executors.newScheduledThreadPool(1) ;
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
service.schedule(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
},2,TimeUnit.SECONDS);
}
service.shutdown(); // 线程池执行完毕后需要关闭
}
}
范例:观察间隔调度
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 创建一个调度池
ScheduledExecutorService service =
Executors.newScheduledThreadPool(1) ;
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
service.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
},2,2,TimeUnit.SECONDS);
}
// service.shutdown(); // 线程池执行完毕后需要关闭
}
}
【 ExecutorService线程池处理方法 】
整个线程池的处理里面都是以ExecutorService接口的方法为核心展开的,所以如何要想去理解线程池,
还需要对这个接口的方法做一个小小的说明。
1、在Exector接口里面定义有execute()方法:public void execute(Runnable command)
*这个方法接收的是Runnable,因为Runnable没有返回值,所以该方法的返回值是void。
范例:使用execute()来代替之前的submit()。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
service.execute(() -> {
System.out.println(
Thread.currentThread().getName()
+ "执行操作。");
});
}
service.shutdown();
}
}
2、在ExecutorService接口里面的确提供有接收Runnable接口对象的方法,
但是这个方法为了统一使用的都是submit();
submit()重载了许多次,可以接收Runnable: public Future<?> submit(Runnable task)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
Future<?> future = service.submit(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
});
// Runnable接口没有返回值,所以永恒为null
System.out.println(future.get());
}
service.shutdown();
}
}
3、submit()方法是可以接收Callable接口对象的;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
for (int x = 0; x < 10; x++) {
// 线程池会负责启动
Future<?> future = service.submit(() -> {
System.out.println(Thread.currentThread()
.getName() + "执行操作。");
});
// Runnable接口没有返回值,所以永恒为null
System.out.println(future.get());
}
service.shutdown();
}
}
现在已经清楚了execute()和submit()两个方法的作用,但是需要来观察一下它的源代码:
在这个方法里面主要区分三个概念:
task:是具体的线程执行任务,线程在追加到线程池的时候没有进行启动;
worker:任务的执行需要worker来支持的,可以运行的worker受到”corePoolSize”限制。
reject:如果现在线程池已经满了或者关闭了,那么就会出现拒绝新线程加入的可能性。
4、public<T>T invokeAny(Collection<? extends Callable<T>> tasks) throws
InterruptedException,ExecutionException
Future线程模型设计的优势在于:可以进行数据的异步返回,但是之前编写的过程严格来讲并不好,
相当于启动了一个线程就获得了一个返回值,于是为了方面这些线程池中线程对象的管理,
可以使用此方法进行统一返回。
范例:使用invokeAny()方法
import java.util.HashSet; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 所有任务
Set<Callable<String>> tasks
= new HashSet<Callable<String>>() ;
// 追加线程执行任务
for (int x = 0; x < 10; x++) {
int temp = x ;
tasks.add(() -> {
// 线程池会负责启动
return Thread.currentThread().getName()
+ "执行操作。 x = " + temp ;
});
}
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
// 执行任务
String invokeAny = service.invokeAny(tasks) ;
System.out.println("返回结果:" + invokeAny);
service.shutdown();
}
}
返回结果:pool-1-thread-1执行操作。 x = 2
使用invokeAny()只会返回一个任务的执行操作。
5、public<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException
此时会返回多个任务的执行结果,以List集合的形式返回。
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 所有任务
Set<Callable<String>> tasks =
new HashSet<Callable<String>>() ;
// 追加线程执行任务
for (int x = 0; x < 10; x++) {
int temp = x ;
// 线程池会负责启动
tasks.add(() -> {
return Thread.currentThread().getName() + "执行操作。 x = " + temp ;
});
}
// 现在创建了一个线程池
ExecutorService service = Executors.newCachedThreadPool();
// 执行任务
List<Future<String>> invokeAll =
service.invokeAll(tasks) ;
for (Future<String> fut : invokeAll) {
System.out.println("返回结果:" + fut.get());
}
service.shutdown();
}
}
清除invokeAny()、invokeAll()实际上就可以更加清楚任务(task)和工作者(Worker)之间的关系。
【 CompletionService线程池异步交互 】
➣ 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit()执行的任务。
使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果。
➣ CompletionService依赖于一个单独的Executor来实际执行任务,在这种情况下,
CompletionService只管理一个内部完成队列,在CompletionService接口里面提供有如下两个方法:
➣ 设置Callable : public Future<V> submit(Callable<V> task) ;
➣ 设置Runnable : public Future<V> submit(Runnable task, V result) ;
CompletionService是一个接口,如果要想使用这个接口可以采用
ExecutorCompletionService子类。
• 构造方法:public ExecutorCompletionService(Executor executor) ;
CompletionService来控制所有线程池的操作以及数据返回,则应该使用这个类来进行线程池的提交处理。
• 提交线程:public Future<V> submit(Callable<V> task) ;
• 获取返回内容:public Future<V> take() throws InterruptedException ;
范例:使用CompletionService工具类
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在创建了一个线程池
ExecutorService service =
Executors.newCachedThreadPool();
CompletionService<String> completions =
new ExecutorCompletionService<String>(service) ;
for (int x = 0 ; x < 10 ; x ++) {
int temp = x ;
completions.submit(()->{
return Thread.currentThread()
.getName() + " - x = " + temp ;
}) ;
}
for (int x = 0 ; x < 10 ; x ++) {
System.out.println(completions.take().get());
}
service.shutdown();
}
}
CompletionService操作接口的主要目的是告诉大家可以去隐藏ExecutorService接口执行线程池的处理了,不再需要关注那些invokeAny()、invokeAll()的执行方法了。
【 ThreadPoolExecutor线程池执行者 】
线程池的创建主要依靠的事一个类完成的;ThreadPoolExecutor。下面以无限量线程池为例做一个说明;
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>() );
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() );
}
如果掌握了ThreadPoolExecutor类的使用,那么你就可以自己来定义线程池了,首先来观察这个类的构造方法;
范例:可以创建属于自己的线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<Runnable> queue = new
ArrayBlockingQueue<Runnable>(2) ;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,5,6L,TimeUnit.SECONDS,queue) ;
for (int x = 0 ; x < 2 ; x ++) {
int temp = x ;
pool.execute(()->{
System.out.println("【"
+Thread.currentThread().getName()
+"】任务开始执行 - " + temp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("【"
+Thread.currentThread().getName()
+"】任务结束执行 - " + temp);
});
}
}
}
【pool-1-thread-1】任务开始执行 – 0
【pool-1-thread-2】任务开始执行 – 1
【pool-1-thread-1】任务结束执行 – 0
【pool-1-thread-2】任务结束执行 – 1
在线程池之中存在拒绝策略的概念,所谓的拒绝策略指的是线程池满了之后的其它等待线程的处理状态,
在ThreadPoolExecutor类里面提供有一些”RejectedExecutionHandler”子类,如果现在被拒绝了会出现”拒绝异常”
(默认AbortPolicy)。对于给出的几种拒绝策略如下:
* AbortPolicy(默认实现):当任务添加到线程池中被拒绝时,会抛出拒绝异常”RejectedExecutionException”;
* DiscardPolicy:当将任务添加到线程池中被拒绝的时候,线程池将直接丢弃该拒绝的任务;
* DiscardOldestPolicy:当被拒绝的时候,线程池会放弃等待队列中时间最长的未被处理的任务,
让后将拒绝的任务添加到队列之中;
* CallerRunsPolicy:当任务被拒绝时,会在线程池当前正在运行的Thread线程之中处理该任务(加塞儿)。
范例:修改一下拒绝策略
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<Runnable>(2) ;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,2,6L, TimeUnit.SECONDS,
queue,Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()) ;
for (int x = 0 ; x < 5 ; x ++) {
int temp = x ;
pool.execute(()->{
System.out.println("【BEFORE - "
+Thread.currentThread().getName()
+"】任务开始执行 - " + temp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("【AFTER - "
+Thread.currentThread().getName()
+"】任务结束执行 - " + temp);
});
} } }
由于只设置了一个CorePoolSize,所以当多余的任务出现之后将才哦难过设置的默认的拒绝策略
”new ThreadPoolExecutor.AbortPolicy()”,则会出现”RejectedExecutionException”,
如果出现拒绝之后可以丢弃被拒绝的任务,那么也可以将拒绝策略修改为:
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 6L, TimeUnit.SECONDS,
queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() );
对于拒绝策略最简单的解释就是:线程池中corePoolSize满了,再追加的任务的处理方案。