JUC源码——CopyOnWriteArrayList、(Array/Linked)BlockingQueue、ConcurrentLinkedQueue(1.8)

1、CopyOnWriteArrayList

public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    
    // 用ReentrantLock实现线程安全
    final transient ReentrantLock lock = new ReentrantLock();
    
    // 没有size域,因为每次添加元素时,array都会改变,size和array的大小始终一致
    private transient volatile Object[] array;
    
    // 很简单,set(int index, E element)方法和add(E e)类似
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 写时复制
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    final void setArray(Object[] a) {
        array = a;
    }
}

    迭代时,迭代器的数组只是一个快照,不是最新的。迭代器里定义了Object[] snapshot,初始化迭代器时snapshot = array。CopyOnWriteArraySet用CopyOnWriteArrayList实现,add()时调用CopyOnWriteArrayList的addIfAbsent()。

2、ArrayBlockingQueue和LinkedBlockingQueue

    BlockingQueue的添加操作有【1】add(E),队列满时会抛异常【2】offer(E),offer(E,long, TimeUnit),等待一段时间,队列有空间就放入,一直满就返回false【3】put(E)队满会阻塞;取出操作有【1】poll(long,TimeUnit)一直无数据可取则返回null【2】remove()无数据则抛异常【3】take()无数据阻塞

    获取队头的操作有(BlockingQueue里没有定义这两个方法)【1】element()队列为空抛异常【2】peek()不抛异常

    (1)ArrayBlockingQueue,如下用循环队列和ReentrantLock实现,构造时可以指定大小和锁的公平性,默认用非公平锁。put(),take()时调用lockInterruptibly(),不满足条件则在Condition上等待。迭代时也会加锁。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    final Object[] items;

    int takeIndex;

    int putIndex;

    int count;

    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    // 可以指定大小和锁的公平性
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // 循环队列
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
        // 操作前后加锁解锁,take()类似
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // 队列已满,在notFull上等待
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
}

    (2)LinkedBlockingQueue,用两个ReentrantLock实现,可以同时put(),take(),引入了原子变量来计数

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    private final int capacity;

    // 因为用了两个ReentrantLock,可以同时take(),put(),所以count必须是原子的
    private final AtomicInteger count = new AtomicInteger();

    transient Node<E> head;

    private transient Node<E> last;

    private final ReentrantLock takeLock = new ReentrantLock();

    private final Condition notEmpty = takeLock.newCondition();

    private final ReentrantLock putLock = new ReentrantLock();

    private final Condition notFull = putLock.newCondition();

    // 构造时也可以指定容量,不指定则默认Integer.MAX_VALUE
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
}

    为什么ArrayBlockingQueue用一把锁,LinkedBlockingQueue用两把?因为LinkedBlockingQueue需要构造节点(构造节点不在lock块里,应该主要是为了节省调整指针的时间)、调整指针,等待的时间比ArrayBlockingQueue(只需改变数组的相应位置,很快)长,用两把锁同时存取速度更快;而ArrayBlockingQueue如果用双锁,必须引入AtomicInteger,性能上几乎没有提升,代码更复杂了。

3、ConcurrentLinkedQueue

    head节点值为null,next指向第一个元素。插入时,tail.next != null时更新tail,tail可能是倒数第一或第二个节点。offer()时先定位尾节点,CAS将入队节点更新为尾节点的next节点,如果之前尾节点是倒数第二个节点,则更新尾节点。

    原文作者:JUC
    原文地址: https://blog.csdn.net/weixin_39420024/article/details/80082321
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞