package concurrentTest; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; //有界容器 public class BoundedSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedSet(int bound){ this.set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public Set<T> getSet() { return set; } public boolean add(T o) throws InterruptedException{ sem.acquire(); boolean wasAdded = false; try{ wasAdded = set.add(o); return wasAdded; }finally{ if(!wasAdded) sem.release(); } } public boolean remove(Object o){ boolean wasRemoverd = set.remove(o); if(wasRemoverd) sem.release(); return wasRemoverd; } class ThreadFactoryDemo implements ThreadFactory{ private boolean isDeamon; private String threadName; private AtomicInteger inc = new AtomicInteger(0); public ThreadFactoryDemo(boolean isDeamon,String threadName){ this.isDeamon = isDeamon; this.threadName = threadName; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(this.isDeamon); t.setName(this.threadName + inc.getAndIncrement()); return t; } } class RunnableDemo implements Runnable{ private int i; private BoundedSet<Integer> test; public RunnableDemo(int i,BoundedSet<Integer> test){ this.i = i; this.test = test; } @Override public void run() { try { test.add(i); System.out.println("Set add something already!" + Thread.currentThread().getName() + " Now set content =" + test.getSet().toString()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } class RemoveRunnableDemo implements Runnable{ private int i; private BoundedSet<Integer> test; public RemoveRunnableDemo(int i,BoundedSet<Integer> test){ this.i = i; this.test = test; } @Override public void run() { while(!test.remove(i)); System.out.println("Set remove something already! Now set content =" + test.getSet().toString()); } } public static void main(String[] args) { int boundSize = 5; final int[] array = {0,1,2,3,4,5}; final BoundedSet<Integer> test = new BoundedSet<Integer>(boundSize); ExecutorService executor = Executors.newFixedThreadPool(boundSize+1,test.new ThreadFactoryDemo(true,"semaphore-thread-")); for(int i=0;i<boundSize+1;i++){ executor.execute(test.new RunnableDemo(array[i],test)); } sleep(1000); Thread thread = new Thread(test.new RemoveRunnableDemo(array[0],test)); thread.setName("remove task"); thread.setDaemon(false); thread.start(); } private static void sleep(int time){ try { Thread.sleep(time); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
虽然测试打印语句会出现延迟导致输出错误,但实现是对的。