显示锁
Lock接口是Java 5.0新增的接口,该接口的定义如下:
1 2 3 4 5 6 7 8 public
interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
与内置加锁机制不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显示的。ReentrantLock实现了Lock接口,与内置锁相比,ReentrantLock有以下优势:可以中断获取锁操作,获取锁时候可以设置超时时间。以下代码给出了Lock接口的标准使用形式:
1 2 3 4 5 6 7 Lock lock = new ReentrantLock();
...
lock.lock();
try{
...
} finally {
lock.unlock();
1.1、轮询锁与定时锁
可定时的与可轮询的锁获取方式是由tryLock方法实现的,与无条件的锁获取方式相比,它具有跟完善的错误回复机制。tryLock方法的说明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 boolean tryLock():仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值
true
。如果锁不可用,则此方法将立即返回值
false
。
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException:
如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
如果锁可用,则此方法将立即返回值
true
。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下三种情况之一前,该线程将一直处于休眠状态:
锁由当前线程获得;或者
其他某个线程中断当前线程,并且支持对锁获取的中断;或者
已超过指定的等待时间
如果获得了锁,则返回值
true
。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在获取锁时被中断,并且支持对锁获取的中断,
则将抛出 InterruptedException,并会清除当前线程的已中断状态。
如果超过了指定的等待时间,则将返回值
false
。如果
time
小于等于 0,该方法将完全不等待。
在内置锁中,死锁是一个严重的问题,恢复程序的唯一方法是重新启动程序,而防止死锁的唯一方法就是在构造程序时避免出现不一致的锁顺序,可定时的与可轮询的锁提供了另一种选择:先用tryLock()尝试获取所有的锁,如果不能获取所有需要的锁,那么释放已经获取的锁,然后重新尝试获取所有的锁,以下例子演示了使用tryLock避免死锁的方法:先用tryLock来获取两个锁,如果不能同时获取,那么就回退并重新尝试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public
boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
long fixedDelay = 1;
long randMod = 2;
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (
true
) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else
{
fromAcct.debit(amount);
toAcct.credit(amount);
return
true
;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return
false
;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
1.2、可中断的锁获取操作
lockInterruptibly方法能够在获得锁的同时保持对中断的响应,该方法说明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void lockInterruptibly() throws InterruptedException:
如果当前线程未被中断,则获取锁。
如果锁可用,则获取锁,并立即返回。
如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
锁由当前线程获得;或者
其他某个线程中断当前线程,并且支持对锁获取的中断。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在获取锁时被中断,并且支持对锁获取的中断,
则将抛出 InterruptedException,并清除当前线程的已中断状态。
1.3、读-写锁
Java 5除了增加了Lock接口,还增加了ReadWriteLock接口,即读写锁,该接口定义如下:
1 2 3 4 public
interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
读写锁允许多个读线程并发执行,但是不允许写线程与读线程并发执行,也不允许写线程与写线程并发执行。下面的例子使用了ReentrantReadWriteLock包装Map,从而使他能够在多个线程之间安全的共享:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 public
class
ReadWriteMap <K,V> {
private
final
Map<K, V> map;
private
final
ReadWriteLock lock =
new
ReentrantReadWriteLock();
private
final
Lock r = lock.readLock();
private
final
Lock w = lock.writeLock();
public
ReadWriteMap(Map<K, V> map) {
this
.map = map;
}
public
V put(K key, V value) {
w.lock();
try
{
return
map.put(key, value);
}
finally
{
w.unlock();
}
}
public
V remove(Object key) {
w.lock();
try
{
return
map.remove(key);
}
finally
{
w.unlock();
}
}
public
void
putAll(Map<?
extends
K, ?
extends
V> m) {
w.lock();
try
{
map.putAll(m);
}
finally
{
w.unlock();
}
}
public
void
clear() {
w.lock();
try
{
map.clear();
}
finally
{
w.unlock();
}
}
public
V get(Object key) {
r.lock();
try
{
return
map.get(key);
}
finally
{
r.unlock();
}
}
public
int
size() {
r.lock();
try
{
return
map.size();
}
finally
{
r.unlock();
}
}
public
boolean
isEmpty() {
r.lock();
try
{
return
map.isEmpty();
}
finally
{
r.unlock();
}
}
public
boolean
containsKey(Object key) {
r.lock();
try
{
return
map.containsKey(key);
}
finally
{
r.unlock();
}
}
public
boolean
containsValue(Object value) {
r.lock();
try
{
return
map.containsValue(value);
}
finally
{
r.unlock();
}
}
}
同步工具类
2.1、闭锁
闭锁是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
下例给出了闭锁的常见用法,TestHarness创建一定数量的线程,利用它们并发的执行指定的任务,它使用两个闭锁,分别表示”起始门”和”结束门”。每个线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行,而每个线程要做的最后一件事是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完毕,因此可以统计所消耗的时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public
class
TestHarness {
public
long
timeTasks(
int
nThreads,
final
Runnable task)
throws
InterruptedException {
final
CountDownLatch startGate =
new
CountDownLatch(
1
);
final
CountDownLatch endGate =
new
CountDownLatch(nThreads);
for
(
int
i =
0
; i < nThreads; i++) {
Thread t =
new
Thread() {
public
void
run() {
try
{
startGate.await();
try
{
task.run();
}
finally
{
endGate.countDown();
}
}
catch
(InterruptedException ignored) {
}
}
};
t.start();
}
long
start = System.nanoTime();
startGate.countDown();
endGate.await();
long
end = System.nanoTime();
return
end - start;
}
}
2.2、FutureTask
FutureTask表示可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。FutureTask的方法摘要如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 boolean
cancel(
boolean
mayInterruptIfRunning)
试图取消对此任务的执行。
protected
void
done()
当此任务转换到状态 isDone(不管是正常地还是通过取消)时,调用受保护的方法。
V get()
throws
InterruptedException, ExecutionException
如有必要,等待计算完成,然后获取其结果。
V get(
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException
如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean
isCancelled()
如果在任务正常完成前将其取消,则返回
true
。
boolean
isDone()
如果任务已完成,则返回
true
。
void
run()
除非已将此 Future 取消,否则将其设置为其计算的结果。
protected
boolean
runAndReset()
执行计算而不设置其结果,然后将此 Future 重置为初始状态,如果计算遇到异常或已取消,则该操作失败。
protected
void
set(V v)
除非已经设置了此 Future 或已将其取消,否则将其结果设置为给定的值。
protected
void
setException(Throwable t)
除非已经设置了此 Future 或已将其取消,否则它将报告一个 ExecutionException,并将给定的 throwable 作为其原因。
FutureTask可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动,以下代码就是模拟一个高开销的计算,我们可以先调用start()方法开始计算,然后在需要结果时,再调用get得到结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public
class
Preloader {
ProductInfo loadProductInfo()
throws
DataLoadException {
return
null
;
}
private
final
FutureTask<ProductInfo> future =
new
FutureTask<ProductInfo>(
new
Callable<ProductInfo>() {
public
ProductInfo call()
throws
DataLoadException {
return
loadProductInfo();
}
});
private
final
Thread thread =
new
Thread(future);
public
void
start() {
thread.start();
}
public
ProductInfo get()
throws
DataLoadException, InterruptedException {
try
{
return
future.get();
}
catch
(ExecutionException e) {
Throwable cause = e.getCause();
if
(cause
instanceof
DataLoadException)
throw
(DataLoadException) cause;
else
throw
new
RuntimeException(e);
}
}
interface
ProductInfo {
}
}
class
DataLoadException
extends
Exception {
}
2.3、信号量
从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后等待获取许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class
Pool {
private
static
final
int
MAX_AVAILABLE =
100
;
private
final
Semaphore available =
new
Semaphore(MAX_AVAILABLE,
true
);
public
Object getItem()
throws
InterruptedException {
available.acquire();
return
getNextAvailableItem();
}
public
void
putItem(Object x) {
if
(markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected
Object[] items = ... whatever kinds of items being managed
protected
boolean
[] used =
new
boolean
[MAX_AVAILABLE];
protected
synchronized
Object getNextAvailableItem() {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(!used[i]) {
used[i] =
true
;
return
items[i];
}
}
return
null
;
// not reached
}
protected
synchronized
boolean
markAsUnused(Object item) {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(item == items[i]) {
if
(used[i]) {
used[i] =
false
;
return
true
;
}
else
return
false
;
}
}
return
false
;
}
}
获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
Semaphore的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。还要注意,非同步的tryAcquire 方法不使用公平设置,而是使用任意可用的许可。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
Semaphore还提供便捷的方法来同时 acquire 和释放多个许可。小心,在未将公平设置为 true 时使用这些方法会增加不确定延期的风险。
内存一致性效果:线程中调用“释放”方法(比如 release())之前的操作 happen-before 另一线程中紧跟在成功的“获取”方法(比如 acquire())之后的操作。
2.4、栅栏
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier很有用。因为该 barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
示例用法:下面是一个在并行分解设计中使用barrier的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class
Solver {
final
int
N;
final
float
[][] data;
final
CyclicBarrier barrier;
class
Worker
implements
Runnable {
int
myRow;
Worker(
int
row) {
myRow = row;
}
public
void
run() {
while
(!done()) {
processRow(myRow);
try
{
barrier.await();
}
catch
(InterruptedException ex) {
return
;
}
catch
(BrokenBarrierException ex) {
return
;
}
}
}
}
public
Solver(
float
[][] matrix) {
data = matrix;
N = matrix.length;
barrier =
new
CyclicBarrier(N,
new
Runnable() {
public
void
run() {
//mergeRows(...);
}
});
for
(
int
i =
0
; i < N; ++i)
new
Thread(
new
Worker(i)).start();
waitUntilDone();
}
}
在这个例子中,每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。
如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作.
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException以反常的方式离开。
内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。