java并发编程基础(五)-类库JUC包的构件
本博文为学习《thinking in java》一书中第21章“并发”时的小笔记,并使用这些构件结合简单的小场景写一些样例代码,这是由于原书上的代码太长了
CountDownLatch
作用:同步一个或者多个任务,强制它们等待由其他任务执行的一组操作完成
使用:
- 向CountdownLatch对象设置一个初始计数值,任何在该对象中调用await()的方法都将阻塞,直到这个计数值为0
- 其他任务在结束其工作时,可以在该对象中调用countDown()来减小这个值
- CountDownLatch被设计只会触发一次,计数值不可以被重置,而CyclicBarrier是可以重置计数器的版本
- 调用countDown()方法不会产生阻塞,只有对await()的调用才会阻塞,直到计数值到达0
- 典型用法:将一个程序分为n个互相独立的可解决任务,并且创建值为0的CountDownLatch,当每一个任务完成时,都会在这个锁存器上调用countDown()。等待问题被解决的任务在这个锁存器上调用await(),将自己拦住直至锁存器计数完毕。
样例代码,“三思而后行”
public class ThinkTwiceDemo { static class Think implements Runnable{ private CountDownLatch latch; public Think(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println("thinking..."); latch.countDown(); } } static class Action implements Runnable { private CountDownLatch latch; public Action(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { latch.await(); System.out.println("now I can do something"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { int count = 3; // 三思 CountDownLatch latch = new CountDownLatch(count); Think think = new Think(latch); Action action = new Action(latch); ExecutorService service = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)); service.execute(action); TimeUnit.SECONDS.sleep(1); // 故意拖一段时间,说明Action确实被阻塞了 for (int i = 0; i < count; i++) { service.execute(think); } service.shutdown(); } }
CyclicBarrier
使用场景:创建一组任务,它们并行执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。
CyclicBarrier.await()就有些想CountDownLatch的countDown(),计数器都会减去1,只是CyclicBarrier可以重复使用。
样例:模拟两个玩家的掷骰子游戏,一轮游戏中只有两个人都掷出骰子,才能进入下一轮游戏。
public class ComputerDiceGame { /** * 掷骰子 */ static class Dice implements Runnable { private CyclicBarrier barrier; private int count; // 线程独有 private String player; private static Random random = new Random(); // 线程安全的 public Dice(CyclicBarrier barrier, int count, String player) { this.barrier = barrier; this.count = count; this.player = player; } @Override public void run() { try { for (int i = 1; i <= count; i++) { System.out.println("Round" + i + " : " + player + " makes " + (random.nextInt(6) + 1)); barrier.await(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { int round = 5; // 5轮 int nPlayers = 2; CyclicBarrier barrier = new CyclicBarrier(nPlayers); // 2个玩家 Dice dice1 = new Dice(barrier, round, "player1"); Dice dice2 = new Dice(barrier, round, "player2"); ExecutorService service = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)); service.execute(dice1); service.execute(dice2); service.shutdown(); } } // 运行结果 Round1 : player2 makes 5 Round1 : player1 makes 1 Round2 : player2 makes 3 Round2 : player1 makes 1 Round3 : player1 makes 4 Round3 : player2 makes 1 Round4 : player2 makes 4 Round4 : player1 makes 6 Round5 : player1 makes 1 Round5 : player2 makes 6
写的时候又再想了一下,如果现在需要加上一个裁判,判断一轮游戏中哪一个player赢了,应该怎么办呢?这个时候,需要用到之前我的一篇博文的Condition来解决问题,现在的游戏流程是这样:
- 在一轮游戏中,两个玩家掷出骰子
- 只有在两个玩家掷出了之后,裁判才能进行判断
- 只有在裁判判断产生结果之后,才能够进入下一轮游戏
现在代码变成
public class ComputerDiceGame { /** * 游戏相关信息,实际上这个类命名不太好,不改了。。 */ static class GameProperties { public int round = 5; // 游戏轮数 public int nPlayers = 2; // 游戏玩家个数 public int counter = 0; // counter记录一轮游戏中当前掷出的骰子数量,读写需要同步 public CyclicBarrier barrier = new CyclicBarrier(nPlayers + 1); // 加一是因为 private Lock lock = new ReentrantLock(); public Condition condition = lock.newCondition(); public int[] result = new int[nPlayers]; // 记录一轮游戏的结果,由于不同线程读写的是不同位置,所以不需要同步 public void judgeCheck() { // 裁判主动检查玩家是否都已经掷出骰子 lock.lock(); try { while (counter != nPlayers) { // 玩家还没全部都掷出骰子 condition.await(); } counter = 0; // 重置 } catch (InterruptedException ex) { } finally { lock.unlock(); } } public void incrementAndCheck() { // 玩家掷出骰子,并检查自己是否最后一个掷出的,如果是需要唤醒裁判 lock.lock(); try { counter++; if (counter == nPlayers) { // 自己是本轮最后一个掷出骰子的人,需要叫醒裁判 condition.signal(); } } finally { lock.unlock(); } } } static class Judge implements Runnable{ private GameProperties properties; public Judge(GameProperties properties) { this.properties = properties; } @Override public void run() { try { for (int r = 1; r <= properties.round; r++) { properties.judgeCheck(); // 判别哪个赢了 int pos = 0; for (int i = 1; i < properties.nPlayers; i++) { if (properties.result[i] > properties.result[pos]) { pos = i; } } System.out.println("Player" + (pos+1) + " wins in round " + r); properties.barrier.await(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } static class Dice implements Runnable { private int id; private static Random random = new Random(); // 线程安全的 private GameProperties properties; public Dice(int id, GameProperties properties) { this.id = id; this.properties = properties; } @Override public void run() { try { for (int r = 1; r <= properties.round; r++) { int point = random.nextInt(6) + 1; properties.result[id] = point; System.out.println("Round" + r + " : player" + (id+1) + " makes " + point); // 更新轮数 properties.incrementAndCheck(); properties.barrier.await(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } public static void main(String[] args) { GameProperties properties = new GameProperties(); Dice dice1 = new Dice(0, properties); Dice dice2 = new Dice(1, properties); Judge judge = new Judge(properties); ExecutorService service = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)); service.execute(dice1); service.execute(dice2); service.execute(judge); service.shutdown(); } } // 运行结果(点数一样player1赢) Round1 : player1 makes 3 Round1 : player2 makes 3 Player1 wins in round 1 Round2 : player1 makes 3 Round2 : player2 makes 3 Player1 wins in round 2 Round3 : player1 makes 6 Round3 : player2 makes 6 Player1 wins in round 3 Round4 : player1 makes 5 Round4 : player2 makes 6 Player2 wins in round 4 Round5 : player1 makes 4 Round5 : player2 makes 6 Player2 wins in round 5
DelayQueue
无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走
队列是有序的,即从队头对象的延迟到期时间最长,如果没有任何延迟到期,那么就不会有任何头元素,并且poll返回null(故不可以将null放入队列中)
要想构建延迟任务,需要实现Runnable和Delayed接口,Delayed接口的两个方法
- compareTo:实现自己比较两个Delayed对象的方法,关系到从DelayedQueue中的优先级问题
- getDelayed:得到Delayed对象现在的延迟时间,可以理解为还有多久可以干活了
样例代码
public class SleeperDemo { static class Sleeper implements Runnable, Delayed { private long millisecond; // 拖延时间 private long trigger; // 需要干活的时间 private int id; // 标识 public Sleeper(long millisecond, int id) { this.millisecond = millisecond; this.id = id; trigger = System.currentTimeMillis() + millisecond; } @Override public int compareTo(Delayed o) { Sleeper that = (Sleeper) o; if (trigger > that.trigger) { return 1; } else if (trigger < that.trigger) { return -1; } else { return 0; } } @Override public void run() { System.out.println("sleeper" + id + " is working now!"); } @Override public long getDelay(TimeUnit unit) { return unit.convert(trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } } static class DelayedTaskConsumer implements Runnable { private DelayQueue<Sleeper> queue; public DelayedTaskConsumer(DelayQueue<Sleeper> queue) { this.queue = queue; } @Override public void run() { try { while (!Thread.interrupted()) { queue.take().run(); // 直接在当前线程中运行 } } catch (InterruptedException ex) { } } } public static void main(String[] args) throws InterruptedException { DelayQueue<Sleeper> queue = new DelayQueue<>(); int nSleepers = 5; for (int i = nSleepers; i >= 1; i--) { queue.add(new Sleeper(i * 1000, i)); } ExecutorService service = Executors.newFixedThreadPool(1); service.execute(new DelayedTaskConsumer(queue)); // 保证任务都执行完就关闭 TimeUnit.SECONDS.sleep(6); service.shutdownNow(); }
PriorityBlockingQueue
- 带有阻塞操作的优先队列,例子就不举了
ScheduledThreadPoolExecutor
- 相比于线程池ThreadPoolExecutor,ScheduledThreadPoolExecutor可以进行任务的调度
- schedule():可以设置一段时间后调度任务
- scheduleAtFixRate():可以设置每隔特定的时间重复执行任务
- 也没有什么特殊,只是多了调度,例子也不举了
Semaphore
正常的锁(concurrents.locks或synchronized)在任何时候都只允许一个任务访问一个资源
计数信号量semaphore允许n个任务同时访问这个资源
创建Sempaphore时,需要指定有多少个信号量,也可以指定信号量的分发是否公平(按照任务FIFO的请求顺序),例如一个构造函数
public Semaphore(int permits, boolean fair)
信号量操作
- acquire():信号量-1,请求资源前需要调用该方法
- release():信号量+1,归还资源后需要调用该方法
明显,当信号量为1时,Semaphore的作用就和普通的Lock作用类似
样例代码:书本代码,一个对象池,管理有限的对象
public class Pool<T> { private int size; private List<T> items = new ArrayList<>(); // 资源存放处 private volatile boolean[] checkOut; // 记录资源是否已经被借出 private Semaphore available; /** * 由于擦除,需要传入class参数 */ public Pool(Class<T> classObject, int size){ this.size = size; checkOut = new boolean[size]; available = new Semaphore(size, true); // load pool with objects that can be checked out for (int i = 0; i < size; i++){ try { items.add(classObject.getDeclaredConstructor().newInstance()); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } } } public T checkOut() throws InterruptedException { available.acquire(); // 信号量-1 return getItem(); } public void checkIn(T x){ if (releaseItem(x)) available.release(); // 信号量+1 } /** * 获取资源 */ private synchronized T getItem() { for (int i = 0; i < size; i++){ if (!checkOut[i]){ checkOut[i] = true; // 成功借入 return items.get(i); } } return null; // 资源全部被借出 } /** * 释放资源 */ private synchronized boolean releaseItem(T item) { int index = items.indexOf(item); if (index == -1) { // 该资源根本就不是这里的 return false; } if (checkOut[index]){ // 成功归还 checkOut[index] = false; return true; } return false; } }
如果对象的创建过程或者销毁过程十分昂贵,而且对象也会被多次使用,那么使用上述的对象池应该十分理想,这个思想应该和线程池,连接池相通的。
Exchanger
书本描述:两个任务之间交换对象的栅栏,当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,他们都拥有之前由对象持有的对象
简单来说,就是任务A持有对象A,任务B持有对象B(A和B需要是同样的类型,这个看Exchanger.exchange()方法便可知道),两个任务共同分享对象Exchanger,只要双方调用了exchange()方法,持有的对象就会交换,即最后任务A持有对象B,任务B持有对象A
样例代码,还是按照书本上的:
class ExchangeProducer<T> implements Runnable{ private Generator<T> generator; private Exchanger<List<T>> exchanger; private List<T> holder; public ExchangeProducer(Generator<T> generator, Exchanger<List<T>> exchanger, List<T> holder) { this.generator = generator; this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { while (!Thread.interrupted()){ for (int i = 0; i < ExchangeDemo.size; i++) holder.add(generator.next()); // 保存生成器生成的对象 holder = exchanger.exchange(holder); // 进入栅栏,将刚才生产满的列表拿给消费者,换来空的列表 } } catch (InterruptedException e) { e.printStackTrace(); } } } class ExchangerConsumer<T> implements Runnable{ private Exchanger<List<T>> exchanger; private List<T> holder; private volatile T value; public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder) { this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { while (!Thread.interrupted()){ holder = exchanger.exchange(holder); // 进入栅栏,拿到生产者满的list for (T x : holder){ value = x; // 取出 holder.remove(x); // 在列表中删除 (书本注释: ok for CopyOnWriteArrayList) } } } catch (InterruptedException e) { } System.out.println("Final value : " + value); } } public class ExchangeDemo { static int size = 10; static int delay = 5; // 秒 public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Exchanger<List<Fat>> exchanger = new Exchanger<>(); List<Fat> producerList = new CopyOnWriteArrayList<>(); List<Fat> consumerList = new CopyOnWriteArrayList<>(); Generator<Fat> generator = new BasicGenerator(Fat.class); service.execute(new ExchangeProducer<>(generator, exchanger, producerList)); service.execute(new ExchangerConsumer<>(exchanger, consumerList)); TimeUnit.SECONDS.sleep(delay); service.shutdownNow(); } }
值得一提的是,该代码互换的是CopyOnWriteArrayList,这个家伙要细说也需要不少笔墨,这里简单提一下:CopyOnWriteArrayList允许在列表遍历时调用remove(),不会抛出ConcurrentModificationException异常,它一种“写时复制”的容器。明显,如果多个线程只读不写,我们也没必要做并发控制,当写操作发生时,CopyOnWriteArrayList不会直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。当然,这也导致一些读操作在短时间内看不到更新的内容。在大量读少量写的情况下这应该是不错的选择。当然容器的copy也是一份开销,如果容器很大,这个开销可能也是需要考虑的。CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。等我更加深入这种写时复制容器再单独做成笔记。