一、前言
java 的 java.util.concurrent 是 java 用于提供一些并发程序所需功能的类包。它的功能全面且强大,在前面,我们已经使用过原子基本变量,BlockingQueue 等类。现在,我们需要更加深入的去了解 JUC 的强大功能。
二、CountDownLatch
该类用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
在 CountDownLatch 对象中设置一个初始的计数值,任何在这个对象上调用 wait() 的方法都讲阻塞,直至这个计数值到达0。其他任务在结束工作时,可以在该对象上调用 countDown() 来减小这个数值。同事,CountDownLatch 只能出发一次,计数值不能被重置。如果有重置的需要,可以使用 CyclicBarrier。
先来看一个使用 CountDownLatch 的简单示例:
1 package JUCTest; 2 3 import java.util.Random; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.TimeUnit; 8 9 class TaskPortion implements Runnable{ 10 private static int counter = 0; 11 private final int id = counter++; 12 private static Random rand = new Random(47); 13 private final CountDownLatch countDownLatch; 14 public TaskPortion(CountDownLatch countDownLatch) { 15 this.countDownLatch = countDownLatch; 16 } 17 @Override 18 public void run() { 19 try { 20 doWork(); 21 countDownLatch.countDown(); 22 }catch(InterruptedException e) { 23 System.out.println("Exit"); 24 } 25 } 26 27 public void doWork() throws InterruptedException{ 28 TimeUnit.MILLISECONDS.sleep(rand.nextInt(20000)); 29 System.out.println(this + " complete"); 30 } 31 32 public String toString() { 33 return "TaskPorition : " + id; 34 } 35 36 } 37 38 class WaitingTask implements Runnable{ 39 private static int counter = 0; 40 private final int id = counter++; 41 private final CountDownLatch countDownLatch; 42 public WaitingTask(CountDownLatch countDownLatch) { 43 this.countDownLatch = countDownLatch; 44 } 45 46 @Override 47 public void run() { 48 try { 49 countDownLatch.await(); 50 System.out.println("latch barrier pass for " + this); 51 }catch(InterruptedException e) { 52 System.out.println(this + "interrupted"); 53 } 54 } 55 56 public String toString() { 57 return "TaskPorition : " + id; 58 } 59 } 60 public class CountDownLatchDemo { 61 static final int SIZE = 10; 62 public static void main(String args[]) { 63 ExecutorService exec = Executors.newCachedThreadPool(); 64 CountDownLatch latch = new CountDownLatch(SIZE); 65 for(int i=0;i<10;i++) { 66 exec.execute(new WaitingTask(latch)); 67 } 68 for(int i=0;i<SIZE;i++) { 69 exec.execute(new TaskPortion(latch)); 70 } 71 System.out.println("Launched all tasls"); 72 exec.shutdown(); 73 } 74 } 75 //output 76 /*Launched all tasls 77 TaskPorition : 5 complete 78 TaskPorition : 1 complete 79 TaskPorition : 4 complete 80 TaskPorition : 3 complete 81 TaskPorition : 9 complete 82 TaskPorition : 0 complete 83 TaskPorition : 7 complete 84 TaskPorition : 8 complete 85 TaskPorition : 6 complete 86 TaskPorition : 2 complete 87 latch barrier pass for TaskPorition : 1 88 latch barrier pass for TaskPorition : 2 89 latch barrier pass for TaskPorition : 0 90 latch barrier pass for TaskPorition : 6 91 latch barrier pass for TaskPorition : 7 92 latch barrier pass for TaskPorition : 3 93 latch barrier pass for TaskPorition : 4 94 latch barrier pass for TaskPorition : 5 95 latch barrier pass for TaskPorition : 8 96 latch barrier pass for TaskPorition : 9 97 */
通过前面章节的内容,我们可以很容一个实现 “A 任务 等到 B 任务完成之后再去执行” 的功能,而在上述例子中,B 任务是由 10 个子任务构成的。通过 CountDownLatch 我们没完成一个子任务,就会是 countDownLatch 减1。等待所有子任务完成,countDownLatch 变为0后,启动 A 任务。
二、CyclicBarrier
countDownLatch 可以使某个任务完成之后进入阻塞状态,阻塞状态持续到其他相关任务全部完成之后(countDownLatch 变为0)。CyclicBarrier 类似于countDownLatch ,和 countDownLatch 的区别在于。在所有任务完成之后,CyclicBarrier 的计数器会重置。
先看一个简单的示例:
1 package JUCTest; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Random; 6 import java.util.concurrent.BrokenBarrierException; 7 import java.util.concurrent.CyclicBarrier; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 import java.util.concurrent.TimeUnit; 11 12 class Horse implements Runnable{ 13 private static int counter = 0; 14 private final int id = counter++; 15 private int strides = 0; 16 private static Random rand = new Random(47); 17 private static CyclicBarrier barrier; 18 public Horse(CyclicBarrier b) { 19 barrier = b; 20 } 21 public synchronized int getStrides() { 22 return strides; 23 } 24 @Override 25 public void run() { 26 try { 27 while(!Thread.interrupted()) { 28 synchronized(this) { 29 strides += rand.nextInt(3); 30 } 31 barrier.await(); 32 } 33 }catch(InterruptedException e) { 34 e.printStackTrace(); 35 }catch(BrokenBarrierException e) { 36 throw new RuntimeException(e); 37 } 38 } 39 40 public String toString() { 41 return "Horse " + id +" "; 42 } 43 44 public String tracks(){ 45 StringBuilder s = new StringBuilder(); 46 for(int i=0;i<getStrides();i++) { 47 s.append("*"); 48 } 49 s.append(id); 50 return s.toString(); 51 } 52 } 53 public class HorseRace { 54 55 static final int FINISH_LINE = 75; 56 private List<Horse> horses = new ArrayList<Horse>(); 57 private ExecutorService exec = Executors.newCachedThreadPool(); 58 private CyclicBarrier barrier; 59 public HorseRace(int nHorses,final int pause) { 60 barrier = new CyclicBarrier(nHorses,new Runnable() { 61 public void run() { 62 StringBuilder s = new StringBuilder(); 63 for(int i=0;i<FINISH_LINE;i++) { 64 s.append("="); 65 } 66 System.out.println(s); 67 for(Horse horse : horses) 68 System.out.println(horse.tracks()); 69 for(Horse horse : horses) { 70 if(horse.getStrides() >= FINISH_LINE) { 71 System.out.println(horse + "Won!"); 72 exec.shutdownNow(); 73 return; 74 } 75 } 76 try { 77 TimeUnit.MILLISECONDS.sleep(pause); 78 }catch(InterruptedException e) { 79 System.out.println("barrier-action sleep interrupted"); 80 } 81 } 82 }); 83 for(int i=0;i<nHorses;i++) { 84 Horse horse = new Horse(barrier); 85 horses.add(horse); 86 exec.execute(horse); 87 } 88 } 89 public static void main(String[] args) { 90 int nHorses = 7; 91 int pause = 200; 92 new HorseRace(nHorses,pause); 93 } 94 95 }
上述程序是一个模拟赛马的操作,一共有75个栅栏,每个马的速度都不一样的,所以每次打印每只马跨越了多少栅栏时,会出现你追我赶的情况。但是程序内在逻辑是怎么样呢?
我们可以把马对应成一个任务,马跨域栅栏是一次 run() 方法内部走完了一次循环。
CyclicBarrier 就相当于一堵墙,它横在所有马的前方,当马完成一次操作(随机跨越1~3个栅栏),它来到了墙面前,被墙挡住(代码是通过 await() 实现的)。等所有的马(具体几只是在 CyclicBarrier 的构造函数里确定的)都来到墙面前的时候,墙打开,所有马进行下一次操作。
可以推测出来,CyclicBarrier 内部一定有一个计数器(通过查看源码可以知道 在构造函数里是把值赋给 final int parties 和 int count 的,前者是 final 无法改变用于重置计数器使用,后者用于计数),我们没调用一次 await() 方法,这个计数器就会减1。直到我们调用了 parties 次 await() 计数器变为 0 。然后所有任务可以进行一下步,同时,计数器变为 parties ,继续阻塞任务进入再下一步的操作,直到它再次为0;
ps: 通过源码可以肯定我们的推测,事实上 每次我们调用 await(), count 就会递减,而当 count 为 0 时,就会调用 nextGeneration 方法。nextGeneration 会把计数器重置,同时会唤醒阻塞的任务。顺便一提的事,CyclicBarrier 实现阻塞和唤醒的方式是使用 Condition (前面有具体内容)。
在 CyclicBarrier 的构造函数里还有一个 Runnable,它会在计数器为 0 的时候启动。
三、DelayQueue
在 JUC 中,除了之前提到的,LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 之外,还有其他几种 Queue, DelayQueue 就是其中之一。
DelayQueue 是一个无界的 BlockingQueue,用于放置实现了 Delayed 接口的对象,其中对象只能在其到期才能从队列中取走。并且该队列是有序的,我们需要实现 compareTo 方法用来作为排序的标准。当从 DelayQueue 获取对象时,只会获取延迟到期的对象。
package JUCTest; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class DelayedTask implements Runnable,Delayed{ private static int counter = 0; private final int id = counter++; private final int delta; private final long trigger; protected static List<DelayedTask> squence = new ArrayList<DelayedTask>(); public DelayedTask(int delayInMilliseconds) { delta = delayInMilliseconds; trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS); squence.add(this); } @Override public void run() { System.out.println(this); } @Override public int compareTo(Delayed o) { DelayedTask that = (DelayedTask) o; if(trigger < that.trigger) return 1; if(trigger > that.trigger) return 1; return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(trigger - System.nanoTime(),TimeUnit.NANOSECONDS); } public String toString() { return String.format("[%1$-4d]", delta) + " Task " + id; } public String summary() { return "("+id+":"+delta+")"; } public static class EndSentinel extends DelayedTask{ private ExecutorService exec; public EndSentinel(int delay,ExecutorService e) { super(delay); exec=e; } public void run() { for(DelayedTask pt : squence) { System.out.print(pt.summary() + " "); } System.out.println(" "); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class DelayedTaskConsumer implements Runnable{ private DelayQueue<DelayedTask> q; public DelayedTaskConsumer(DelayQueue<DelayedTask> q) { this.q = q; } @Override public void run() { try { while(!Thread.interrupted()) { q.take().run(); } }catch(InterruptedException e) { e.printStackTrace(); } System.out.println("Finished DelayedTaskConsumer"); } } public class DelayQueueDemo { public static void main(String[] args) { Random rand = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); for(int i=0;i<20;i++) { queue.put(new DelayedTask(rand.nextInt(5000))); } queue.add(new DelayedTask.EndSentinel(5000, exec)); exec.execute(new DelayedTaskConsumer(queue)); } }
在上面这个例子中,我们让 DelayedTask 实现了 Runnable 和 Delayed 接口。除了 run() 方法之外,我们同时实现了 compareTo() 和 getDelay() 方法。然后输出的结果表明,任务从队列总出来的顺序是按照 getDelay() 所获得的值来确定的。
我们使用变量 delta 来作为延迟时间的,System.nanoTime() 会获得一个纳秒为单位的数字,这个数字单独使用没有任何意义,但是,在程序的两个位置都使用 System.nanoTime() 并且把这两个值相减,就能得到一个精准的时间差。在构造函数里 trigger 被赋值为 System.nanoTime() + delta,而在 getDelay() 中返回的值是 trigger – System.nanoTime()(第二次使用,后面用 System.nanoTime()2 做区别),那么返回的值其实是,System.nanoTime() + delta – System.nanoTime()2,System.nanoTime()2 – System.nanoTime() 可以认为使我们给 trigger 赋值和程序调用 getDelay() 之间的时间差,当时间差,也就是经过的时间 > delta (设定的延迟时间) 时,对象才能出列。换句话说 getDelay() 返回的值 < 0 才能出列。
但是对象出列除了延迟时间到达之外这个条件之外,还得满足它在对列的首位,所以我们必须使用 compareTo() 来规定一个排列的顺序,使得延迟时间到达最短的放在队首位置。所以我们用 trigger 来尽行比较。注意,这里的排队应该是最块走完延迟时间的排前面,而不是延迟时间最短的排前面。比如,A的延迟时间为 1s 他是在第 10s 中的时候放进去的,B的延迟时间为 2 s 它是在第 4s 的时候放进去的,那么B应该排在A前面。
那么,如果我们使用错误的方式来排队,比如把延迟时间到达最晚的放在前面。就会导致效率低下,程序会等到最长的延迟时间到达才会有出列操作。
四、PriorityBlockingQueue
顾名思义,他是以优先级作为排序顺序来给队列中的对象排序的。而排序的方法,依旧是通过 compareTo 方法实现,其实,DelayQueue 可以看做是一种特殊的优先级排序,除了排序之外,他还有延迟的附加条件。所以对于 PriorityBlockingQueue 我们不做过多的说明。
五、SheduledExecutor
SheduledExecutor 可以使任务按照设定的计划去执行,通常,我们需要在指定的时间执行某项任务,或者在一定的周期内循环的执行某项目,就会使用到 SheduledExecutor。
1 package JUCTest; 2 3 import java.util.ArrayList; 4 import java.util.Calendar; 5 import java.util.Collections; 6 import java.util.List; 7 import java.util.Random; 8 import java.util.concurrent.ScheduledThreadPoolExecutor; 9 import java.util.concurrent.TimeUnit; 10 11 public class GreenhouseScheduler { 12 13 private volatile boolean light = false; 14 private volatile boolean water = false; 15 private String thermostat = "Day"; 16 17 public synchronized String getThermostat() { 18 return thermostat; 19 } 20 21 public synchronized void setThermostat(String thermostat) { 22 this.thermostat = thermostat; 23 } 24 25 ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10); 26 27 public void schedule(Runnable event,long delay) { 28 scheduler.schedule(event, delay, TimeUnit.MILLISECONDS); 29 } 30 31 public void repeat(Runnable event,long initialDelay,long period) { 32 scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS); 33 } 34 35 class LightOn implements Runnable{ 36 public void run() { 37 System.out.println("Turn on lights"); 38 light = true; 39 } 40 } 41 42 class LightOff implements Runnable{ 43 public void run() { 44 System.out.println("Turn off lights"); 45 light = false; 46 } 47 } 48 49 class WaterOn implements Runnable{ 50 public void run() { 51 System.out.println("Turning greenhoulse water on"); 52 water = true; 53 } 54 } 55 56 class WaterOff implements Runnable{ 57 public void run() { 58 System.out.println("Turning greenhoulse water off"); 59 water = false; 60 } 61 } 62 63 class ThermostatNight implements Runnable{ 64 public void run() { 65 System.out.println("Thermostat to night setting"); 66 setThermostat("Night"); 67 } 68 } 69 70 class ThermostatDay implements Runnable{ 71 public void run() { 72 System.out.println("Thermostat to day setting"); 73 } 74 } 75 76 class Bell implements Runnable{ 77 78 public void run() { 79 System.out.println("Bing!"); 80 } 81 82 } 83 84 class Terminate implements Runnable{ 85 public void run() { 86 System.out.println("Terminating!"); 87 scheduler.shutdown(); 88 new Thread() { 89 public void run() { 90 for(DataPoint p : data) { 91 System.out.println(p); 92 } 93 } 94 }; 95 } 96 97 } 98 99 static class DataPoint{ 100 final Calendar time; 101 final float temperature; 102 final float humidity; 103 public DataPoint(Calendar d,float temp,float hum) { 104 time = d; 105 temperature = temp; 106 humidity = hum; 107 } 108 public String toString() { 109 return time.getTime() + String.format(" temperature:, %1s$.1f humidity: %2$.2f",temperature); 110 } 111 } 112 113 private Calendar lastTime = Calendar.getInstance(); 114 { 115 lastTime.set(Calendar.MINUTE, 30); 116 lastTime.set(Calendar.SECOND, 00); 117 } 118 119 private float lastTemp = 65.0f; 120 private int tempDirection = 1; 121 private float lastHumidity = 50.0f; 122 private int humidityDirection = 1; 123 private Random rand = new Random(47); 124 List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>()); 125 126 class CollectData implements Runnable{ 127 public void run() { 128 System.out.println("Collecting date"); 129 synchronized(GreenhouseScheduler.this) { 130 lastTime.set(Calendar.MINUTE,lastTime.get(Calendar.MINUTE) + 30); 131 } 132 if(rand.nextInt(5) == 4) { 133 tempDirection = -tempDirection; 134 } 135 lastTemp = lastTemp + tempDirection*(1.0f + rand.nextFloat()); 136 if(rand.nextInt(5) == 4) { 137 humidityDirection = -humidityDirection; 138 } 139 lastHumidity = lastHumidity + humidityDirection * rand.nextFloat(); 140 data.add(new DataPoint((Calendar)lastTime.clone(),lastTemp,lastHumidity)); 141 } 142 } 143 public static void main(String[] args) { 144 GreenhouseScheduler gh = new GreenhouseScheduler(); 145 gh.schedule(gh.new ThermostatNight(),5000); 146 gh.repeat(gh.new Bell(), 0, 1000); 147 gh.repeat(gh.newThermostatNight(), 0, 2000);
148 gh.repeat(gh.new LightOn(), 0, 200);
149 gh.repeat(gh.new LightOff(), 0, 400);
150 gh.repeat(gh.new WaterOn(), 0, 600);
151 gh.repeat(gh.new WaterOff(), 0, 800);
152 gh.repeat(gh.new ThermostatDay(), 0, 1400);
153 gh.repeat(gh.new CollectData(), 500, 500);
154
155 }
156
157 }
这里我们引入了一个新的线程池—— ScheduledThreadPoolExecutor,他添加和执行任务的方法不在是 Executor,而是 schedule 和 schedule。schedule 除了需要提供一个 Runnable 作为参数以外,还要提供一个延迟时间,和时间单位。延迟时间和时间单位共同决定了任务在什么时候被启动。scheduleAtFixedRate 还需额外提供一个周期时间,在到达延迟时间之后,每过一个周期,任务就会执行一次。
在上面的示例中,我们创建了一个温室,温室需要进行开关灯、防水、收集数据等操作。
我们一共设置了1个单一任务和8个循环任务。在程序进行到 5s 中时,所有任务被中断。
六、Semaphore
无论是使用 synchronized 亦或是 lock 的方式,都能保证某项资源只能被一个任务获取和使用。但有时候,我们或许会希望能够允许指定数量的任务来获取同一个资源。JUC 为我们提供了 Semaphore 来实现这方面的需求。
在 Thinking in Java 的关于 Semaphore 的示例中,首先创建了一个使用 Semaphore 来进行控制的对象池,然后通过这个对象池来实现“允许指定数量的任务来获取同一个资源”这一功能,我们先看代码。
在看示例钱,我们需要简单理解下 Semaphore 的运作方式,Seamaphore 的构造方法里,包含两个参数:permits(int),fair(bool)。permits 就是所谓的计数器的值,即我们希望资源能同时被多少任务访问。而 fair 是一个布尔值,它决定我们使用的是公平锁还是非公平锁,关于公平锁,在之后的拓展章节再详细叙述。
创建完 Semaphore 之后,我们通过它的 aquire() 来获取进入资源的权限,此时计数器 -1,通过它的 release() 方法,来释放一个权限,此时计数器 +1。
以下是 Thinking in Java 示例中所用的对象池:
1 package JUCTest; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.Semaphore; 6 7 public class Pool<T> { 8 private int size; 9 private List<T> items = new ArrayList<T>(); 10 private volatile boolean[] checkedOut; 11 private Semaphore available; 12 public Pool(Class<T> classObject,int size) { 13 this.size = size; 14 checkedOut = new boolean[size]; 15 available = new Semaphore(10,true); 16 for(int i=0;i<size;i++) { 17 try { 18 items.add(classObject.newInstance()); 19 }catch(Exception e) { 20 throw new RuntimeException(e); 21 } 22 } 23 } 24 25 public T checkOut() throws InterruptedException{ 26 available.acquire(); 27 return getItem(); 28 } 29 30 public void checkIn(T x) { 31 if(releaseItem(x)) { 32 available.release(); 33 } 34 } 35 private synchronized T getItem() { 36 for(int i=0;i<size;i++) { 37 if(!checkedOut[i]) { 38 checkedOut[i] = true; 39 return items.get(i); 40 } 41 } 42 return null; 43 } 44 45 private synchronized boolean releaseItem(T item) { 46 int index = items.indexOf(item); 47 if(index == -1) { 48 checkedOut[index] = false; 49 return true; 50 } 51 return false; 52 } 53 }
在 pool 的构造函数中,我们创建一个可以放置对象(泛型 )的 List,初始化 Semaphore。同时使用了 newInstance() 的方式创建了 size 个对象。
在更详细的说明之前,先来看看这个对象池的应用。首先,我们需要新建一个类:
1 package JUCTest; 2 3 public class Fat { 4 private volatile double d; 5 private static int counter = 0; 6 private int id = counter++; 7 public Fat() { 8 for(int i=1;i<10000;i++) { 9 d += (Math.PI + Math.E); 10 } 11 } 12 13 public void operation() { 14 System.out.println("this"); 15 } 16 17 public String toString() { 18 return "Fat id: " + id; 19 } 20 }
然后,我么通过 Pool 来对该对象进行管理。
1 package JUCTest; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Future; 8 import java.util.concurrent.TimeUnit; 9 10 class CheckoutTask<T> implements Runnable { 11 12 private static int counter = 0; 13 private final int id = counter++; 14 private Pool<T> pool; 15 public CheckoutTask(Pool<T> pool) { 16 this.pool = pool; 17 } 18 19 public void run() { 20 try { 21 T item = pool.checkOut(); 22 System.out.println(this + " checked out " + item); 23 TimeUnit.SECONDS.sleep(1); 24 System.out.println(this + "cheked in " + item); 25 pool.checkIn(item); 26 }catch(InterruptedException e) { 27 System.out.println("InterruptedException"); 28 } 29 } 30 31 public String toString() { 32 return "CheckoutTesk " + id + " "; 33 } 34 35 36 37 38 } 39 40 public class SemaphoreDemo{ 41 final static int SIZE = 25; 42 public static void main(String[] args) throws InterruptedException { 43 final Pool<Fat> pool = new Pool<Fat>(Fat.class,SIZE); 44 ExecutorService exec = Executors.newCachedThreadPool(); 45 List<Fat> lists = new ArrayList<Fat>(); 46 for(int i=0;i<SIZE;i++) { 47 Fat f = pool.checkOut(); 48 System.out.println(i + ": main() thread check out"); 49 f.operation(); 50 lists.add(f); 51 } 52 Future<?> blocked = exec.submit(new Runnable() { 53 public void run() { 54 try { 55 pool.checkOut(); 56 }catch(InterruptedException e) { 57 System.out.println("Check out Interrupted"); 58 } 59 } 60 }); 61 62 TimeUnit.SECONDS.sleep(2); 63 blocked.cancel(true); 64 System.out.println("Check in object in " + lists); 65 for(Fat f : lists) { 66 pool.checkIn(f); 67 } 68 for(Fat f : lists) { 69 pool.checkIn(f); 70 } 71 exec.shutdown(); 72 } 73 74 75 76 }
在 SemaphoreDemo 中,创建了一个容量为 SIZE 的 pool,在 pool 的构造方法中,我们根据传入的模板参数,创建了 SIZE 个 Fat 对象,然后所有的 Fat 的对象全部通过 checkout 从 pool 里取出。
在最后往 pool 中 checkin Fat 时,我们发现不论我们往里添加了多个对象,在 pool 中始终最多只有 SIZE 个对象。那么后来的添加的对象哪里去了?checkin 的操作并没有消失,也没有出错,只是被阻塞了,如果我们此时通过 checkout 释放出一些位置,那些消失的 Fat 就会顺利的插入到 pool 里。
那么这是如何实现的?
在 Checkout() 中,我们再获取到 Fat 对象前,需要进行一次 acquire() 每次的 acquire 操作,都会使得 Semaphore 中的计数器 -1,当技术器为 0 时,我们继续进行 checkout()(或者继续进行 checkout() 里的 acquire() 操作),就会被阻塞。直到我们使用 checkin() (或者说是 checkin() 里的 release()),使得计数器 +1。被阻塞的 checkout 操作才会继续执行。
在上面的代码中,我们先进行了 SIZE 次 checkout() 操作,然后,再新建一个任务继续使用 checkout 操作,其被阻塞,直到我们将其中断。后台输出 Check out Interrupted,如果我们在 blocked.cancel(true) ——中断操作之前,执行 checkin 操作,就会使阻塞的任务能够继续进行下去。
七、Exchanger
Exchanger 是在两个任务之间交换对象的栅栏。当 A 和 B 任务进入栅栏时,它们各自拥有一个对象 C 和 D,当他们离开时,拥有的对象互换,即 A 拥有 D,B 有用 C。在创建 A 和 B 时,需要把他们和同一个 Exchanger 绑定。
1 package JUCTest; 2 3 import java.util.concurrent.Exchanger; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 class ExchangerProducer implements Runnable{ 9 10 private Exchanger<Integer> exchanger; 11 public ExchangerProducer(Exchanger<Integer> exchanger) { 12 this.exchanger = exchanger; 13 } 14 @Override 15 public void run() { 16 for(int i=1;i<10;i++) { 17 Integer data = i; 18 System.out.println(i + " : producer before exchange : " + data); 19 try { 20 data = exchanger.exchange(data); 21 } catch (InterruptedException e) { 22 System.out.println("Interrupted..."); 23 } 24 System.out.println(i + ": producer after exchange : " + data); 25 } 26 } 27 28 } 29 30 class ExchagerConsumer implements Runnable{ 31 32 private Exchanger<Integer> exchanger; 33 public ExchagerConsumer(Exchanger<Integer> exchanger) { 34 this.exchanger = exchanger; 35 } 36 @Override 37 public void run() { 38 for(int i=1;i<10;i++) { 39 Integer data = i * 2; 40 System.out.println(i + " : consumer before exchange : " + data); 41 try { 42 data = exchanger.exchange(data); 43 } catch (InterruptedException e) { 44 System.out.println("Interrupted..."); 45 } 46 System.out.println(i + " : consumer after exchange : " + data); 47 } 48 } 49 } 50 public class ExchagerDemo { 51 52 public static void main(String[] args) throws InterruptedException { 53 Exchanger<Integer> exchanger = new Exchanger<Integer>(); 54 ExchangerProducer exchangerProducer = new ExchangerProducer(exchanger); 55 ExchagerConsumer exchagerConsumer = new ExchagerConsumer(exchanger); 56 ExecutorService exec = Executors.newCachedThreadPool(); 57 exec.execute(exchangerProducer); 58 exec.execute(exchagerConsumer); 59 TimeUnit.SECONDS.sleep(3); 60 exec.shutdownNow(); 61 62 } 63 64 }
在上面的示例中,producer 负责生产奇数,consumer 负责生产偶数,在 producer 或者 consumer 生产完一个数之后,会将其放入 Exchanger 中等待交换,双方进入到阻塞状态,等待交换完成之后,任务继续进行。