安全性和活跃度通常相互牵制。我们使用锁来保证线程安全,但是滥用锁可能引起锁顺序死锁。类似地,我们使用线程池和信号量来约束资源的使用,
但是缺不能知晓哪些管辖范围内的活动可能形成的资源死锁。Java应用程序不能从死锁中恢复,所以确保你的设计能够避免死锁出现的先决条件是非常有价值。
一.死锁
经典的“哲学家进餐”问题很好的阐释了死锁。5个哲学家一起出门去吃中餐,他们围坐在一个圆桌边。他们只有五只筷子(不是5双),每两个人中间放有一只。
哲学家边吃边思考,交替进行。每个人都需要获得两只筷子才能吃东西,但是吃后要把筷子放回原处继续思考。有一些管理筷子的算法,使每一个人都能够或多或少,及时
吃到东西(一个饥饿的哲学家试图获得两只临近的筷子,但是如果其中的一只正在被别人占用,那么他英爱放弃其中一只可用的筷子,等待几分钟再尝试)。但是这样做可能导致
一些哲学家或者所有哲学家都饿死 (每个人都迅速捉住自己左边的筷子,然后等待自己右边的筷子变成可用,同时并不放下左边的筷子)。这最后一种情况,当每个人都拥有他人需要的
资源,并且等待其他人正在占有的资源,如果大家一致占有资源,直到获得自己需要却没占有的其他资源,如果大家一致占有资源,直到获得自己需要却没被占有的其他资源,那么就会产生死锁。
当一个线程永远占有一个锁,而其他线程尝试去获得这个锁,那么他们将永远被阻塞。当线程Thread1占有锁A时,想要获得锁B,但是同时线程Thread2持有B锁,并尝试获得A锁,两个线程将永远等待下去。
这种情况是死锁最简单的形式.
例子如下代码:
public class DeadLock { private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { synchronized (lockA){ try { System.out.println(Thread.currentThread().getName() + "获取A锁 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要B锁!!!"); synchronized (lockB){ System.out.println(Thread.currentThread().getName() + "B锁获取成功"); } } } },"Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { synchronized (lockB){ try { System.out.println(Thread.currentThread().getName() + "获取B锁 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要A锁!!!"); synchronized (lockA){ System.out.println(Thread.currentThread().getName() + "A锁获取成功"); } } } },"Thread2"); thread1.start(); thread2.start(); } }
运行结果如下图:
结果很明显了,这两个线程陷入了死锁状态了,发生死锁的原因是,两个线程试图通过不同的顺序获得多个相同的锁。如果请求锁的顺序相同,
就不会出现循环的锁依赖现象(你等我放锁,我等你放锁),也就不会产生死锁了。如果你能够保证同时请求锁A和锁B的每一个线程,都是按照从锁A到锁B的顺序,那么就不会发生死锁了。
如果所有线程以通用的固定秩序获取锁,程序就不会出现锁顺序死锁问题了。
什么情况下会发生死锁呢?
1.锁的嵌套容易发生死锁。解决办法:获取锁时,查看是否有嵌套。尽量不要用锁的嵌套,如果必须要用到锁的嵌套,就要指定锁的顺序,因为参数的顺序是超乎我们控制的,为了解决这个问题,我们必须指定锁的顺序,并且在整个应用程序中,
获得锁都必须始终遵守这个既定的顺序。
上面的例子出现死锁的根本原因就是获取所的顺序是乱序的,超乎我们控制的。上面例子最理想的情况就是把业务逻辑抽离出来,把获取锁的代码放在一个公共的方法里面,让这两个线程获取锁
都是从我的公共的方法里面获取,当Thread1线程进入公共方法时,获取了A锁,另外Thread2又进来了,但是A锁已经被Thread1线程获取了,Thread1接着又获取锁B,Thread2线程就不能再获取不到了锁A,更别说再去获取锁B了,这样就有一定的顺序了。
上面例子的改造如下:
public class DeadLock { private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { getLock(); } },"Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { getLock(); } },"Thread2"); thread1.start(); thread2.start(); } public void getLock() { synchronized (lockA){ try { System.out.println(Thread.currentThread().getName() + "获取A锁 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要B锁!!!"); synchronized (lockB){ System.out.println(Thread.currentThread().getName() + "B锁获取成功"); } } } }
运行结果如下:
可以看到把业务逻辑抽离出来,把获取锁的代码放在一个公共的方法里面,获得锁都必须始终遵守这个既定的顺序。
2.引入显式锁的超时机制特性来避免死锁
超时机制是监控死锁和从死锁中恢复的技术,是使用每个显式所Lock类中定时tryLock特性,来替代使用颞部所机制。在内部锁的机制中,只要没有获得锁,就永远保持等待,而
显示的锁使你能狗定义超时的时间,在规定时间之后tryLock还没有获得锁就会返回失败。通过使用超时,尽管这段时间比你预期能够获得所的时间长很多,你仍然可以在意外发生后重新
获得控制权。当尝试获得定时锁失败时,你并不需要知道原因。也许是因为有死锁发生,也许是线程在持有锁的时候错误地进入无限循环;也有可能是执行一些活动所花费的时间比你
预期慢了许多。不过至少你有机会了解到你的尝试已经失败,记录下这次尝试中有用的信息,并重新开始计算,这远比关闭整个线程要优雅得多。
即使定时锁并没有应用于整个系统,使用它来获得多重锁还是能够有效应对死锁。如果获取锁的请求超时,你可以释放这个锁,并后退,等待一会后再尝试,这很可能消除了死锁发生的条件,
并且循序程序恢复。(这项技术只有在同时获得两个锁的时候才有效;如果多个锁是在嵌套的方法中被请求的,你无法仅仅释放外层的锁,尽管你知道自己已经持有该锁)
显式锁Lock,Lock是一个接口,定义了一些抽象的所操作。与内部锁机制不同,Lock提供了无条件,可轮询,定时的,可中断的锁获取操作,所有加锁和解锁的方法都是显式的。
Lock的实现必须提供举报与内部锁相同的内存可见性的语义。但是加锁的语义,调度算法,顺序保证,性能特性这些可以不同。
Lock接口源码如下:
public interface Lock { //加锁 void lock(); //可中断的锁,打算线程的等待状态,即A线程已经获取该锁,B线程又来获 //取,但是A线程会通知B,来打算B线程的等待。 void lockInterruptibly() throws InterruptedException; //尝试去获取锁,失败返回False boolean tryLock(); //超时机制获取锁 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //释放锁 void unlock(); Condition newCondition(); }
ReentranLock实现了Lock接口,提供了与synchronized相同的互斥和内存可见性的保证。获得ReentrantLock的锁与进入synchronized块有着相同内存含义,释放ReentrantLock锁与退出synchronized块有着相同内存含义。
ReentrantLock提供了与synchronized一样可重入加锁的语义。ReentrantLock支持Lock接口定义的所有获取锁的方式。与synchronized相比,ReentranLock为处理不可用的锁提供了更多灵活性。
但是对于现在的JDK的更新,synchronized的性能被优化的越来越好,内部锁(synchronized)已经获得相当可观的性能,性能不仅仅是个不断变化的目标,而且变化的非常快。
如下图:
看到图,随着JDK的更新迭代,内部锁的性能越来越快,这不是ReentrantLock的衰退,而是内部锁(synchronized)越来越快,特别在JDK目前跟新到现在1.9.
下面用显式锁Lock再来改造上面的例子
public class DeadLock { Lock lock = new ReentrantLock(); private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "获取A锁 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println(Thread.currentThread().getName() + "需要B锁!!!"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + "B锁获取成功"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "获取B锁 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println(Thread.currentThread().getName() + "需要A锁!!!"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + "A锁获取成功"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "Thread1"); thread1.start(); thread2.start(); } }
运行结果如下:
可以看到显示锁Lock是可以避免死锁的。
注意:Lock接口规范形式。这种模式在某种程度上比使用内部锁更加复杂:锁必须在finally块中释放。另一方面,如果锁守护的代码在try块之外抛出了异常,它将永远都不会被释放了;如果对象
能够被置于不一致状态,可能需要额外的try-catch,或try-finally块。(当你在使用任何形式的锁时,你总是应该关注异常带来的影响,包括内部锁)。
忘记时候finally释放Lock是一个定时炸弹。当不幸发生的时候,你将很难追踪到错误的发生点,因为根本没有记录锁本应该被释放的位置和时间。这就是ReentrantLock不能完全替代synchronized的原因:它更加危险,
因为当程序的控制权离开守护的块,不会自动清除锁。尽管记得在finally块中释放锁并不苦难,但忘记的可能仍然存在。
sy
可轮询的和可定时的锁请求
可定时的与可轮询的锁获取模式,是由tryLock方法实现,与物体爱建的锁获取相比,它具有更完善的错误恢复机制。在内部锁中,死锁是致命的,唯一的恢复方法是重新启动程序,唯一的预防方法是在构建程序时不要出错,
所以不可能循序不一致的锁顺序。可定时的与可轮询的锁提供了另外一个选择:可以规避死锁的放生。
如果你不能获得所有需要的锁,那么使用可定时的与可轮询的获取方式(tryLock)使你能够重新拿到控制权,它会释放你已经获得的这些锁,然后再重新尝试(或者至少会记录这个失败,抑或者采取其他措施)。使用tryLock试图获得两个锁,
如果不能同时获得两个,就回退,并重新尝试。休眠时间由一个特定的组件管理,并由一个随机组件减少活锁发生的可能性。如果一定时间内,没有获得所有需要的锁,就会返回一个失败状态,这样操作就能优雅的失败了。
tryLock()经常与if esle一起使用。
读-写锁
ReentrantLock实现了标准的互斥锁:一次最多只有一个线程能够持有相同ReentrantLock。但是互斥通常做为保护数据一致性的很强的加锁约束,因此,过分的限制了并发性。互斥是保守的加锁策略,避免了
“写/写”和“写/读”的重读,但是同样避开了”读/读”的重叠。在很多情况下,数据结构是”频繁被读取“的——它们是可变的,有时候会被改变,但多数访问只进行读操作。此时,如果能够放宽,允许多个读者同时访问数据结构就
非常好了。只要每个线程保证能够读到最新的数据(线程的可见性),并且在读者读取数据的时候没有其他线程修改数据,就不会发生问题。这就是读-写锁允许的情况:一个资源能够被多个读者访问,或者被一个写者访问,两者不能同时进行。
ReadWriteLock,暴露了2个Lock对象,一个用来读,另一个用来写。读取ReadWriteLock锁守护的数据,你必须首先获得读取的锁,当需要修改ReadWriteLock守护的数据,你必须首先获得写入锁。
ReadWriteLock源码接口如下:
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
读写锁实现的加锁策略允许多个同时存在的读者,但是只允许一个写者。与Lock一样,ReadWriteLock允许多种实现,造成性能,调度保证,获取优先,公平性,以及加锁语义等方面的不尽相同。
读写锁的设计是用来进行性能改进的,使得特定情况下能够有更好的并发性。时间实践中,当多处理器系统中,频繁的访问主要为读取数据结构的时候哦,读写锁能够改进性能;在其他情况下运行的情况比独占
的锁要稍微差一些,这归因于它更大的复杂性。使用它能否带来改进,最好通过对系统进行剖析来判断:好在ReadWriteLock使用Lock作为读写部分的锁,所以如果剖析得的结果发现读写锁没有能提高性能,把读写锁置换为独占锁是比较容易。
下面我们用synchonized来进行读操作,对于读操作性能如何呢?
例子如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public synchronized void get(Thread thread) { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在读操作"); } System.out.println(thread.getName() + "读操作完成"); } }
运行结果如下:
可以看到要线程Thread0读操作完了,Thread1才能进行读操作。明显这样性能很慢。
现在我们用ReadWriteLock来进行读操作,看一下性能如何
例子如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { try { rw1.readLock().lock(); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在读操作"); } System.out.println(thread.getName() + "读操作完成"); } catch (Exception e) { e.printStackTrace(); } finally { rw1.readLock().unlock(); } } }
运行结果如下:
可以看到线程间是不用排队来读操作的。这样效率明显很高。
我们再看一下写操作,如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { try { rw1.writeLock().lock(); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在写操作"); } System.out.println(thread.getName() + "写操作完成"); } catch (Exception e) { e.printStackTrace(); } finally { rw1.writeLock().unlock(); } } }
运行结果如下:
可以看到ReadWriteLock只允许一个写者。
公平锁
ReentrantReadWriteLock为两个锁提供了可重入的加锁语义,它是继承了ReadWriteLock,扩展了ReadWriteLock。它与ReadWriteLock相同,ReentrantReadWriteLock能够被构造
为非公平锁(构造方法不设置参数,默认是非公平),或者公平。在公平锁中,选择权交给等待时间最长的线程;如果锁由读者获得,而一个线程请求写入锁,那么不再允许读者获得读取锁,直到写者被受理,平且已经释放了写锁。
在非公平的锁中,线程允许访问的顺序是不定的。由写者降级为读者是允许的;从读者升级为写者是不允许的(尝试这样的行为会导致死锁)
当锁被持有的时间相对较长,并且大部分操作都不会改变锁守护的资源,那么读写锁能够改进并发性。ReadWriteMap使用了ReentrantReadWriteLock来包装Map,使得它能够在多线程间
被安全的共享,并仍然能够避免 “读-写” 或者 ”写-写“冲突。显示中ConcurrentHashMap并发容器的性能已经足够好了,所以你可以是使用他,而不必使用这个新的解决方案,如果你需要并发的部分
只有哈希Map,但是如果你需要为LinkedHashMap这种可替换元素Map提供更好的并发访问,那么这项技术是非常有用的。
用读写锁包装的Map如下图:
读写锁的性能如下图:
总结:
显式的Lock与内部锁相比提供了一些扩展的特性,包括处理不可用的锁时更好的灵活性,以及对队列行为更好的控制。但是ReentrantLock不能完全替代synchronized;只有当你需要
synchronized没能提供的特性时才应该使用。
读-写锁允许多个读者并发访问被守护的对象,当访问多为读取数据结构的时候,它具有改进可伸缩性的潜力。
数据库层面上的锁——悲观锁和乐观锁
乐观锁:他对世界比较乐观,认为别人访问正在改变的数据的概率是很低的,所以直到修改完成准备提交所做的的修改到数据库的时候才会将数据锁住。完成更改后释放。
我想一下一个这样的业务场景:我们从数据库中获取了一条数据,我们正要修改他的数据时,刚好另外一个用户此时已经修改过了这条数据,这是我们是不知道别人修改过这条数据的。
解决办法,我们可以在表中增加一个version字段,让这个version自增或者自减,或者用一个时间戳字段,这个时间搓字段是唯一的。我们写数据的时候带上version,也就是每个人更新的时候都会判断当前的版本号是否跟我查询出来得到的版本号是否一致,不一致就更新失败,一致就更新这条记录并更改版本号。
例子如下:
1.查询出商品信息 select (status,status,version) from t_goods where id=#{id} 2.根据商品信息生成订单 3.修改商品status为2 update t_goods set status=2,version=version+1 where id=#{id} and version=#{version};
用户体验表现层面通常表现为系统繁忙之类的。
在这里还要注意乐观锁的一个细节:就是version字段要自增或者自减,否者会出现ABA问题。
ABA问题:线程Thread1拿到了version字段为A,由于CAS操作(即先进行比较然后设值),线程Thread2先拿到的version,将version改成B,线程Thread3来拿到version,将version值又改回了A。此时Thread1的CAS(先比较后set值)操作结束了,继续执行,它发现version的值还是A,以为没有发生变化,所以就继续执行了。这个过程中,version从A变为B,再由B变为A就被形象地称为ABA问题了。
悲观锁:也称排它锁,当事务在操作数据时把这部分数据进行锁定,直到操作完毕后再解锁,其他事务操作才可操作该部分数据。这将防止其他进程读取或修改表中的数据。
一般使用 select …for update 对所选择的数据进行加锁处理,例如
select * from account where name=”JAVA” for update,
这条sql 语句锁定了account 表中所有符合检索条件(name=”JAVA”)的记录。本次事务提交之前(事务提交时会释放事务过程中的锁),外界无法修改这些记录。
用户界面常表现为转圈圈等待。
如果数据库分库分表了,不再是单个数据库了,那么我们可以用分布式锁,比如redis的setnx特性,zookeeper的节点唯一性和顺序性特性来做分布式锁。