JUC源码——CopyOnWriteArrayList、(Array/Linked)BlockingQueue、ConcurrentLinkedQueue(1.8)

1、CopyOnWriteArrayList

public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
	
	// 用ReentrantLock实现线程安全
	final transient ReentrantLock lock = new ReentrantLock();
	
	// 没有size域,因为每次添加元素时,array都会改变,size和array的大小始终一致
	private transient volatile Object[] array;
	
	// 很简单,set(int index, E element)方法和add(E e)类似
	public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 写时复制
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
	
	final void setArray(Object[] a) {
        array = a;
    }
}

    迭代时,迭代器的数组只是一个快照,不是最新的。迭代器里定义了Object[] snapshot,初始化迭代器时snapshot = array。CopyOnWriteArraySet用CopyOnWriteArrayList实现,add()时调用CopyOnWriteArrayList的addIfAbsent()。

2、ArrayBlockingQueue和LinkedBlockingQueue

    BlockingQueue的添加操作有【1】add(E),队列满时会抛异常【2】offer(E),offer(E,long, TimeUnit),等待一段时间,队列有空间就放入,一直满就返回false【3】put(E)队满会阻塞;取出操作有【1】poll(long,TimeUnit)一直无数据可取则返回null【2】remove()无数据则抛异常【3】take()无数据阻塞

    获取队头的操作有(BlockingQueue里没有定义这两个方法)【1】element()队列为空抛异常【2】peek()不抛异常

    (1)ArrayBlockingQueue,如下用循环队列和ReentrantLock实现,构造时可以指定大小和锁的公平性,默认用非公平锁。put(),take()时调用lockInterruptibly(),不满足条件则在Condition上等待。迭代时也会加锁。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	final Object[] items;

	int takeIndex;

	int putIndex;

	int count;

	final ReentrantLock lock;

	private final Condition notEmpty;

	private final Condition notFull;

	// 可以指定大小和锁的公平性
	public ArrayBlockingQueue(int capacity, boolean fair) {
		if (capacity <= 0)
			throw new IllegalArgumentException();
		this.items = new Object[capacity];
		lock = new ReentrantLock(fair);
		notEmpty = lock.newCondition();
		notFull = lock.newCondition();
	}

	private void enqueue(E x) {
		final Object[] items = this.items;
		items[putIndex] = x;
		// 循环队列
		if (++putIndex == items.length)
			putIndex = 0;
		count++;
		notEmpty.signal();
	}
        // 操作前后加锁解锁,take()类似
	public void put(E e) throws InterruptedException {
		checkNotNull(e);
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			while (count == items.length)
				// 队列已满,在notFull上等待
				notFull.await();
			enqueue(e);
		} finally {
			lock.unlock();
		}
	}
}

    (2)LinkedBlockingQueue,用两个ReentrantLock实现,可以同时put(),take(),引入了原子变量来计数

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

	private final int capacity;

	// 因为用了两个ReentrantLock,可以同时take(),put(),所以count必须是原子的
	private final AtomicInteger count = new AtomicInteger();

	transient Node<E> head;

	private transient Node<E> last;

	private final ReentrantLock takeLock = new ReentrantLock();

	private final Condition notEmpty = takeLock.newCondition();

	private final ReentrantLock putLock = new ReentrantLock();

	private final Condition notFull = putLock.newCondition();

	// 构造时也可以指定容量,不指定则默认Integer.MAX_VALUE
	public LinkedBlockingQueue(int capacity) {
		if (capacity <= 0)
			throw new IllegalArgumentException();
		this.capacity = capacity;
		last = head = new Node<E>(null);
	}

	public void put(E e) throws InterruptedException {
		if (e == null)
			throw new NullPointerException();
		int c = -1;
		Node<E> node = new Node<E>(e);
		final ReentrantLock putLock = this.putLock;
		final AtomicInteger count = this.count;
		putLock.lockInterruptibly();
		try {
			while (count.get() == capacity) {
				notFull.await();
			}
			enqueue(node);
			c = count.getAndIncrement();
			if (c + 1 < capacity)
				notFull.signal();
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
	}

	public E take() throws InterruptedException {
		E x;
		int c = -1;
		final AtomicInteger count = this.count;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lockInterruptibly();
		try {
			while (count.get() == 0) {
				notEmpty.await();
			}
			x = dequeue();
			c = count.getAndDecrement();
			if (c > 1)
				notEmpty.signal();
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}
}

    为什么ArrayBlockingQueue用一把锁,LinkedBlockingQueue用两把?因为LinkedBlockingQueue需要构造节点(构造节点不在lock块里,应该主要是为了节省调整指针的时间)、调整指针,等待的时间比ArrayBlockingQueue(只需改变数组的相应位置,很快)长,用两把锁同时存取速度更快;而ArrayBlockingQueue如果用双锁,必须引入AtomicInteger,性能上几乎没有提升,代码更复杂了。

3、ConcurrentLinkedQueue

    head节点值为null,next指向第一个元素。插入时,tail.next != null时更新tail,tail可能是倒数第一或第二个节点。offer()时先定位尾节点,CAS将入队节点更新为尾节点的next节点,如果之前尾节点是倒数第二个节点,则更新尾节点。

    原文作者:JUC
    原文地址: https://blog.csdn.net/weixin_39420024/article/details/80082321
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞