java juc 包下面已经提供了很多并发锁工具供我们使用,但在日常开发中,为了各种原因我们总是会用多线程来并发处理一些问题,然而并不是所有的场景都可以使用juc 或者java本身提供的锁来方便的帮助我们控制多线程带来的并发问题,这个时候就需要我们根据自己的业务场景来子实现定制一把我们自己专属的锁,来满足我们的需要。
假设系统对接了很多第三方公司,来帮助我们完成业务,但这些第三方的服务接口稳定性参差不齐,以往的过程中我们可能会做一些监控措施来帮助我们监控接口的稳定性,但这会存在一个问题,就是当我们监控到操作失败的时候其实已经会有用户产生操作失败的结果了,这对重视用户体验的互联网公司肯定是不能忍的,为此我们可以每个用户来访问时都同时调用多个第三方,只要有一个返回结果,我们就可以给用户做相应的展示,这样即使有一两个第三方出现故障对用户也是无感知的,但另一个问题来了,同时并发调用第三方我怎么选哪个结果呢,很简单,当然是返回最快的了!具体如何选用最快的返回结果就用到我们今天的主题了,定制自己的锁。
上面所说的大致流程可以描述为这样:接受用户请求 → 多线程组装报文调用第三方 → 阻塞等待 → 任意结果返回唤醒主线程继续处理。基于此流程很自然想到这个阻塞其实就可以用多线程中的锁来实现,主线程在将任务提交给线程池多线程处理后,去获取一个锁,而这个锁需要在线程中第一个第三方返回结果时才能获取到,这样就让主线程继续执行,有了大概思路,我们来看下具体如何实现。
看过java 源码的同学对AbstractQueuedSynchronizer一定不会陌生,java中的很多锁ReentrantLock 、ReadWriteLock 、ReentrantReadWriteLock 和一些其他的并发工具CountDownLatch、 Semaphore等都基于此抽象类实现,AbstractQueuedSynchronizer中通过一个FIFO队列来管理等待加锁的线程,通过一个state的int变量控制线程加锁状态,其内部也帮我实现了线程获得锁和挂起的方法,我们这里参考CountDownLatch来实现,因为我们的需求和CountDownLatch正好相反,CountDownLatch是多个线程都处理完才能继续,而我们是只要有一个处理完就能继续,简单来说就是主线程唤醒的判断条件不一致。先来看下CountDownLatch的使用:
public static void main(String[] args) throws InterruptedException { CountDownLatch await = new CountDownLatch(5); // 依次创建并启动线程 for (int i = 0; i < 5; ++i) { new Thread(new MyRunnable(await)).start(); } await.await(); System.out.println("over!"); } class MyRunnable implements Runnable { private final CountDownLatch await; public MyRunnable(CountDownLatch await) { this.await = await; } public void run() { try { //业务处理 await.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }
熟悉套路之后为了更好的说明,我们引入其部分源码(jdk 1.8中部分方法)
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } }
代码中可以看到,CountDownLatch主要就是倚靠一个内部类Sync来实现,而Sync实现了AbstractQueuedSynchronizer的tryAcquireShared和tryReleaseShared方法,这两方法的主要目的是:tryAcquireShared就是在调用await方法后来判断是否需要阻塞还是执行,tryReleaseShared 就是用来释放state的状态,而state的状态又影响了tryAcquireShared的返回结果,决定了线程是阻塞还是会被唤起继续执行,具体的判断逻辑是在AbstractQueuedSynchronizer中的acquireSharedInterruptibly方法中
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
基于此我们的大概思路就是,利用自己的类实现tryAcquireShared和tryReleaseShared方法来帮助我们管理state状态,决定主线程什么时候可以获得锁继续运行,什么时候需要阻塞。依靠AbstractQueuedSynchronizer的内部机制帮助我们及时获取子线程的处理信息,最快的回到主线程来处理我们业务逻辑。实现代码如下:
package com.chengxiansheng.common; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class ReqBraker { private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { int c = getState(); c = 0; return true; } } private final Sync sync; private ResultDto resultDto;//处理结果 public ReqBraker(){ this.sync = new Sync(1); } /** * 请求返回 * 此处要考虑接口调用失败的情况,如果失败要等待其他线程则不调用此方法 */ public void reqReuturn(ResultDto resultDto){ this.resultDto = resultDto; sync.tryReleaseShared(1); } /** * 主线程等待指定最长等待时间 * @param timeout * @param unit * @return * @throws InterruptedException */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 获取处理结果 * @return */ public ResultDto getResultDto(){ return resultDto; }
}
package com.chengxiansheng.common; public class RequsetWorker implements Runnable { private RequestDto requestDto;//请求信息 private ReqBraker reqBraker; public RequsetWorker(RequestDto requestDto, ReqBraker reqBraker){ this.requestDto = requestDto; this.reqBraker = reqBraker; } public void run() { //请求第三方 //判断返回结果 if(requestDto.isSeccess()){ reqBraker.reqReuturn(resultDto); } } }
具体的使用方法如下:
public void requestService(RequestDto request, List<Service> serviceList) { try { ReqBraker braker = new ReqBraker(); for (Service service : serviceList) { executorService.submit(new RequsetWorker(braker, request); } braker.await(5, TimeUnit.SECONDS); //请求处理完成 最长时间5S ResultDto result = braker.getResultDto(); if(result == null){ //可能超过了最终等待时间 //返回处理失败; } //返回处理结果; } catch (Exception e) { //异常处理 }
当然实际的使用中可能会比这复杂,因为我们要有各种业务处理情况去要考虑,本文只是一个范例来帮助大家介绍一个新的思路来解决问题,合理利用Java中的工具的同事我们也要理解其实现原理,来定制符合我们使用场景的方法,才能写出更高效的代码,实现更高效的系统。AbstractQueuedSynchronizer的作用也远不止此,但我们掌握了它就可以更好的玩转多线程,玩转并发,来创新的实现各种复杂处理和逻辑。