只做记录,直接上代码
父类:
package com.ylcloud.common.lock; import com.alibaba.fastjson.JSON; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author cjh * @Description: zk分布锁 * @date: 2018/9/27 11:36 */ public class ZkLock { private static Logger logger = LogManager.getLogger(ZkLock.class); public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181"; public static Integer sessionTimeout = 30000; public static Integer connectTimeout = 30000; /** * 节点锁标记 */ public String lockPath; /** * 前一个节点(设置用户添加监听器) */ public String beforeNode; /** * 当前执行节点(设置用于删除) */ public String currentNode; /** * 当前请求节点 */ public String threadTag = null; private String lock1 = null; private String lock2 = null; public static final ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer()); private static final ThreadLocal<String> NODES = new ThreadLocal<String>(); public ZkLock() { } public void init(String code) { this.lockPath = code; this.lock1 = code + "LOCK"; this.lock2 = code + "UNLOCK"; client.deleteRecursive(lockPath); } public void lock() { synchronized (lock1) { if (!client.exists(lockPath)) { client.createPersistent(lockPath); } if (!tryLock()) { } } } public void unlock() { List<String> childrens = client.getChildren(lockPath); Collections.sort(childrens); String nodes = NODES.get(); logger.info(JSON.toJSONString(childrens) + " ==== " + nodes + " ==== " + (nodes.equals(lockPath + '/' + childrens.get(0)))); if (childrens.size() > 0 && nodes.equals(lockPath + '/' + childrens.get(0))) { client.delete(nodes); } } private boolean tryLock() { threadTag = client.createEphemeralSequential(lockPath + '/', ""); NODES.set(threadTag); List<String> 
childrens = client.getChildren(lockPath); Collections.sort(childrens); currentNode = lockPath + '/' + childrens.get(0); if (threadTag.equals(currentNode)) { return true; } else { currentNode = threadTag; int wz = Collections.binarySearch(childrens, threadTag.substring(lockPath.length() + 1)); beforeNode = lockPath + '/' + childrens.get(wz - 1); final CountDownLatch latch = new CountDownLatch(1); try { client.subscribeDataChanges(beforeNode, new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (latch != null && latch.getCount() > 0) { latch.countDown(); } } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }); if (client.exists(beforeNode)) { latch.await(sessionTimeout, TimeUnit.MILLISECONDS); } } catch (Exception e) { return true; } finally { } } return false; } }
子类:
package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
 * Lock for user-code operations: a {@link ZkLock} bound to the fixed path
 * {@code /USER_CODE}.
 *
 * <p>Created 2018/10/10 14:47.
 *
 * @author cjh
 */
public class ZkLockUserCode extends ZkLock {

    /**
     * ZooKeeper path passed to {@link ZkLock#init(String)} for this lock.
     */
    private static final String USER_CODE_PATH = "/USER_CODE";

    /**
     * Creates the lock and initialises it with the user-code path.
     */
    public ZkLockUserCode() {
        super();
        super.init(USER_CODE_PATH);
    }
}
使用示例:
private ZkLock zkLock = new ZkLockUserCode();

/**
 * Example of guarding a business operation with the distributed lock.
 *
 * <p>The lock is acquired before the guarded try block, so unlock() runs only
 * after lock() has returned normally. Failures are logged together with their
 * stack trace and are not propagated to the caller.
 */
public void addUser() {
    try {
        zkLock.lock();
    } catch (Exception e) {
        // Nothing was acquired, so there is nothing to release.
        logger.error("addUser: could not acquire lock", e);
        return;
    }
    try {
        /*
         * business logic goes here
         */
    } catch (Exception e) {
        logger.error("addUser failed", e);
    } finally {
        // Always release, even when the business logic throws.
        zkLock.unlock();
    }
}
注意:unlock 必须写在 finally 块里,否则一旦业务代码抛出异常导致没有解锁,下一个请求该锁的线程就要等待一个 sessionTimeout(示例中为 30000 毫秒)。
题外话:ZooKeeper 在 Linux 上的启动命令是 ./zkServer.sh start
转载请注明博客出处:http://www.cnblogs.com/cjh-notes/