Java并发基础-并发工具类(二)

并发工具类

本系列文章主要讲解Java并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
《Java并发基础-并发工具类(二)》

系列文章列表:

本文主要以实例讲解Semaphore、阻塞队列等内容。

Semaphore

基本概念和用途

Semaphore常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用acquire()方法去或者许可证,然后执行相关任务,任务完成后,调用release()方法释放该许可证,让其他阻塞的线程可以运行。
Semaphore可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改mysql数据库,为了避免数据库并发太大,我们进行相关限制。
常用方法
Semaphore(int permits):构造方法,初始化许可证数量
void acquire():获取许可证
void release():释放许可证
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

运行示例

虽然在代码中设置了20个线程去运行,但同时设置了许可证的数量为5,因而实际的最大并发数还是5

package com.aidodoo.java.concurrent;

import java.util.concurrent.*;

/**
 * Created by zhangkh on 2018/9/9.
 */
public class SemaphoreDemo {
    public static void main(String[] args){
        Semaphore semaphore=new Semaphore(5);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Account account=new Account();
        for(int i=0;i<20;i++){
            SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore);
            executorService.submit(spender);
        }

        executorService.shutdown();
    }
}
class SpenderWithSemaphore implements Runnable {
    private final Account account;
    private final Semaphore semaphore;

    public SpenderWithSemaphore(Account account, Semaphore semaphore) {
        this.account = account;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try{
            semaphore.acquire();
            System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000));
            Thread.sleep(2000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
//            System.out.println(String.format("%s release a premit",Thread.currentThread().getName()));
            semaphore.release();
        }
    }
}

获取许可证后,模拟操作mysql,我们让线程睡眠2秒,程序输出如下:

pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql

可以看到前面5个线程同一时间1536480858获得许可证,然后执行操作,并不是20个线程一起操作,这样能降低对mysql数据库的影响。
如果把上面Semaphore的构造方法中的许可证数量改为20,大家可以看到20个线程的运行时间基本一致。

源码实现

Semaphore实现直接基于AQS,有公平和非公平两种模式。公平模式即按照调用acquire()的顺序依次获得许可证,遵循FIFO(先进先出),非公平模式是抢占式的,谁先抢到先使用。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

获取许可证
acquire()方法最终调用父类AQS中的acquireSharedInterruptibly方法。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)        	    //(1)
        doAcquireSharedInterruptibly(arg);		//(2)
}

(1):调用tryAcquireShared,尝试去获取许可证
(2):如果获取失败,则调用doAcquireSharedInterruptibly,将线程加入到等待队列中
tryAcquireShared方法由Semaphore的内部类,同时也是AQS的子类去实现,即NonfairSyncFairSync,下面我们以NonfairSync为例说明其实现。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared方法如下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();				//(1)
        int remaining = available - acquires;	//(2)
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) (3)
            return remaining;
    }
}

(1):获取state的值,也就是总许可证数量
(2):计算本次申请后,剩余的许可证数量
(3):如果剩余的许可证数量大于0且通过CASstate的值修改成功后,返回剩余的许可证数量,否则继续循环阻塞。

释放许可证
release()方法的调用最终会调用父类AQSreleaseShared()方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {		//(1)
        doReleaseShared();				//(2)
        return true;
    }
    return false;
}

(1):尝试释放许可证
(2):如果释放许可证成功,则通知阻塞的线程,让其执行
tryReleaseShared方法很简单,基本上是nonfairTryAcquireShared的逆过程,即增加许可证的数量,并通过CAS修改state的值。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

BlockingQueue

基本概念

阻塞队列主要是解决如何高效安全传输数据的问题,此外能降低程序耦合度,让代码逻辑更加清晰。
其继承了Queue,并在其基础上支持了两个附加的操作:

  • 当队列为空时,获取元素的线程会阻塞,等待队列变为非空
  • 当队列满时,添加元素的线程会阻塞,等待队列可用

比较典型的使用场景是生产者和消费者。
BlockingQueue根据对于不能立即满足但可能在将来某一时刻可以满足的操作,提供了不同的处理方法,进而导致众多的api操作:

Throws exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()}not applicablenot applicable

Throws exception:指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常
Special value:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
Blocks:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
Time out:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

整体架构和类图

Java并发包根据不同的结构和功能提供了不同的阻塞队列,整体类图如下:
《Java并发基础-并发工具类(二)》
其中BlockingQueue有如下子类:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

其中BlockingDeque有一个子类:

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
    BlockingDeque作为双端队列,针对头部元素,还提供了如下方法:
First Element (Head)
Throws exceptionSpecial valueBlocksTimes out
InsertaddFirst(e)offerFirst(e)putFirst(e)offerFirst(e, time, unit)
RemoveremoveFirst()pollFirst()takeFirst()pollFirst(time, unit)
ExaminegetFirst()peekFirst()not applicablenot applicable

针对尾部元素

Last Element (Tail)
Throws exceptionSpecial valueBlocksTimes out
InsertaddLast(e)offerLast(e)putLast(e)offerLast(e, time, unit)
RemoveremoveLast()pollLast()takeLast()pollLast(time, unit)
ExaminegetLast()peekLast()not applicablenot applicable

使用示例

一个典型的生产者和消费者实例如下,一个BlockingQueue可以安全地与多个生产者和消费者一起使用,Producer线程调用NumerGenerator.getNextNumber()生成自增整数,不断地写入数字,然后Consumer循环消费。

package com.aidodoo.java.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zhangkh on 2018/7/17.
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new ArrayBlockingQueue(1024,true);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Producer(queue));
        }
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Consumer(queue));
        }
        Thread.sleep(30 * 1000L);
        executorService.shutdown();
    }
}

class Producer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Producer.class.getName());
    protected BlockingQueue queue = null;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for(int i=0;i<3;i++){
                int num = NumerGenerator.getNextNumber();
                queue.put(num);
                Thread.sleep(1000);
                logger.info("{} producer put {}", Thread.currentThread().getName(), num);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


class Consumer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Consumer.class.getName());

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int ele = (int) queue.take();
                logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class NumerGenerator{
    private static AtomicInteger count = new AtomicInteger();
    public static Integer getNextNumber(){
        return count.incrementAndGet();
    }
}

程序输出如下:

18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15

其他BlockingQueue子类的使用可参考对应的Java Api

源码分析

由于BlockingQueue相关的子类众多,我们仅以ArrayBlockingQueue从源码角度分析相关实现。
构造方法
ArrayBlockingQueue中定义的成员变量如下:

final Object[] items; 
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null

各变量的解释如下,以便了解后续的代码:

  • items用于存储具体的元素
  • takeIndex元素索引,用于记录下次获取元素的位置
  • putIndex元素索引,用于记录下次插入元素的位置
  • count用于记录当前队列中元素的个数
  • notEmpty条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞
  • notFull条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞
  • itrs用于维护迭代器相关内容

内部结构如下:

《Java并发基础-并发工具类(二)》

构造方法如下:

public ArrayBlockingQueue(int capacity) {
	this(capacity, false);	//(1)								
}

public ArrayBlockingQueue(int capacity, boolean fair) {
	if (capacity <= 0)
		throw new IllegalArgumentException();
	this.items = new Object[capacity];		//(2)				
	lock = new ReentrantLock(fair);
	notEmpty = lock.newCondition();     	//(3)
	notFull =  lock.newCondition();	    	//(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
						  Collection<? extends E> c) {
	this(capacity, fair);

	final ReentrantLock lock = this.lock;
	lock.lock(); // Lock only for visibility, not mutual exclusion
	try {
		int i = 0;
		try {
			for (E e : c) {					//(5)
				checkNotNull(e);
				items[i++] = e;
			}
		} catch (ArrayIndexOutOfBoundsException ex) {
			throw new IllegalArgumentException();
		}
		count = i;
		putIndex = (i == capacity) ? 0 : i;
	} finally {
		lock.unlock();
	}
}

(1):默认情况下,非公平模式,即抢占式
(2):数组初始化
(3)/(4):条件变量初始化
(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改countputIndex的值。

插入数据
插入数据,我们主要看put()方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。

public void put(E e) throws InterruptedException {
    checkNotNull(e);						//(1)
    final ReentrantLock lock = this.lock;   //(2)
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await(); 				//(3)
        enqueue(e);
    } finally {
        lock.unlock();		 				//(4)
    }
}

private void enqueue(E x) {
final Object[] items = this.items;
    items[putIndex] = x;				    //(5)
    if (++putIndex == items.length)         //(6)
        putIndex = 0;
    count++;								//(7)
    notEmpty.signal();						//(8)
}

(1):非空检查,插入的元素不能为null,否则抛出NullPointerException
(2):获取互斥锁
(3):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和notFull相关的锁,当前线程阻塞。当前线程唤醒的条件如下:

  • 其他某个线程调用此 Conditionsignal() 方法,并且碰巧将当前线程选为被唤醒的线程;
  • 或者其他某个线程调用此 ConditionsignalAll() 方法;
  • 或者其他某个线程中断当前线程,且支持中断线程的挂起;
  • 或者发生“虚假唤醒”

(5):如果队列未满,则将元素添加的putIndex索引的位置
(6):putIndex增加1后和队列长度相等,即已到达队列尾部,则putIndex0
(7):队列已有元素数量加1
(8):通知notEmpty条件变量,唤醒等待获取元素的线程
(4):释放互斥锁
可以看到ArrayBlockingQueue每次插入元素后,都会去唤醒等待获取元素的线程。

获取数据
take()方法源码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock; 	//(1)
    lock.lockInterruptibly();
    try {
        while (count == 0)					
            notEmpty.await();				//(2)
        return dequeue();
    } finally {
        lock.unlock();						//(9)
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];				//(3)
    items[takeIndex] = null;				//(4)
    if (++takeIndex == items.length)
        takeIndex = 0;						//(5)
    count--;								//(6)
    if (itrs != null)
        itrs.elementDequeued();				//(7)
    notFull.signal();						//(8)
    return x;
}

(1):获取互斥锁
(2):如果count0,即队列为空,则释放互斥锁,然后挂起当前线程
(3):根据takeIndex索引到数组中获取具体的值,并赋值给x
(4):赋值完成后,takeIndex索引位置数据置null,便于回收
(5):takeIndex1,然后和队列长度比较,如果相等,即已经读取到队列尾部,takeIndex0
(6):获取后,将队列元素个数count1
(7):维护和queue相关的迭代器
(8):唤醒等待插入元素的线程
(9):释放互斥锁
可以看到ArrayBlockingQueue每次获取元素后,都会唤醒等待插入元素的线程。

迭代器
在分析源码前,我们先看在一个迭代器的示例

package com.aidodoo.java.concurrent;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Created by zhangkh on 2018/9/10.
 */
public class ArrayBlockingQueueIterDemo {
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<String> queue=new ArrayBlockingQueue(5);
            queue.put("hadoop");
            queue.put("spark");
            queue.put("storm");
            queue.put("flink");

            Iterator<String> iterator1 = queue.iterator();
            System.out.println( queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println();
            while(iterator1.hasNext()) {
                System.out.println(iterator1.next());
            }
            System.out.println();
            Iterator<String> iterator2 = queue.iterator();
            while(iterator2.hasNext()) {
                System.out.println(iterator2.next());
            }
        }
}

程序输出如下:

hadoop
spark
storm

hadoop
flink

flink

我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值
当分别插入hadoopsparkstormflink四个元素后,内部变量的值如下:
《Java并发基础-并发工具类(二)》
此时,ArrayBlockingQueue的成员变量的值itrsnull
调用iterator()方法后,源码如下:

public Iterator<E> iterator() {
    return new Itr(); 					//(1)
}

Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();							//(2)
try {
	if (count == 0) {					//(3)
		cursor = NONE;
		nextIndex = NONE;
		prevTakeIndex = DETACHED;
	} else {
		final int takeIndex = ArrayBlockingQueue.this.takeIndex;
		prevTakeIndex = takeIndex;
		nextItem = itemAt(nextIndex = takeIndex);	//(4)
		cursor = incCursor(takeIndex);				//(5)
		if (itrs == null) {
			itrs = new Itrs(this);					//(6)
		} else {
			itrs.register(this); 					//(7)
			itrs.doSomeSweeping(false);
		}
		prevCycles = itrs.cycles;
	}
} finally {
	lock.unlock();									//(8)
}

}
(1):调用内部类Itr的构造方法
(2):获取外部类即ArrayBlockingQueue的锁
(3):没有没有元素,初始化变量值。内部类Itr的成员变量如下:

/** Index to look for new nextItem; NONE at end */
private int cursor;

/** Element to be returned by next call to next(); null if none */
private E nextItem;

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;

/** Last element returned; null if none or not detached. */
private E lastItem;

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;

/** Previous value of iters.cycles */
private int prevCycles;

(4):将外部类的takeIndex赋值给内部类nextIndex,并获取数组具体的值赋值给nextItem
(5):计算游标cursor的下个值,其中incCursor方法如下:

private int incCursor(int index) {
    // assert lock.getHoldCount() == 1;
    if (++index == items.length)
        index = 0;
    if (index == putIndex)
        index = NONE;
    return index;
}

(6):注册,主要是维护链表
(7):清理itrs
(8):释放外部类的互斥锁
在上面的示例中,调用iterator()方法后,Itr的内部变量值如下:
《Java并发基础-并发工具类(二)》

由于后面三次调用了queue.take(),依次输出hadoopsparkstorm后,相关成员变量的值见图片标识,重点关注takeIndex=3

当调用next()方法时,代码如下:

public E next() {
	final E x = nextItem;
	if (x == null)
		throw new NoSuchElementException();
	final ReentrantLock lock = ArrayBlockingQueue.this.lock;
	lock.lock();
	try {
		if (!isDetached())			//(1)
			incorporateDequeues();
		lastRet = nextIndex;
		final int cursor = this.cursor;
		if (cursor >= 0) {
			nextItem = itemAt(nextIndex = cursor);
			this.cursor = incCursor(cursor);
		} else {
			nextIndex = NONE;
			nextItem = null;
		}
	} finally {
		lock.unlock();
	}
	return x;
}

其中(1)处的isDetached方法如下

boolean isDetached() {
    // assert lock.getHoldCount() == 1;
    return prevTakeIndex < 0;
}

由于我们示例中初始化Itr的时候的prevTakeIndex0,故isDetached返回为false,程序将调用incorporateDequeues方法,根据注释我们也知道,该方法主要是调整和迭代器相关的内部索引。

/**
 * Adjusts indices to incorporate all dequeues since the last
 * operation on this iterator.  Call only from iterating thread.
 */
private void incorporateDequeues() {
    final int cycles = itrs.cycles;
    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    final int prevCycles = this.prevCycles;
    final int prevTakeIndex = this.prevTakeIndex;

    if (cycles != prevCycles || takeIndex != prevTakeIndex) {
        final int len = items.length;
        // how far takeIndex has advanced since the previous
        // operation of this iterator
        long dequeues = (cycles - prevCycles) * len
            + (takeIndex - prevTakeIndex);

        // Check indices for invalidation
        if (invalidated(lastRet, prevTakeIndex, dequeues, len))
            lastRet = REMOVED;
        if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
            nextIndex = REMOVED;
        if (invalidated(cursor, prevTakeIndex, dequeues, len))
            cursor = takeIndex;

        if (cursor < 0 && nextIndex < 0 && lastRet < 0)
            detach();
        else {
            this.prevCycles = cycles;
            this.prevTakeIndex = takeIndex;
        }
    }
}

注意cursor = takeIndex这句代码,将外部内的takeIndex赋值给cursor,这样子将队列和迭代器数据读取进行了同步。
对于iterator1,第一次调用next()方法时,cursor被赋值为3首先将nextItem的值保持在x变量中,即hadoop字符串。
然后设置nextItemcursor的值

nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);

设置完成后,nextItemflink,cursor为-1
最后返回保存在x变量中的值,即返回hadoop字符串。
第二次调用next()方法时,输出的值即上次保存的nextItem值,即flink字符串。
迭代器运行过程中,相关变量内容如下:
《Java并发基础-并发工具类(二)》
至于iterator2迭代器,各位可以自己去分析,不再赘述。

本文主要以实例讲解Semaphore、阻塞队列,并分析了相关核心源码实现。

本文参考

Java 7 Concurrency Cookbook

concurrency-modle-seven-week

java-concurrency

java-util-concurrent

java se 8 apidoc

关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码
《Java并发基础-并发工具类(二)》

点赞