Java当中提供了一些有关无锁类的使用,在底部使用比较交换指令来实现。一般来说有锁的方式,会导致线程可能会阻塞、挂起,在进入临界区之前由系统对它进行阻塞和挂起,相对来讲无锁的性能会更好些,除非是人为的挂起线程,否则通过无锁的方式线程是不可能被挂起的只会不断的重试。如果线程被挂起,做一次线程的上下文切换可能需要8万个时钟周期,
但是如果做重试的操作(比如循环体),除非重试的操作过多,否则一般基本上无锁的操作比有锁的方式要好很多。
1.无锁的原理详解
1.CAS(Compare And Swap/Set)比较并交换
CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量(内存值),E表示预期值(旧的),N表示新值。当且仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的(乐观锁),它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
CAS 整个操作过程是原子操作,是一条CPU指令完成的
2.CPU指令
2.无锁类
JDK1.5的原子包:java.util.concurrent.atomic
这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。
1.AtomicInteger
可以用原子方式更新的 int 值。
主要用于在高并发环境下的高效程序处理。使用非阻塞算法来实现并发控制。
Public class AtomicInteger extends Number
implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
//偏移量
private static final long valueOffset;
static {
try {
//获取偏移量 指value这个值在类当中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//AtomicInteger内部对int类型的value进行包装,所有的操作都是对这个value操作
private volatile int value;
/**
* 返回当前值
*/
public final int get() {
return value;
}
/**
* 设置当前值
* @param 当前值
*/
public final void set(int newValue) {
value = newValue;
}
/**
* 设置新值并返回旧值
*
* @param 新值
* @return 旧值(上一个值)
*/
public final int getAndSet(int newValue) {
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}
/**
* 如果当前值是期望值 expect 则设置为 update 值
*
*
* @param 期望(老的)值
* @param 新值
* @return 是否设置成功,返回失败代表新值和期望值是不相同的
*/
public final boolean compareAndSet(int expect, int update) {
//unsafe是个不安全的操作,会提供类似于指针之类的操作
//对于这个类(this)的这个偏移量(valueOffset)上的数据查看期望值是多少
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 当前值加一,并返回旧值
*
* @return 旧值
*/
public final int getAndIncrement() {
for (;;) {
//获取当前值
int current = get();
int next = current + 1;
//比较期望值是否是当前值(current) 目标是+1后的值
//如果在做完+1操作后 有线程修改了value值,那么current就和期望值是不相符的,所以设置失败,继续下次执行.如果设置成功,则返回current
if (compareAndSet(current, next))
return current;
}
}
/**
* 当前值减一,并返回旧值
*
* @return 旧值
*/
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* 当前值加delta,并返回
*
* @param
* @return 旧值
*/
public final int getAndAdd(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* 当前值加一,并返回新值
*
* @return 新值
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* 当前值减一,并返回新值
*
* @return 新值
*/
public final int decrementAndGet() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* 当前值加delta,并返回新值
*
* @param
* @return 新值
*/
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
}
关于偏移量:例如C里面的结构体
Java :所以unsafe可以通过偏移量对变量进行操作
2.Unsafe
/**
* 非安全的操作,比如:
* 根据偏移量设置值
* park()
* 底层的CAS操作
* 非公开API,在不同版本的JDK中,可能有较大差异
*/
public final class Unsafe {
private static native void registerNatives();
static {
registerNatives();
sun.reflect.Reflection.registerMethodsToFilter(Unsafe.class, "getUnsafe");
}
private Unsafe() {}
private static final Unsafe theUnsafe = new Unsafe();
/**
* 获取给定对象偏移量上的int值
* @param 对象
* @param 偏移量
*
* @return 该偏移量上的整数
* @throws RuntimeException No defined exceptions are thrown, not even
* {@link NullPointerException}
*/
public native int getInt(Object o, long offset);
/**
* 设置给定对象偏移量上面的值(int类型)
* @param 对象
* @param 偏移量
* @param 整数
* @throws RuntimeException No defined exceptions are thrown, not even
* {@link NullPointerException}
*/
public native void putInt(Object o, long offset, int x);
/**
* 获取字段在对象中的偏移量
* @param 对象中的字段
*/
public native long objectFieldOffset(Field f);
/**
* 设置给定对象偏移量上面的值(int 类型) volatile语义
*/
public native void putIntVolatile(Object o, long offset, int x);
/**
* 获取给定对象偏移量上的int值 volatile语义
*/
public native int getIntVolatile(Object o, long offset);
/**
* 和putIntVolatile()一样,但它要求被操作的字段就是volatile类型的
*/
public native void putOrderedInt(Object o, long offset, int x);
}
3.AtomicReference
/**
* 对引用进行修改
* AtomicInteger 是对整数进行封装,而AtomicReference封装的其实是对象的引用
* 是一个模板类,抽象化了数据类型
*/
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
//偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
//获取偏移量 指value这个值在类当中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile V value;
/**
* 获取当前值
*
* @return 当前值
*/
public final V get() {
return value;
}
/**
* 设置当前值
*
* @param 当前值
*/
public final void set(V newValue) {
value = newValue;
}
/**
* 如果当前值是期望值 expect 则设置为 update 值
* @param 期望(老的)值
* @param 新值
* @return 是否设置成功,返回失败代表新值和期望值是不相同的
*/
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
/**
* 设置新值并返回旧值
*
* @param 新值
* @return 旧值(上一个值)
*/
public final V getAndSet(V newValue) {
while (true) {
V x = get();
if (compareAndSet(x, newValue))
return x;
}
}
}
AtomicReference与volatile的区别
首先volatile是java中关键字用于修饰变量,AtomicReference是并发包java.util.concurrent.atomic下的类。
首先volatile作用,当一个变量被定义为volatile之后,看做“程度较轻的 synchronized”,具备两个特性:
1.保证此变量对所有线程的可见性(当一条线程修改这个变量值时,新值其他线程立即得知)
2.禁止指令重新排序
注意volatile修饰变量不能保证在并发条件下是线程安全的,因为java里面的运算并非原子操作。
volatile说明
java.util.concurrent.atomic工具包,支持在单个变量上解除锁的线程安全编程。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。
4.AtomicStampedReference
在运用CAS做操作中有一个经典的ABA问题:
如果场景是和过程状态无关的,只跟结果有关系,那么影响不大,但是有些情况之下,场景可能和过程有关的.当你对数据变化过程是敏感的时候,普通的CAS操作是无法辨别上图2个A的区别的.
/**
* 原子更新带有版本号的引用类型。
* 该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号。
* 可以解决使用CAS进行原子更新时,可能出现的ABA问题。
*/
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
//最好不要重复的一个数据,决定数据是否能设置成功
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
//根据reference和stamp来生成一个Pair的实例
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
//对Value 进行封装,放入Pair里面
private volatile Pair<V> pair;
/**
* 返回当前的对象
*/
public V getReference() {
return pair.reference;
}
/**
* 返回当前的stamp
*/
public int getStamp() {
return pair.stamp;
}
/**
* 当当前的引用数据和期望的引用数据相等并且当前stamp和期望的stamp也相等
* 并且
* (当前的引用数据和新的引用数据相等并且当前stamp和新的stamp也相等
* 或者cas操作成功
* )
* @param 期望(老的)的引用数据
* @param 新的引用数据
* @param 期望(老的)的stamp值
* @param 新的stamp值
* @return
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
/**
* 当前值和期望值比较设置
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
}
使用方法:
AtomicStampedReference<Integer> num = new AtomicStampedReference<Integer>(1, 0);
public void test () {
Integer i = num.getReference();
int stamped = num.getStamp();
if (num.compareAndSet(i, i+1, stamped, stamped+1 )) {
System.out.println("设置成功");
}
}
5.AtomicIntegerArray
/**
* 原子更新整型数组里的元素。
*/
public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
//数组所在的基地址
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
private final int[] array;
static {
//数组中元素的宽度,比如int占4个byte,所以此处scale应该是4
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//31减去前导零(数字变成二进制后前面0的个数)这里shift 为2
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
/**
* 获取第i个元素在数组中的偏移量
*/
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
/**
* 获取第i个元素在数组中的偏移量
*/
private static long byteOffset(int i) {
//因为shift=2 所以相当于 i*4+base
return ((long) i << shift) + base;
}
/**
* 获取数组的大小
*/
public final int length() {
return array.length;
}
/**
* 获取数组中第i个下标的元素
*/
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
/**
* 通过偏移量获取数组中的元素
*/
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
/**
* 设置数组中第i个下标的元素
*/
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
/**
* 将数组中第i个下标设置为新值,并返回旧值
*/
public final int getAndSet(int i, int newValue) {
long offset = checkedByteOffset(i);
while (true) {
int current = getRaw(offset);
if (compareAndSetRaw(offset, current, newValue))
return current;
}
}
/**
*如果第i个下标的元素等于expect,则设置为update,设置成功返回true
*/
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}
/**
* 数组中第i个下标的元素+1,返回旧值
*/
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
/**
* 数组中第i个下标的元素-1,返回旧值
*/
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}
/**
* 数组中第i个下标元素+delta 并返回旧值
*/
public final int getAndAdd(int i, int delta) {
long offset = checkedByteOffset(i);
while (true) {
int current = getRaw(offset);
if (compareAndSetRaw(offset, current, current + delta))
return current;
}
}
/**
* 数组中第i个下标的元素+1,返回新值
*/
public final int incrementAndGet(int i) {
return addAndGet(i, 1);
}
/**
* 数组中第i个下标的元素-1,返回新值
*/
public final int decrementAndGet(int i) {
return addAndGet(i, -1);
}
/**
* 数组中第i个下标元素+delta 并返回新值
*/
public final int addAndGet(int i, int delta) {
long offset = checkedByteOffset(i);
while (true) {
int current = getRaw(offset);
int next = current + delta;
if (compareAndSetRaw(offset, current, next))
return next;
}
}
}
原子操作的实现原理
http://ifeve.com/atomic-operation/#header
Java JUC之Atomic系列12大类实例讲解和原理分解