CAS,Compare And Swap,即比较并交换。Doug lea大神在同步组件中大量使用CAS技术鬼斧神工地实现了Java多线程的并发操作。整个AQS同步组件、Atomic原子类操作等等都是以CAS为基础实现的,甚至ConcurrentHashMap在1.8的版本中也调整为了CAS+Synchronized。可以说CAS是整个JUC的基石,是乐观并发策略的一种实现,硬件保证一个语义上看起来需要多次操作(比较并交换)的行为通过一条处理器指令就能完成。
CAS分析
在CAS指令中有3个操作数,分别是内存值V(在Java中可以简单理解为变量的内存地址)、旧的预期值A(进行运算前从内存中读取的值)和准备写入的值B(运算得到的值)。在CAS指令执行时,只有当内存值V等于旧的预期值A时,处理器才会将内存值V更新为值B,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,这个处理过程是一个原子操作,由硬件来保证。
模拟CAS操作:
public class CompareAndSwap{ private int value; //获取内存值
public synchronized int get() { return value; } /** * 比较当前内存值和旧的预期值,只有两个值相等的情况,进行更新 * @param expectedValue 旧的预期值 - 在进行运算前从内存中读取的值 * @param newValue 拟写入的新值 - 运算得到的值,即拟写入内存的值 * @return
*/
public synchronized int compareAndSwap(int expectedValue, int newValue){ int oldValue = value; //比较当前内存值和旧的预期值 如果相等,将更新值赋给内存值
if (oldValue == expectedValue) { this.value = newValue; } return oldValue; } //设置
public synchronized boolean compareAndSet(int expectedValue, int newValue){ return expectedValue == compareAndSwap(expectedValue, newValue); } }
常见的使用情况是:线程首先从内存位置V中读取到预期值A,在执行写入前,比较当前内存值和旧的预期值A是否相等,如果相等,就将计算得到的值赋给内存值。不相等则说明,期间有其他线程修改了内存位置V的值。
当多个线程使用CAS同时更新一个变量值时,只有其中一个线程能够更新成功,其他的线程都将失败。但是,失败的线程不会被挂起(但如果获取锁失败,线程将被挂起),而是返回失败状态,调用者线程可以选择是否需要再一次尝试(如果是在一些竞争激烈的情况下,更好的方式是在重试之前等待一段时间或者回退,从而避免活锁问题–不断重试,不断失败),或者执行一些恢复操作,也可以什么都不做。
JUC下的atomic类都是通过CAS来实现的,下面就以AtomicInteger为例来阐述CAS的实现。如下:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
Unsafe是CAS的核心类,但是Unsafe类不是提供给用户程序调用的(Unsafe.getUnsafe()代码限制了只有启动类加载器(Bootstrap ClassLoader)加载的Class才能访问它),因此,如果不使用反射,只能通过其他的java API来间接使用。 比如J.U.C包中的原子类。其中整数原子类有compareAndSet() 和 getAndIncrement()方法都是用了Unsafe类的CAS操作。该操作由Unsafe类里面的compareAndSwapInt()和compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令。
我们就以AtomicInteger的incrementAndGet()方法来做说明,先看源代码:
/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
内部调用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
该方法为native方法,有四个参数,分别代表:对象、对象的地址、预期值、修改值。
CPU提供了两种方法来实现多处理器的原子操作:总线加锁或者缓存加锁。
总线加锁:总线加锁就是就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。但是这种处理方式显得有点儿霸道,不厚道,他把CPU和内存之间的通信锁住了,在锁定期间,其他处理器都不能其他内存地址的数据,其开销有点儿大。所以就有了缓存加锁。
缓存加锁:其实针对于上面那种情况我们只需要保证在同一时刻对某个内存地址的操作是原子性的即可。缓存加锁就是缓存在内存区域的数据如果在加锁期间,当它执行锁操作写回内存时,处理器不在输出LOCK#信号,而是修改内部的内存地址,利用缓存一致性协议来保证原子性。缓存一致性机制可以保证同一个内存区域的数据仅能被一个处理器修改,也就是说当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。
CAS缺陷
CAS虽然高效地解决了原子操作,但是还是存在一些缺陷的,它无法涵盖互斥同步的所有场景,缺陷主要表现在三个方面:循环时间太长、只能保证一个共享变量原子操作、ABA问题。
循环时间太长
如果CAS一直不成功呢?这种情况绝对有可能发生,如果CAS自旋长时间地不成功,则会给CPU带来非常大的开销。在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。
只能保证一个共享变量原子操作
看了CAS的实现就知道这只能针对一个共享变量,如果是多个共享变量就只能使用锁了,当然如果你有办法把多个变量整成一个变量,利用CAS也不错。例如读写锁中state的高低位
ABA问题
CAS中存在这样一种场景:如果一个变量V初次读取的时候是A值,如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS检查就会误认为它从来没有被改变过,但是实质上它已经发生了改变,这就是CAS操作的”ABA”问题。对于ABA问题其解决方案是加上版本号,即在每个变量都加上一个版本号,每次改变时加1,即A —> B —> A,变成1A —> 2B —> 3A。
用一个例子来阐述ABA问题所带来的影响。
有如下链表
假如我们想要把B替换为A,也就是compareAndSet(this,A,B)。线程1执行B替换A操作,线程2主要执行如下动作,A 、B出栈,然后C、A入栈,最终该链表如下:
完成后线程1发现仍然是A,那么compareAndSet(this,A,B)成功,但是这时会存在一个问题就是B.next = null,compareAndSet(this,A,B)后,会导致C丢失,改栈仅有一个B元素,平白无故把C给丢失了。
J.U.C包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference。AtomicStampedReference通过控制变量值的版本来保证CAS的正确性.
AtomicStampedReference的compareAndSet()方法定义如下:
/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
compareAndSet有四个参数,分别表示:预期引用、更新后的引用、预期标志、更新后的标志。源码很好理解预期的引用 == 当前引用,预期的标识 == 当前标识,如果更新后的引用和标志和当前的引用和标志相等则直接返回true,否则通过Pair生成一个新的pair对象与当前pair CAS替换。Pair为AtomicStampedReference的内部类,主要用于记录引用和版本戳信息(标识),定义如下:
private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } }
Pair记录着对象的引用和版本戳,版本戳为int型,保持自增。同时Pair是一个不可变对象,其所有属性全部定义为final,对外提供一个of方法,该方法返回一个新建的Pari对象。pair对象定义为volatile,保证多线程环境下的可见性。在AtomicStampedReference中,大多方法都是通过调用Pair的of方法来产生一个新的Pair对象,然后赋值给变量pair。如set方法:
/** * Unconditionally sets the value of both the reference and stamp. * * @param newReference the new value for the reference * @param newStamp the new value for the stamp */
public void set(V newReference, int newStamp) { Pair<V> current = pair; if (newReference != current.reference || newStamp != current.stamp) this.pair = Pair.of(newReference, newStamp); }
下面我们将通过一个例子可以可以看到AtomicStampedReference和AtomicInteger的区别。我们定义两个线程,线程1负责将100 —> 110 —> 100,线程2执行 100 —>120,看两者之间的区别。
public class Test { private static AtomicInteger atomicInteger = new AtomicInteger(100); private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1); public static void main(String[] args) throws InterruptedException { //AtomicInteger
Thread at1 = new Thread(new Runnable() { @Override public void run() { atomicInteger.compareAndSet(100,110); atomicInteger.compareAndSet(110,100); } }); Thread at2 = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(2); // at1,执行完
} catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120)); } }); at1.start(); at2.start(); at1.join(); at2.join(); //AtomicStampedReference
Thread tsf1 = new Thread(new Runnable() { @Override public void run() { try { //让 tsf2先获取stamp,导致预期时间戳不一致
TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 预期引用:100,更新后的引用:110,预期标识getStamp() 更新后的标识getStamp() + 1
atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); } }); Thread tsf2 = new Thread(new Runnable() { @Override public void run() { int stamp = atomicStampedReference.getStamp(); try { TimeUnit.SECONDS.sleep(2); //线程tsf1执行完
} catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1)); } }); tsf1.start(); tsf2.start(); } }
运行结果:
运行结果充分展示了AtomicInteger的ABA问题和AtomicStampedReference解决ABA问题。
Ref:
http://cmsblogs.com/?p=2235