JUC同步工具类——信号量、闭锁、栅栏、交换器

同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏(Barrier)、闭锁(Latch)以及交换器(Exchanger)。 –《Java并发编程实战》

信号量

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

概念上,Semaphore管理一组许可(permits),许可的数量根据构造器指定。在执行操作前通过acquire获取许可(有许可剩余,否则需要等待),操作执行完成后通过release释放许可。实际上,并不存在许可对象,Semaphore仅仅维护一个可用许可数量,并以此工作。

Semaphore通常用于严格控制访问某物理或逻辑资源的线程数,JavaDoc示例如下:

class Pool {
  private static final int MAX_AVAILABLE = 100;
  private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
  public Object getItem() throws InterruptedException {
    available.acquire();
    return getNextAvailableItem();
  }
  public void putItem(Object x) {
    if (markAsUnused(x))
      available.release();
  }
  // Not a particularly efficient data structure; just for demo
  protected Object[] items = ... whatever kinds of items being managed
  protected boolean[] used = new boolean[MAX_AVAILABLE];
  protected synchronized Object getNextAvailableItem() {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
      if (!used[i]) {
         used[i] = true;
         return items[i];
      }
    }
    return null; // not reached
  }
  protected synchronized boolean markAsUnused(Object item) {
    for (int i = 0; i < MAX_AVAILABLE; ++i) {
      if (item == items[i]) {
         if (used[i]) {
           used[i] = false;
           return true;
         } else
           return false;
      }
    }
    return false;
  }
}

在获取对象前必须先取得许可,以保证对象可用,使用完成后返回对象,并交还许可,以允许其它线程使用。

初始值为1的Semaphore,即二值信号量,最多只允许拥有1个许可,可用作互斥锁

Semaphore构造器接受一个可选的公平参数。当设置为false时,信号量不保证线程获取许可的顺序,有可能后调用acquire的线程先获取许可;当设置为true时,信号量保证先调用acquire的线程先获取许可(FIFO)。但是,tryAcquire方法不区分公平参数,只要有可用的许可就会获取。同时,acquire(int)和release(int)方法允许一次获取/释放多个许可。

通常,使用信号量来控制资源访问时应该使用公平模式,防止有线程获取不到资源而饿死。当信号量用于其它类型同步控制时,在吞吐量优势上非公平模式胜过公平模式。

示例:

package thread;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** * @author QFJiang on 2018/03/05 10:32 */
public class SemaphoreTest {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(8);
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Task(1, semaphore, 3));
        executor.execute(new Task(2, semaphore, 4));
        executor.execute(new Task(3, semaphore, 6));
        executor.shutdown();
    }

    static class Task implements Runnable {
        private int id;
        private Semaphore semaphore;
        private int permit;

        public Task(int id, Semaphore semaphore, int permit) {
            this.id = id;
            this.semaphore = semaphore;
            this.permit = permit;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(permit);
                System.out.println("Task " + id + " acquire " + permit + " semaphore");
                Thread.sleep(1000 + new Random().nextInt(1000));
                semaphore.release(permit);
                System.out.println("Task " + id + " release " + permit + " semaphore");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:(只有Task1、Task2全部释放许可之后,Task3才能获取足够许可)

Task 2 acquire 4 semaphore
Task 1 acquire 3 semaphore
Task 1 release 3 semaphore
Task 2 release 4 semaphore
Task 3 acquire 6 semaphore
Task 3 release 6 semaphore

闭锁

闭锁可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关着的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S以来的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如:在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有的玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法将递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达零,这表示所有需要的事件都已经发生。如果计数器的值为非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

JavaDoc示例如下:

class Driver {
  void main() throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);
    for (int i = 0; i < N; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();
    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();
    doneSignal.await();           // wait for all to finish
  }
}
class Worker implements Runnable {
  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
    this.startSignal = startSignal;
    this.doneSignal = doneSignal;
  }
  public void run() {
    try {
      startSignal.await();
      doWork();
      doneSignal.countDown();
    } catch (InterruptedException ex) {} // return;
  }
  void doWork() { ... }
}

FutureTask

FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算)。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3中状态:等待运行(Waiting to run),正在运行(Running)和完成运行(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态以后,它会永远停止在这个状态上。

Future.get的行为取决于任务的状态。如果任务已经执行完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

栅栏

栅栏(Barrier)类似于闭锁,它能阻塞线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁也用于等待事件,而栅栏也用于等待其他线程。比如开班级会议,需要班级成员全部到达之后才开始。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将也一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时,将调用await方法,这个方法将阻塞直到所有线程都到达阻塞位置。如果所有线程都到达了栅栏的位置,那么栅栏将打开,此时所有线程都将被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏参数传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在下一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

示例:

package thread;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** * @author QFJiang on 2017/04/18 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("All are ready");
            }
        });
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new TaskThread(barrier));
        }
        exec.shutdown();
    }

    static class TaskThread implements Runnable {

        private CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is preparing");
            try {
                Thread.sleep(1000 + new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + " is ready");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000 + new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + " is finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:

pool-1-thread-1 is preparing
pool-1-thread-2 is preparing
pool-1-thread-3 is preparing
pool-1-thread-4 is preparing
pool-1-thread-5 is preparing
pool-1-thread-1 is ready
pool-1-thread-3 is ready
pool-1-thread-4 is ready
pool-1-thread-2 is ready
pool-1-thread-5 is ready
All are ready
pool-1-thread-4 is finished
pool-1-thread-2 is finished
pool-1-thread-5 is finished
pool-1-thread-3 is finished
pool-1-thread-1 is finished

栅栏与闭锁的一个重要区别在于:闭锁只能使用一次,而栅栏可以重复循环使用(Cyclic)。 例如,一个游乐场项目一次只允许5个人玩,而且必须满5人才可以,这就是一个栅栏,每次等5人齐了开始游玩,结束之后换另外5人可以重新开始。

交换器

Exchanger是另一种形式的栅栏,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用,例如当也一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

示例:

package thread;

import java.util.Random;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** * @author QFJiang on 2017/04/19 */
public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread(new Producer(exchanger, "Full queue"), "Producer").start();
        new Thread(new Consumer(exchanger, "Empty queue"), "Consumer").start();
    }

    static class Producer implements Runnable {

        private Exchanger<String> exchanger;
        private String queue;

        public Producer(Exchanger<String> exchanger, String queue) {
            this.exchanger = exchanger;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " exchange " + queue + " out");
                Thread.sleep(1000 + new Random().nextInt(1000));
                String other = exchanger.exchange(queue);
                System.out.println(Thread.currentThread().getName() + " exchange " + other + " back");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {

        private Exchanger<String> exchanger;
        private String queue;

        public Consumer(Exchanger<String> exchanger, String queue) {
            this.exchanger = exchanger;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " exchange " + queue + " out");
                Thread.sleep(1000 + new Random().nextInt(1000));
                String other = exchanger.exchange(queue);
                System.out.println(Thread.currentThread().getName() + " exchange " + other + " back");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:

Producer exchange Full queue out
Consumer exchange Empty queue out
Producer exchange Empty queue back
Consumer exchange Full queue back
    原文作者:JUC
    原文地址: https://blog.csdn.net/jiangqiongfeng/article/details/79456588
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞