同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏(Barrier)、闭锁(Latch)以及交换器(Exchanger)。 –《Java并发编程实战》
信号量
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
概念上,Semaphore管理一组许可(permits),许可的数量根据构造器指定。在执行操作前通过acquire获取许可(有许可剩余,否则需要等待),操作执行完成后通过release释放许可。实际上,并不存在许可对象,Semaphore仅仅维护一个可用许可数量,并以此工作。
Semaphore通常用于严格控制访问某物理或逻辑资源的线程数,JavaDoc示例如下:
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
在获取对象前必须先取得许可,以保证对象可用,使用完成后返回对象,并交还许可,以允许其它线程使用。
初始值为1的Semaphore,即二值信号量,最多只允许拥有1个许可,可用作互斥锁。
Semaphore构造器接受一个可选的公平参数。当设置为false时,信号量不保证线程获取许可的顺序,有可能后调用acquire的线程先获取许可;当设置为true时,信号量保证先调用acquire的线程先获取许可(FIFO)。但是,tryAcquire方法不区分公平参数,只要有可用的许可就会获取。同时,acquire(int)和release(int)方法允许一次获取/释放多个许可。
通常,使用信号量来控制资源访问时应该使用公平模式,防止有线程获取不到资源而饿死。当信号量用于其它类型同步控制时,在吞吐量优势上非公平模式胜过公平模式。
示例:
package thread;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/** * @author QFJiang on 2018/03/05 10:32 */
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(8);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Task(1, semaphore, 3));
executor.execute(new Task(2, semaphore, 4));
executor.execute(new Task(3, semaphore, 6));
executor.shutdown();
}
static class Task implements Runnable {
private int id;
private Semaphore semaphore;
private int permit;
public Task(int id, Semaphore semaphore, int permit) {
this.id = id;
this.semaphore = semaphore;
this.permit = permit;
}
@Override
public void run() {
try {
semaphore.acquire(permit);
System.out.println("Task " + id + " acquire " + permit + " semaphore");
Thread.sleep(1000 + new Random().nextInt(1000));
semaphore.release(permit);
System.out.println("Task " + id + " release " + permit + " semaphore");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出:(只有Task1、Task2全部释放许可之后,Task3才能获取足够许可)
Task 2 acquire 4 semaphore
Task 1 acquire 3 semaphore
Task 1 release 3 semaphore
Task 2 release 4 semaphore
Task 3 acquire 6 semaphore
Task 3 release 6 semaphore
闭锁
闭锁可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关着的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须在这个闭锁上等待。
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S以来的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
- 等待直到某个操作的所有参与者(例如:在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有的玩家都准备就绪时,闭锁将到达结束状态。
CountDownLatch闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法将递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达零,这表示所有需要的事件都已经发生。如果计数器的值为非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
JavaDoc示例如下:
class Driver {
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
FutureTask
FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算)。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3中状态:等待运行(Waiting to run),正在运行(Running)和完成运行(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态以后,它会永远停止在这个状态上。
Future.get的行为取决于任务的状态。如果任务已经执行完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
栅栏
栅栏(Barrier)类似于闭锁,它能阻塞线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁也用于等待事件,而栅栏也用于等待其他线程。比如开班级会议,需要班级成员全部到达之后才开始。
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将也一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时,将调用await方法,这个方法将阻塞直到所有线程都到达阻塞位置。如果所有线程都到达了栅栏的位置,那么栅栏将打开,此时所有线程都将被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏参数传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在下一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
示例:
package thread;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** * @author QFJiang on 2017/04/18 */
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("All are ready");
}
});
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new TaskThread(barrier));
}
exec.shutdown();
}
static class TaskThread implements Runnable {
private CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is preparing");
try {
Thread.sleep(1000 + new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + " is ready");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000 + new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + " is finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出:
pool-1-thread-1 is preparing
pool-1-thread-2 is preparing
pool-1-thread-3 is preparing
pool-1-thread-4 is preparing
pool-1-thread-5 is preparing
pool-1-thread-1 is ready
pool-1-thread-3 is ready
pool-1-thread-4 is ready
pool-1-thread-2 is ready
pool-1-thread-5 is ready
All are ready
pool-1-thread-4 is finished
pool-1-thread-2 is finished
pool-1-thread-5 is finished
pool-1-thread-3 is finished
pool-1-thread-1 is finished
栅栏与闭锁的一个重要区别在于:闭锁只能使用一次,而栅栏可以重复循环使用(Cyclic)。 例如,一个游乐场项目一次只允许5个人玩,而且必须满5人才可以,这就是一个栅栏,每次等5人齐了开始游玩,结束之后换另外5人可以重新开始。
交换器
Exchanger是另一种形式的栅栏,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用,例如当也一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。
示例:
package thread;
import java.util.Random;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** * @author QFJiang on 2017/04/19 */
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(new Producer(exchanger, "Full queue"), "Producer").start();
new Thread(new Consumer(exchanger, "Empty queue"), "Consumer").start();
}
static class Producer implements Runnable {
private Exchanger<String> exchanger;
private String queue;
public Producer(Exchanger<String> exchanger, String queue) {
this.exchanger = exchanger;
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " exchange " + queue + " out");
Thread.sleep(1000 + new Random().nextInt(1000));
String other = exchanger.exchange(queue);
System.out.println(Thread.currentThread().getName() + " exchange " + other + " back");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private Exchanger<String> exchanger;
private String queue;
public Consumer(Exchanger<String> exchanger, String queue) {
this.exchanger = exchanger;
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " exchange " + queue + " out");
Thread.sleep(1000 + new Random().nextInt(1000));
String other = exchanger.exchange(queue);
System.out.println(Thread.currentThread().getName() + " exchange " + other + " back");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出:
Producer exchange Full queue out
Consumer exchange Empty queue out
Producer exchange Empty queue back
Consumer exchange Full queue back