JUC Semaphore 分析

基本介绍

Semaphore,JUC中提供的一个并发组件,根据字面理解是信号量的意思。Semaphore能够实现的功能是允许多个线程同时获取共享资源,实际是共享锁(基于AQS的共享实现模式)的实现。获取不到资源的线程会进行阻塞等待,直到其他线程释放资源,当前阻塞线程尝试获取共享资源成功,则从阻塞等待中返回,否则继续阻塞等待。每个资源称为一个permit(许可),由Semaphore内部持有。Semaphore实际上也是基于AQS来实现其功能,permit对应的就是AQS中的同步状态state。通过静态内部类实现共享锁的获取和释放方法,同时提供公平和非公平的实现,类似于ReentrantLock中的实现。

可以用Semaphore实现限行的功能,比如限制任务执行一次并发只能有N个(定义一个具有N个permit的Semaphore)

基本方法

重要方法列表

方法说明
void acquire() throws InterruptedException获取一个许可,允许中断
void acquireUninterruptibly()同acquire()获取一个许可,不同的是该方法不响应中断
boolean tryAcquire()尝试获取一个许可,立刻返回,成功返回true,失败返回false
boolean tryAcquire(long timeout, TimeUnit unit)同tryAcquire(),尝试获取一个许可,不同的是该方法支持超时机制
void acquire(int permits) throws InterruptedExceptionacquire高级版,尝试一次获取多个许可
void acquireUninterruptibly(int permits)acquireUninterruptibly高级版,尝试一次获取多个许可
void tryAcquire(int permits)tryAcquire高级版
boolean tryAcquire(int permits, long timeout, TimeUnit unit)tryAcquire(long timeout, TimeUnit unit)高级版
void release()释放一个许可
void release(int permits)释放多个许可

Semaphore使用示例

public class SemaphoreTest {

    public static void main(String[] args) {

        final Semaphore res = new Semaphore(2);

        Thread[] thds = new Thread[10];

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    res.acquire();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                    return ;
                }
                try {
                    System.out.println("thread : " + Thread.currentThread().getName() + " got permit");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    res.release();
                }
            }
        };

        for (int i = 0; i < thds.length; i++) {
            thds[i] = new Thread(r, "thd" + i);
            thds[i].start();
        }

        for (int i = 0; i < thds.length; i++) {
            try {
                thds[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

示例输出,每次只有2个线程能够获取到许可(看到两两输出):

thread : thd0 got permit
thread : thd1 got permit
thread : thd3 got permit
thread : thd2 got permit
thread : thd4 got permit
thread : thd5 got permit
thread : thd7 got permit
thread : thd6 got permit
thread : thd8 got permit
thread : thd9 got permit

源码分析

Semaphore代码不多,下面直接根据源码 + 注释的形式来理解:

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    private final Sync sync;

    // 基于AQS实现,使用AQS中同步状态state来表示permits
    // 分非公平实现和公平实现,默认使用非公平实现
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        // 非公平尝试获取共享资源(对该实现类而言,是尝试获取许可permits)
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 释放共享资源,资源释放无所谓公平性(对该实现类而言,是释放许可permits)
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 动态减少共享资源(许可)
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        // 直接修改共享资源(许可)为0
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /** * 非公平实现 */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /** * 公平实现 */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 公平性的关键: 判断同步队列是否有其他节点在等待获取共享资源
                // 非公平性就是一上来就尝试抢占,不管同步队列里面是否有其他节点在等待获取共享资源
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /** * Semaphore默认使用非公平实现 */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    // 可选择公平性的构造函数
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    // 请求获取许可,若许可不足则阻塞,直到有其他线程调用release,或者当前线程被中断(若被中断,则会抛InterruptedException并且中断标志位会被重置)
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 请求获取许可,不响应线程中断
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    // 尝试获取一个许可(非公平获取),成功获取返回true,失败则返回false
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    // 尝试获取一个许可,获取成功则返回true,如果无法获取,等待直到超时或者被中断(无法获取返回false)
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 释放一个许可,使得阻塞于获取许可的线程有机会得到许可
    public void release() {
        sync.releaseShared(1);
    }

    // acquire()的高级版,一次请求获取多个许可
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    // acquireUninterruptibly()的高级版,一次请求获取多个许可
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    // tryAcquire()高级版,尝试一次获取多个许可
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }


    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    // 释放多个许可
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    // 返回当前可用的许可数量
    public int availablePermits() {
        return sync.getPermits();
    }

    // 请求获取剩余的所有许可
    public int drainPermits() {
        return sync.drainPermits();
    }

    // 直接减少许可数量,不阻塞当前调用线程
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }


    public boolean isFair() {
        return sync instanceof FairSync;
    }


    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    public final int getQueueLength() {
        return sync.getQueueLength();
    }


    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }


    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}
    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/80642409
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞