1、CopyOnWriteArrayList
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
// 用ReentrantLock实现线程安全
final transient ReentrantLock lock = new ReentrantLock();
// 没有size域,因为每次添加元素时,array都会改变,size和array的大小始终一致
private transient volatile Object[] array;
// 很简单,set(int index, E element)方法和add(E e)类似
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 写时复制
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
final void setArray(Object[] a) {
array = a;
}
}
迭代时,迭代器的数组只是一个快照,不是最新的。迭代器里定义了Object[] snapshot,初始化迭代器时snapshot = array。CopyOnWriteArraySet用CopyOnWriteArrayList实现,add()时调用CopyOnWriteArrayList的addIfAbsent()。
2、ArrayBlockingQueue和LinkedBlockingQueue
BlockingQueue的添加操作有【1】add(E),队列满时会抛异常【2】offer(E),offer(E,long, TimeUnit),等待一段时间,队列有空间就放入,一直满就返回false【3】put(E)队满会阻塞;取出操作有【1】poll(long,TimeUnit)一直无数据可取则返回null【2】remove()无数据则抛异常【3】take()无数据阻塞
获取队头的操作有(BlockingQueue里没有定义这两个方法)【1】element()队列为空抛异常【2】peek()不抛异常
(1)ArrayBlockingQueue,如下用循环队列和ReentrantLock实现,构造时可以指定大小和锁的公平性,默认用非公平锁。put(),take()时调用lockInterruptibly(),不满足条件则在Condition上等待。迭代时也会加锁。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
// 可以指定大小和锁的公平性
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 循环队列
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
// 操作前后加锁解锁,take()类似
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 队列已满,在notFull上等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
}
(2)LinkedBlockingQueue,用两个ReentrantLock实现,可以同时put(),take(),引入了原子变量来计数。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private final int capacity;
// 因为用了两个ReentrantLock,可以同时take(),put(),所以count必须是原子的
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
// 构造时也可以指定容量,不指定则默认Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
}
为什么ArrayBlockingQueue用一把锁,LinkedBlockingQueue用两把?因为LinkedBlockingQueue需要构造节点(构造节点不在lock块里,应该主要是为了节省调整指针的时间)、调整指针,等待的时间比ArrayBlockingQueue(只需改变数组的相应位置,很快)长,用两把锁同时存取速度更快;而ArrayBlockingQueue如果用双锁,必须引入AtomicInteger,性能上几乎没有提升,代码更复杂了。
3、ConcurrentLinkedQueue
head节点值为null,next指向第一个元素。插入时,tail.next != null时更新tail,tail可能是倒数第一或第二个节点。offer()时先定位尾节点,CAS将入队节点更新为尾节点的next节点,如果之前尾节点是倒数第二个节点,则更新尾节点。