前言
Java 5 中由Doug Lea大神写的atomic classes 中引入了 Field Updater,本质上来说就是volatile 字段的包装器,下面我们看看该如何使用:
AtomicIntegerFieldUpdater
首先看看类的定义:
public abstract class AtomicIntegerFieldUpdater<T> {
/**
* 抽象方法,提供一个静态方法以便得到实例
*/
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
}
private static final class AtomicIntegerFieldUpdaterImpl<T>
extends AtomicIntegerFieldUpdater<T> {
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
final String fieldName,
final Class<?> caller) {
final Field field;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
//字段不存在会抛异常
return tclass.getDeclaredField(fieldName);
}
});
//检查访问级别,
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
//必须是int
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
//必须用volatile修饰
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");
this.cclass = (Modifier.isProtected(modifiers)) ? caller : tclass;
this.tclass = tclass;
//用Unsafe里的那一坨方法去原子更新
this.offset = U.objectFieldOffset(field);
}
}
那么有那些使用场景呢,我们可以看到Cassandra中有一些地方用到了,比如:
public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>{
private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
= AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
private volatile int failures = 0;
public void get() throws WriteTimeoutException, WriteFailureException
{
long timeout = currentTimeout();
boolean success;
try
{
success = condition.await(timeout, TimeUnit.NANOSECONDS);
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}
if (!success)
{
int blockedFor = totalBlockFor();
int acks = ackCount();
if (acks >= blockedFor)
acks = blockedFor - 1;
throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor);
}
//可以看到平常可以直接访问failures,而不用像AtomicInteger一样还需要调用get方法
if (totalBlockFor() + failures > totalEndpoints())
{
throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
}
}
@Override
public void onFailure(InetAddress from, RequestFailureReason failureReason)
{
logger.trace("Got failure from {}", from);
//当需要类似AtomicInteger一样的功能时,我们知道volatile执行++是非原子的
int n = waitingFor(from)
? failuresUpdater.incrementAndGet(this)
: failures;
failureReasonByEndpoint.put(from, failureReason);
if (totalBlockFor() + n > totalEndpoints())
signal();
}
}
小结
总体来看使用场景不多,有点类似AtomicInteger,但是可以使用比较方便,比如定义了一个变量x,你可以直接正常访问x,而如果使用了AtomicInteger的话,还是需要调用AtomicInteger#get方法,另外如果你又需要原子的get-set的话也可以满足使用场景;另外比如说你有个比较大的链表,内部每个节点都需要原子的get-set,那每个节点都要定义一个AtomicInteger,这样会消耗更多的内存,那么就可以定义个static的AtomicIntegerFieldUpdater。
AtomicReferenceFieldUpdater
和Integer差不多了,只不过这个是用来更新引用的,所以就不过多解读了,Java类库中BufferedInputStream就调用了这个类:
public
class BufferedInputStream extends FilterInputStream {
protected volatile byte buf[];
/*
* 原子的更新内部数组,比如扩容、关闭时,
*/
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class, byte[].class, "buf");
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
//放在一个循环中,如果CAS更新失败,那么就读取最新的buf引用,继续CAS更新
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
}
}
}
另外还有JDK1.7之前的ConcurrentLinkedQueue,也有了这个机制,但是可以1.7之后切换到了Unsafe的方案, 主要还是因为性能原因,FieldUpdater的方案需要使用反射API配合,而Unsafe不用,而且有些JVM会把Unsafe的调用内联,理论上看会快很多(Todo :待测试)
//JDK1.6
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
private static final
AtomicReferenceFieldUpdater<Node, Node>
nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
private static final
AtomicReferenceFieldUpdater<Node, Object>
itemUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "item");
}
//JDK1.8
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
总结
总的来说,JDK类库里面使用较多一点,大部分场景直接使用JUC下面类似ConcurrentHashMap这些就够了,真遇到性能不满足的场景,再根据Profile的结果针对性优化。