在java中,无锁并发数据结构已经有一大堆了,例如,ConcurrentLinkedQueue、ConcurrentSkipListSet、ConcurrentSkipListMap、ConcurrentHashMap(ConcurrentHashMap实际上是有锁的,只是锁的粒度更小)等。
但是在某个应用中,我需要一种无锁并发的数组容器,并且在容器生成时就指定容量,不需要动态扩展(以避免jvm恼人的频繁gc)。于是 ConcurrentArrayQueue 就有存在的必要了,而jdk并没有提供这种结构,我们自己动手写一个。
首先搜索了网上相关资料,找到一种c++的实现,看了一下,实现的还是非常巧妙的,这里是传送门。
将其翻译成java代码如下
import java.util.concurrent.atomic.AtomicInteger;
/**
* 参考:
* http://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular
*/
public class ConcurrentArrayQueue <T> {
// 环状缓存
private final Object[] ring;
private final AtomicInteger maximumReadIndex = new AtomicInteger(0);
private final AtomicInteger readIndex = new AtomicInteger(0);
private final AtomicInteger writeIndex = new AtomicInteger(0);
public ConcurrentArrayQueue(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException("Illegal capacity " + capacity);
ring = new Object[capacity + 1];
}
public boolean push(T e) {
int currentReadIndex, currentWriteIndex;
do {
currentReadIndex = readIndex.get();
currentWriteIndex = writeIndex.get();
// check if queue is full
if (((currentWriteIndex + 1) % ring.length) ==
(currentReadIndex % ring.length))
return false;
} while (!writeIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1));
// We know now that this index is reserved for us. Use it to save the data
ring[currentWriteIndex % ring.length] = e;
// update the maximum read index after saving the data.
// It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!maximumReadIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1)) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield() (POSIX.1b)
Thread.yield();
}
return true;
}
public T pop() {
int currentMaximumReadIndex;
int currentReadIndex;
while (true) {
currentReadIndex = readIndex.get();
currentMaximumReadIndex = maximumReadIndex.get();
// The queue is empty or a producer thread has allocate space in the queue
// but is waiting to commit the data into it
if ((currentReadIndex % ring.length) ==
(currentMaximumReadIndex % ring.length))
return null;
// retrieve the data from the queue
@SuppressWarnings("unchecked")
T ret = (T) ring[currentReadIndex % ring.length];
if (readIndex.compareAndSet(currentReadIndex, currentReadIndex + 1))
return ret; // 这里没有办法清理残余的引用,可能导致内存泄露
}
}
public int size() {
int ret = writeIndex.get() - readIndex.get();
if (ret < 0)
return 0;
return ret;
}
public void clear() {
while (size() > 0)
pop();
}
public int capactity() {
return ring.length - 1;
}
}
其原理就不多说了,给出的参考文章说的比较清楚了(特别是其给出的图片:-)
然后说一下应用,这里是用上面的结构写了一个对象池(ObjectPool)。
首先是可缓存对象工厂接口:
/**
* 用于配置对象池
*/
public abstract class PoolableObjectFactory <T> {
/**
* 新建对象
*/
public abstract T newObject();
/**
* 还原对象状态,避免内存泄露和垃圾信息导致的不确定性问题
*/
public abstract void passivateObject(T obj);
/**
* 最多缓存的对象数
*/
public int maxPooled() {
return 10;
}
}
接着是对象池:
import util.ConcurrentArrayQueue;
/**
* 对象池
*/
public class ObjectPool <T> {
// XXX 使用数组而不是链表,是为了避免反复的 new 节点
private final ConcurrentArrayQueue<T> ring;
private final PoolableObjectFactory<T> factory;
public ObjectPool(PoolableObjectFactory<T> factory) {
this.factory = factory;
this.ring = new ConcurrentArrayQueue<T>(factory.maxPooled());
}
public T borrowObject() {
T ret = ring.pop();
if (ret == null)
ret = factory.newObject();
return ret;
}
public T returnObject(T obj) {
if (obj == null)
return null;
// 清理对象
factory.passivateObject(obj);
// 对象入池
ring.push(obj);
// 丢弃对象
return null;
}
public void clear() {
ring.clear();
}
}