JAVA细粒度、互斥KEY锁 —— KeyLock

原文:http://blog.csdn.net/icebamboo_moyun/article/details/9391915#comments

java中的几种锁:synchronized,ReentrantLock,ReentrantReadWriteLock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用。本文实现了一个基于KEY(主键)的互斥锁,具有更细的粒度,在缓存或其他基于KEY的场景中有很大的用处。下面将讲解这个锁的设计和实现

(关于这个锁的讨论贴:KeyLock讨论贴-CSDN

设想这么一个场景:转账

private int[] accounts; // 账户数组,其索引为账户ID,内容为金额  
  
public boolean transfer(int from, int to, int money) {  
    if (accounts[from] < money)  
        return false;  
    accounts[from] -= money;  
    accounts[to] += money;  
    return true;  
}  


从from中转出金额到to中。可能同时会有很多个线程同时调用这个转账方法,为保证原子性,保证金额不会出错,必须为这个方法加个锁,防止对共享变量accounts的并发修改。

加锁后的代码如下:

private int[] accounts; // 账户数组,其索引为账户ID,内容为金额  
private Lock lock = new ReentrantLock();  
  
public boolean transfer(int from, int to, int money) {  
    lock.lock();  
    try {  
        if (accounts[from] < money)  
            return false;  
        accounts[from] -= money;  
        accounts[to] += money;  
        return true;  
    } finally {  
        lock.unlock();  
    }  
} 

好了,加锁后这个代码就能保证金额不出错了。但问题又出现了,一次只能执行一个转账过程!意思就是A给B转账的时候,C要给D转账也得等A给B转完了才能开始转。这就有点扯蛋了,就像只有一个柜台,所有人必须排队等前面的处理完了才能到自己,效率太低。

解决这种情况有一个方案:A给B转账的时候只锁定A和B的账户,使其转账期间不能再有其他针对A和B账户的操作,但其他账户的操作可以并行发生。类似于如下场景:


public boolean transfer(int from, int to, int money) {  
    lock.lock(from, to);  
    try {  
        if (accounts[from] < money)  
            return false;  
        accounts[from] -= money;  
        accounts[to] += money;  
        return true;  
    } finally {  
        lock.unlock(from, to);  
    }  
}  

但很显然,JAVA并没有为我们提供这样的锁(也有可能是我没找到。。。)

于是,就在这样的需求下我花了整一天来实现了这个锁——KeyLock(代码量很短,但多线程的东西真的很让人头疼)

不同于synchronized等锁,KeyLock是对所需处理的数据的KEY(主键)进行加锁,只要是对不同key操作,其就可以并行处理,大大提高了线程的并行度(最后有几个锁的对比测试

总结下就是:对相同KEY操作的线程互斥,对不同KEY操作的线程可以并行

KeyLock有如下几个特性

    1、细粒度,高并行性
    2、可重入
    3、公平锁
    4、加锁开销比ReentrantLock大,适用于处理耗时长、key范围大的场景

KeyLock代码如下(注释很少,因为我也不知道该怎么写清楚,能看懂就看,懒得看的直接用就行):

public class KeyLock<K> {  
    // 保存所有锁定的KEY及其信号量  
    private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>();  
    // 保存每个线程锁定的KEY及其锁定计数  
    private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() {  
        @Override  
        protected Map<K, LockInfo> initialValue() {  
            return new HashMap<K, LockInfo>();  
        }  
    };  
  
    /** 
     * 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)} 
     * 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和 
     * {@link #equals(Object)}方法 
     *  
     * @param key 
     */  
    public void lock(K key) {  
        if (key == null)  
            return;  
        LockInfo info = local.get().get(key);  
        if (info == null) {  
            Semaphore current = new Semaphore(1);  
            current.acquireUninterruptibly();  
            Semaphore previous = map.put(key, current);  
            if (previous != null)  
                previous.acquireUninterruptibly();  
            local.get().put(key, new LockInfo(current));  
        } else {  
            info.lockCount++;  
        }  
    }  
      
    /** 
     * 释放key,唤醒其他等待此key的线程 
     * @param key 
     */  
    public void unlock(K key) {  
        if (key == null)  
            return;  
        LockInfo info = local.get().get(key);  
        if (info != null && --info.lockCount == 0) {  
            info.current.release();  
            map.remove(key, info.current);  
            local.get().remove(key);  
        }  
    }  
  
    /** 
     * 锁定多个key 
     * 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生 
     * @param keys 
     */  
    public void lock(K[] keys) {  
        if (keys == null)  
            return;  
        for (K key : keys) {  
            lock(key);  
        }  
    }  
  
    /** 
     * 释放多个key 
     * @param keys 
     */  
    public void unlock(K[] keys) {  
        if (keys == null)  
            return;  
        for (K key : keys) {  
            unlock(key);  
        }  
    }  
  
    private static class LockInfo {  
        private final Semaphore current;  
        private int lockCount;  
  
        private LockInfo(Semaphore current) {  
            this.current = current;  
            this.lockCount = 1;  
        }  
    }  
}  

KeyLock
使用示例

private int[] accounts;  
private KeyLock<Integer> lock = new KeyLock<Integer>();  
  
public boolean transfer(int from, int to, int money) {  
    Integer[] keys = new Integer[] {from, to};  
    Arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁  
    lock.lock(keys);  
    try {  
        //处理不同的from和to的线程都可进入此同步块  
        if (accounts[from] < money)  
            return false;  
        accounts[from] -= money;  
        accounts[to] += money;  
        return true;  
    } finally {  
        lock.unlock(keys);  
    }  
}  

好,工具有了,接下来就是测试了,为了测出并行度,我把转账过程延长了,加了个sleep(2),使每个转账过程至少要花2毫秒(这只是个demo,真实环境下对数据库操作也很费时)。

测试代码如下:

//场景:多线程并发转账  
public class Test {  
    private final int[] account; // 账户数组,其索引为账户ID,内容为金额  
  
    public Test(int count, int money) {  
        account = new int[count];  
        Arrays.fill(account, money);  
    }  
  
    boolean transfer(int from, int to, int money) {  
        if (account[from] < money)  
            return false;  
        account[from] -= money;  
        try {  
            Thread.sleep(2);  
        } catch (Exception e) {  
        }  
        account[to] += money;  
        return true;  
    }  
      
    int getAmount() {  
        int result = 0;  
        for (int m : account)  
            result += m;  
        return result;  
    }  
  
    public static void main(String[] args) throws Exception {  
        int count = 100;        //账户个数  
        int money = 10000;      //账户初始金额  
        int threadNum = 8;      //转账线程数  
        int number = 10000;     //转账次数  
        int maxMoney = 1000;    //随机转账最大金额  
        Test test = new Test(count, money);  
          
        //不加锁  
//      Runner runner = test.new NonLockRunner(maxMoney, number);  
        //加synchronized锁  
//      Runner runner = test.new SynchronizedRunner(maxMoney, number);  
        //加ReentrantLock锁  
//      Runner runner = test.new ReentrantLockRunner(maxMoney, number);  
        //加KeyLock锁  
        Runner runner = test.new KeyLockRunner(maxMoney, number);  
          
        Thread[] threads = new Thread[threadNum];  
        for (int i = 0; i < threadNum; i++)  
            threads[i] = new Thread(runner, "thread-" + i);  
        long begin = System.currentTimeMillis();  
        for (Thread t : threads)  
            t.start();  
        for (Thread t : threads)  
            t.join();  
        long time = System.currentTimeMillis() - begin;  
        System.out.println("类型:" + runner.getClass().getSimpleName());  
        System.out.printf("耗时:%dms\n", time);  
        System.out.printf("初始总金额:%d\n", count * money);  
        System.out.printf("终止总金额:%d\n", test.getAmount());  
    }  
  
    // 转账任务  
    abstract class Runner implements Runnable {  
        final int maxMoney;  
        final int number;  
        private final Random random = new Random();  
        private final AtomicInteger count = new AtomicInteger();  
  
        Runner(int maxMoney, int number) {  
            this.maxMoney = maxMoney;  
            this.number = number;  
        }  
  
        @Override  
        public void run() {  
            while(count.getAndIncrement() < number) {  
                int from = random.nextInt(account.length);  
                int to;  
                while ((to = random.nextInt(account.length)) == from)  
                    ;  
                int money = random.nextInt(maxMoney);  
                doTransfer(from, to, money);  
            }  
        }  
  
        abstract void doTransfer(int from, int to, int money);  
    }  
  
    // 不加锁的转账  
    class NonLockRunner extends Runner {  
        NonLockRunner(int maxMoney, int number) {  
            super(maxMoney, number);  
        }  
  
        @Override  
        void doTransfer(int from, int to, int money) {  
            transfer(from, to, money);  
        }  
    }  
  
    // synchronized的转账  
    class SynchronizedRunner extends Runner {  
        SynchronizedRunner(int maxMoney, int number) {  
            super(maxMoney, number);  
        }  
  
        @Override  
        synchronized void doTransfer(int from, int to, int money) {  
            transfer(from, to, money);  
        }  
    }  
  
    // ReentrantLock的转账  
    class ReentrantLockRunner extends Runner {  
        private final ReentrantLock lock = new ReentrantLock();  
  
        ReentrantLockRunner(int maxMoney, int number) {  
            super(maxMoney, number);  
        }  
  
        @Override  
        void doTransfer(int from, int to, int money) {  
            lock.lock();  
            try {  
                transfer(from, to, money);  
            } finally {  
                lock.unlock();  
            }  
        }  
    }  
  
    // KeyLock的转账  
    class KeyLockRunner extends Runner {  
        private final KeyLock<Integer> lock = new KeyLock<Integer>();  
  
        KeyLockRunner(int maxMoney, int number) {  
            super(maxMoney, number);  
        }  
  
        @Override  
        void doTransfer(int from, int to, int money) {  
            Integer[] keys = new Integer[] {from, to};  
            Arrays.sort(keys);  
            lock.lock(keys);  
            try {  
                transfer(from, to, money);  
            } finally {  
                lock.unlock(keys);  
            }  
        }  
    }  
}  

最最重要的
测试结果

(8线程对100个账户随机转账总共10000次):

       类型:NonLockRunner(不加锁)
       耗时:2482ms
       初始总金额:1000000
       终止总金额:998906(无法保证原子性)

       类型:SynchronizedRunner(加synchronized锁)
       耗时:20872ms
       初始总金额:1000000
       终止总金额:1000000

       类型:ReentrantLockRunner(加ReentrantLock锁)
       耗时:21588ms
       初始总金额:1000000
       终止总金额:1000000

       类型:KeyLockRunner(加KeyLock锁)
       耗时:2831ms
       初始总金额:1000000
       终止总金额:1000000



    原文作者:java锁
    原文地址: https://blog.csdn.net/bpingchang/article/details/17336399
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞