Semaphore源码-JUC线程同步工具1

Semaphore源码-JUC线程同步工具1

之前JAVA锁Lock说过一句话线程获取到了锁就是AQS得成员state+1了,今天讲的信号量Semaphore还是跟它有关系。

何为信号量

Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。

这段文字说的很抽象,简单来说就是控制线程并发访问某个资源的最大个数。
在说白一点Semaphore能做网站单机器的限流,只不过它跟一些限流的工具如google的Ratelimiter相比少了一些特性如预热等。其实它的这个特性跟MapReduce也是很类似的,map的阶段就是各个子线程在完成任务的阶段,reduce就是await()等 各个子线程map完,然后汇总。

举个例子

停车场有三个车位,那么该停车场同一时间内最多允许三辆车进入,当然可以一辆一辆的停入停车场,不过总数不能超过三个。

代码使用

static ExecutorService pool = Executors.newFixedThreadPool(7);

public static void main(String[] args) {

    Semaphore semaphore = new Semaphore(3);//模拟停车场三个车位
    for(int i= 0; i < 10;i++) {//模拟同时来了10个车 
        new Car(semaphore,i+"").start();
    }

}
@Data
static class Car extends Thread{
    private Semaphore semaphore;
    private  String carNo;

    public Car(Semaphore semaphore,String carNo) {
        this.semaphore = semaphore;
        this.carNo = carNo;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();//获取一个车位
            System.out.println("cardNo:"+carNo+"获取停车位");

            Thread.sleep(new Random().nextInt(1000));

            semaphore.release();//释放一个车位
            System.out.println("cardNo:"+carNo+"离开,释放一个停车位");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

结果输出:
cardNo:0获取停车位
cardNo:2获取停车位
cardNo:1获取停车位
cardNo:2离开,释放一个停车位
cardNo:3获取停车位
cardNo:3离开,释放一个停车位
cardNo:4获取停车位
cardNo:1离开,释放一个停车位

最多同时允许三个车进来acquire成功获取停车权限(锁),后续需要有车离开release释放车位(锁)。

控制线程并发数其实仍然是AQS
《Semaphore源码-JUC线程同步工具1》

构造方法

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

默认也是非公平锁,并发度(permits)赋值给了AQS的volatile int state
并发度(停车场的车位数)就是AQS.state的初始值

acquire()方法

//Semaphore
 public void acquire() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
 }
//AQS
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);//A获取许可失败
}
//Semaphore--NonfairSync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
//Semaphore--内部类Sync
 final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

这次的模板方法名字是AQS的acquireSharedInterruptibly,以前看的是acquire(1)
调用子类重写的tryAcquireShared从方法名字可以看出来,共享state个数量的许可(并发度),以前的是独占的tryAcquire。

直接看nonfairTryAcquireShared(1)方法,仍然很简单 上来一个死循环哈,看起来很唬人。
死循环其实很少能循环两次的,基本一次搞定,这就是无锁for(;;)CAS
只有当许可数小于零或者CAS失败 才会去再次循环,所以这个方法看起来是死循环其实很容易就退出来了。

当remaining大于0的时候表示线程成功获取到了锁即state-1>0,允许进入停车场
我们看下当remaining<0的时候即三个车位都被占用的时候— A获取许可失败

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这段代码跟之前JAVA锁Lock几乎一模一样,不熟悉的抓紧点链接补一下。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里只是简单说下过程:

创建一个Node节点waitStatus=0 (构建head)组成一个双向链表,开始自旋
判断node.pre是否为head是的话在tryAcquire()cas一下

失败,将自己的node.waitStatus=-1等待唤醒装态,然后在一轮判断node.pre==head 然后tryAcquire()cas如果还是失败 lockSupport.park(this)挂起等待被唤醒。
成功,直接获取锁执行业务逻辑

不同之处在于tryAcquire变成这里的tryAcquireShared而已。

release()

public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

跟预想的差不多,归还state 唤醒队列的头结点的下一个。

总结:之所以这里讲的不细致,主要因为大部分代码还是在AQS这里

获取锁

以前lock.lock()讲state+1>0获取锁成功
这里初始state=并发数(许可数) state-1> 0的线程都表示获取锁成功。

释放锁

以前是lock.unlock() state-1 表示释放一次(可能重入就需要释放多次),唤醒等待队列head.next
这里state+1 表示腾出一把锁,唤醒等待队列head.next

    原文作者:JUC
    原文地址: https://blog.csdn.net/mayongzhan_csdn/article/details/81035356
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞