zk分布锁的java实现

只做记录,直接上代码

父类:

package com.ylcloud.common.lock;

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * @author cjh
 * @Description: zk分布锁
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static Integer sessionTimeout = 30000;

    public static Integer connectTimeout = 30000;

    /**
     * 节点锁标记
     */
    public String lockPath;

    /**
     * 前一个节点(设置用户添加监听器)
     */
    public String beforeNode;

    /**
     * 当前执行节点(设置用于删除)
     */
    public String currentNode;

    /**
     * 当前请求节点
     */
    public String threadTag = null;

    private String lock1 = null;

    private String lock2 = null;

    public static final ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());

    private static final ThreadLocal<String> NODES = new ThreadLocal<String>();

    public ZkLock() {
    }

    public void init(String code) {
        this.lockPath = code;
        this.lock1 = code + "LOCK";
        this.lock2 = code + "UNLOCK";
        client.deleteRecursive(lockPath);
    }

    public void lock() {

        synchronized (lock1) {
            if (!client.exists(lockPath)) {
                client.createPersistent(lockPath);
            }

            if (!tryLock()) {
            }

        }

    }

    public void unlock() {

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        String nodes = NODES.get();
        logger.info(JSON.toJSONString(childrens) + " ==== " + nodes + " ==== " + (nodes.equals(lockPath + '/' + childrens.get(0))));
        if (childrens.size() > 0 && nodes.equals(lockPath + '/' + childrens.get(0))) {
            client.delete(nodes);
        }

    }

    private boolean tryLock() {

        threadTag = client.createEphemeralSequential(lockPath + '/', "");

        NODES.set(threadTag);

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        currentNode = lockPath + '/' + childrens.get(0);

        if (threadTag.equals(currentNode)) {
            return true;
        } else {

            currentNode = threadTag;

            int wz = Collections.binarySearch(childrens, threadTag.substring(lockPath.length() + 1));
            beforeNode = lockPath + '/' + childrens.get(wz - 1);

            final CountDownLatch latch = new CountDownLatch(1);
            try {

                client.subscribeDataChanges(beforeNode, new IZkDataListener() {

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        if (latch != null && latch.getCount() > 0) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                    }
                });

                if (client.exists(beforeNode)) {
                    latch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                }

            } catch (Exception e) {
                return true;
            } finally {
            }

        }
        return false;
    }

}

 

子类

package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
  * @Description: 用户编码锁
  * @author cjh
  * @date: 2018/10/10 14:47
  */
public class ZkLockUserCode extends ZkLock {

    public ZkLockUserCode() {
        super.init("/USER_CODE");
    }

}

 

使用示例:

private ZkLock zkLock = new ZkLockUserCode();

public void addUser() {
    
    try {

        zkLock.lock();

            /**
             *业务实现
             */
            
    } catch (Exception e) {
        logger.info("err {} {} ", e.getMessage(), e.getCause());


    } finally {
        zkLock.unlock();
    }
        
}     

 

注意:unlock必须写在finally里面,否则一旦业务出现运行错误造成没有解锁,下一次访问的人就需要等待一个sessionTime了

题外话:zk在linux上启动命令 ./zkServer.sh start

 

转载请注明博客出处:http://www.cnblogs.com/cjh-notes/

    原文作者:代码风云
    原文地址: http://www.cnblogs.com/cjh-notes/p/9744183.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞