【 信号量:Semaphore 】
➣ Semaphore通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
➣ 例如,大家排队去银行办理业务,但是只有两个银行窗口提供服务,来了10个人需要排队,所以这10个排队的人员就需 要依次使 用这两个业务窗口。
首先来观察java.util.concurrent.Semaphore类的基本定义形式:
public class Semaphore extends Object implements Serializable
Semaphore类中定义的方法有如下几个:
构造方法:public Semaphore(int permits)设置服务的信号数量。
构造方法:public Semaphore(int permits, Boolean fair)是否为公平锁。
等待执行:public void acquireUninteruptibly(int permits)
|- 设置的信号量上如果有阻塞的线程对象存在,那么讲一直持续阻塞状态;
释放线程的阻塞状态:public void release(int permits);
返回可用的资源个数:public int availablePermits();
范例:实现银行排队办公处理
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 现在允许操作的资源一共有2两个
Semaphore sem = new Semaphore(2);
// 模拟每一个用户办理业务的时间
Random rand = new Random();
for (int x = 0; x < 10; x++) {
// 每一个线程就是一个要办理业务的人员
new Thread(() -> {
// 现在有空余窗口
if (sem.availablePermits() > 0) {
System.out.println("
【"+Thread.currentThread().getName()
+"】进入银行,此时银行没有人排队");
} else { // 没有空余位置
System.out.println("【"+Thread.currentThread()
.getName()+"】排队等待办理业务。");
}
try {
// 从信号量中获得操作许可
sem.acquire();
System.out.println("
【"+Thread.currentThread().getName()+"】{start}开始办理业务。");
// 模拟办公延迟
TimeUnit.SECONDS.sleep(rand.nextInt(10));
System.out.println("【"+Thread.currentThread().getName()+"】{end}结束办理业务。");
// 当前线程离开办公窗口
sem.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"顾客-" + x).start();
}
}
}
【顾客-0】进入银行,此时银行没有人排队
【顾客-3】进入银行,此时银行没有人排队
【顾客-2】进入银行,此时银行没有人排队
【顾客-1】进入银行,此时银行没有人排队
【顾客-5】排队等待办理业务。
【顾客-4】排队等待办理业务。
【顾客-6】排队等待办理业务。
【顾客-3】{start}开始办理业务。
【顾客-7】排队等待办理业务。
【顾客-0】{start}开始办理业务。
【顾客-8】排队等待办理业务。
【顾客-9】排队等待办理业务。
【顾客-0】{end}结束办理业务。
【顾客-2】{start}开始办理业务。
【顾客-3】{end}结束办理业务。
【顾客-1】{start}开始办理业务。
【顾客-1】{end}结束办理业务。
【顾客-5】{start}开始办理业务。
【顾客-2】{end}结束办理业务。
【顾客-4】{start}开始办理业务。
【顾客-4】{end}结束办理业务。
【顾客-5】{end}结束办理业务。
【顾客-6】{start}开始办理业务。
【顾客-7】{start}开始办理业务。
【顾客-7】{end}结束办理业务。
【顾客-8】{start}开始办理业务。
【顾客-6】{end}结束办理业务。
【顾客-9】{start}开始办理业务。
【顾客-8】{end}结束办理业务。
【顾客-9】{end}结束办理业务。
这种信号量的处理在实际开发中作用:(例如)现在对于数据库的操作连接一共有2个连接,那么可能有10个用户等待进行数据库操作,能够使用的连接个数为2个,这样这10个用户就需要排队依次使用这两个连接来进行操作。
【 闭锁:CountDownLatch 】
CountDownLatch描述的是一个技术的减少,下面首先来观察一个程序的简单问题。
范例:编写一个简单的多线程开发
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
for (int x = 0; x < 2; x++) {
new Thread(() -> {
System.out.println("【"+Thread.currentThread().getName()+"】线程应用执行完毕。");
},"线程对象-" + x).start();
}
System.out.println("【*** 主线程 ***】所有的程序执行完毕。");
}
}
对于此时应该保证所有的线程执行完毕后再进行程序的输出计算,就好比:旅游团集合人员乘车离开,因该保证所有的线程都执行完毕了(指定个数的线程),这样的话就必须做一个计数处理。
【 闭锁:CountDownLatch 】
CountDown类之中的常用方法有如下几个:
构造方法:public CountDownLatch(int count),要设置一个等待的线程个数;
减少等待个数:public void countdown();
等待countDownLatch为0:public void await() throws InterruptedException;
范例:利用CountDown解决之前的设计问题
import java.util.concurrent.CountDownLatch;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 2个线程全部执行完毕后可以继续执行
CountDownLatch cdl = new CountDownLatch(2) ;
for (int x = 0; x < 2; x++) {
new Thread(() -> {
System.out.println("【"
+Thread.currentThread().getName()
+"】线程应用执行完毕。");
cdl.countDown(); // 减少等待的线程个数
},"线程对象-" + x).start();
}
cdl.await(); // 等待计数的结束(个数0)
System.out.println("【*** 主线程 ***】所有的程序执行完毕。");
}
}
-----------
【线程对象-0】线程应用执行完毕。
【线程对象-1】线程应用执行完毕。
【*** 主线程 ***】所有的程序执行完毕。
CountDown的作用,类定义的方法、操作流程都可能成为面试题。
【栅栏:CyclicBarrier】
CyclicBarrier和CountDownLatch是非常相似的,CyclicBarrier核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。
CyclicBarrier类的主要方法如下:
构造方法:public CyclicBarrier(int parties), 设置等待的边界;
傻傻等待其它线程:public int await() throws InterruptedException, BrokenBarrierException;
等待其它线程:public int await(long timeout, TimeUnit unit) throws
InterruptedException, BrokenBarrierException, TimeoutException。
重置等待线程个数:public void reset()。
范例:观察CyclicBarrier进行等待处理
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 当凑够2个线程就进行触发
CyclicBarrier cb = new CyclicBarrier(2);
for (int x = 0; x < 3; x++) {
int sec = x ;
new Thread(() -> {
System.out.println("【"
+ Thread.currentThread().getName()
+ " - 等待开始】");
try {
TimeUnit.SECONDS.sleep(sec);
cb.await(); // 等待处理
} catch (Exception e) { e.printStackTrace(); }
System.out.println("【"
+ Thread.currentThread().getName()
+ " - 等待结束】");
}, "娱乐者-" + x).start();
}
}
}
如果不想一直等待则可以设置超时时间,则超过了等待时间之后将出现”TimeoutException”
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 当凑够2个线程就进行触发
CyclicBarrier cb = new CyclicBarrier(2);
for (int x = 0; x < 3; x++) {
int sec = x ;
new Thread(() -> {
System.out.println("【"
+ Thread.currentThread().getName()
+ " - 等待开始】");
try {
TimeUnit.SECONDS.sleep(sec);
cb.await(6,TimeUnit.SECONDS); // 等待处理
} catch (Exception e) { e.printStackTrace(); }
System.out.println("【"
+ Thread.currentThread().getName()
+ " - 等待结束】");
}, "娱乐者-" + x).start();
}
}
}
范例:重置处理 (可以进行重置处理)
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 当凑够2个线程就进行触发
CyclicBarrier cb = new CyclicBarrier(2);
for (int x = 0; x < 3; x++) {
int sec = x ;
new Thread(() -> {
System.out.println("【" + Thread.currentThread().getName() + " - 等待开始】");
try {
if (sec == 2) { // 重置
cb.reset(); // 重置
System.out.println("【重置处理】 ****** " +
Thread.currentThread().getName());
} else {
TimeUnit.SECONDS.sleep(sec);
cb.await(6,TimeUnit.SECONDS); // 等待处理
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("【" +
Thread.currentThread().getName()
+ " - 等待结束】");
}, "娱乐者-" + x).start();
}
}
}
面试题:CountDownLatch与CyclicBarrier区别?
CountDownLatch:最大特征是进行一个数据减法的操作等待,所有的统计操作一旦开始之中就必须一直执行countDown()方法,
如果等待个数不是0将被一直等待,并且无法重置;
CyclicBarrier:设置一个等待的临界点,并且可以有多个等待线程出现,只要满足了临界点触发了线程的执行代码后将重新开始进行计数处理操作,也可以直接利用reset()方法执行重置操作。
【 交换空间:Exchanger 】
如果说现在有两个线程,一个线程负责生产数据,另外一个线程负责消费数据,那么这两个线程之间一定会存在有一个公共的区域,那么这个公共区域的实现在juc包之中就成为Exchanger。
➣ java.util.concurrent.Exchanger类表示一种两个线程可以进行互相交换对象的汇合点。
Exchanger类中的定义的方法如下:
构造方法:public Exchanger(),创建了一个对象;
设置与获得:public V exchange(V x) throws InterruptedException。
范例:使用Exchanger实现交换处理
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 准备出一个交换空间
Exchanger<String> exc = new Exchanger<String>();
for (int x = 0; x < 3; x++) {
new Thread(() -> {
while (true) {
try {
String data = exc.exchange(null) ;
TimeUnit.SECONDS.sleep(2);
// 现在取得了生产者的数据
if (data != null) {
System.out.println("【" +
Thread.currentThread().getName() + "】" + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者-" + x).start();
}
// 准备定义两个生产者
for (int x = 0; x < 2; x++) {
int temp = x;
new Thread(() -> {
// 一共生产四个数据
for (int y = 0; y < 2; y++) {
String data = "MLDN - " + temp + " - " + y;
try {
// 让生产者放慢节奏
TimeUnit.SECONDS.sleep(2);
// 将生产的数据保存在交换空间
exc.exchange(data);
System.out.println("【" +
Thread.currentThread().getName()
+ "】生产了数据:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者-" + x).start();
}
}
}
对于这样的操作处理,在多线程基础部分时,当时实现是大量的采用了wait()、notify()、synchronized等操作,但是如果有了JUC中的Exchanger整体操作变得简单许多。
【 线程回调:CompletableFuture 】
场景:现在要使用炮兵轰炸某一个著名的标志物。
所有的执行线程在接收到命令之前都要进入到阻塞状态之中,一直到接收到具体命令之后才会执行下一步的操作处理。
➣ CompletableFuture是java8中添加的一个类,该类主要的作用就是提供了新的方式来完成异步处理,包括合成和组合事件的非阻塞方式。
CompletableFuture类之中提供有如下的方法:
构造方法:public CompletableFuture();
获取命令:public T get() throws InterruptedException, ExecutionException;
范例:使用CompletableFuture实现一个打炮操作
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 是直接利用构造方法来进行类的对象创建
CompletableFuture<String> future =
new CompletableFuture<String>() ;
for (int x = 0; x < 10; x++) {
new Thread(() -> {
System.out.println("BEFORE【"
+Thread.currentThread().getName()
+"】进入炮兵阵地,等待命令,准备开火。");
try {
String cmd = future.get() ; // 接收命令
if ("fire".equalsIgnoreCase(cmd)) {
System.out.println("AFTER【"
+Thread.currentThread().getName()
+"】接收到命令,立刻开火,干死那个死胖子。。" );
}
if ("cancel".equalsIgnoreCase(cmd)) {
System.out.println("AFTER【"
+Thread.currentThread().getName()
+"】收到撤退指令,回家睡觉。" );
}
} catch (Exception e) {
e.printStackTrace();
}
},"炮兵-" + x).start();
}
TimeUnit.SECONDS.sleep(3); // 等待3秒钟
future.complete("cancel") ; // 给出了执行命令
}
}
该操作的处理主要是建立在Future线程模型的基础智商的实现操作。
对于本类而言,除了以上的使用方式之外还可以采用异步的线程执行方式处理。在创建CompletableFuture类对象的时候还可以使用这个类之中所提供的一个静态方法:public static CompletableFuture<Void> runAsync(Runnable runnable).
范例:更换实现形式
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
// 如果使用Runnable接口对象,那么泛型的类型必须为Void,表示没有返回值
// 此线程执行完毕后开火
CompletableFuture<Void> future =
CompletableFuture.runAsync(() -> {
System.out.println("
【FUTURE】将军正在温柔乡里美梦呢,等着将军醒开炮。");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("【FUTURE】睡醒了,开始干活了。");
});
for (int x = 0; x < 10; x++) {
new Thread(() -> {
System.out.println("BEFORE【"
+Thread.currentThread().getName()
+"】进入炮兵阵地,等待命令,准备开火。");
try {
System.out.println("AFTER【"
+Thread.currentThread().getName()
+"】接收到命令,立刻开火,干死那个死胖子。。"
+ future.get());
} catch (Exception e) {
e.printStackTrace();
}
},"炮兵-" + x).start();
} } }
这个类的最大好处是提供所有等待线程的执行触发点。