分布式锁,在高并发场景下,集群中各机器之间只能有一个线程处理某个操作,这样和java同一进程下的锁,就不能解决了。就用到了不同机器之间的分布式锁。
package com.test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
/**
* 测试高并发场景下,各个机器之间处理相同业务时,要确保只有一个线程在处理
* 用zookeeper原生的api实现
*
* @author zhb
*/
public class testThread implements Runnable {
// zookeeper集群的地址,ip后的逗号前后不能加空格
private static String connectString = "192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
private static int sessionTimeout = 3000;
//zookeeper链接时保持同步
private CountDownLatch cdl = new CountDownLatch(1);
// 高并发测试,count并发量
private static int count = 100;
private static CountDownLatch cd2 = new CountDownLatch(count);
@Override
public void run() {
cd2.countDown();
ZooKeeper zookeeper =null;
try {
cd2.await();
// 此过程是异步的
zookeeper = new ZooKeeper(connectString, sessionTimeout,
// 创建zookeeper连接时的监听器
new Watcher(){
@Override
public void process(WatchedEvent event) {
// 异步连接成功
if(event.getState() == KeeperState.SyncConnected){
// 释放阻塞
cdl.countDown();
}
}
}
);
// 等待链接成功再执行
cdl.await();
} catch (Exception e2) {
e2.printStackTrace();
}
//在根目录下要创建的节点
String path = "/test";
try {
// zookeeper创建节点是异步的
String create = zookeeper.create(path, "222".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 获取锁开始 处理业务
handler(zookeeper, path);
} catch (Exception e) {
//循环再次获取锁
againCreat(zookeeper, path);
}
}
/**
* 获取锁失败,已经被别人获取,需要再次获取
* @param zookeeper
* @param path
*/
private void againCreat(ZooKeeper zookeeper, String path) {
// 该功能可以实现分布式锁
// 创建节点失败说明该节点已经存在,该锁已经被别人持有 ephemeral
int count = 1;
while(true){
String createSuccessPath = null;
try {
createSuccessPath = zookeeper.create(path, "222".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 处理业务
handler(zookeeper, path);
} catch (Exception e1) {
count++;
System.err.println("抢锁次数:"+count);
//抢20次锁,抢不到锁就结束
if(count == 20){
System.err.println("分布式锁获取失败,请稍后再试");
break;
}
}
if(path.equals(createSuccessPath)){
try {
// 处理业务
handler(zookeeper, path);
} catch (Exception e1) {
System.err.println("--------------获取锁成功处理业务时失败------------");
}
break;
}
}
}
/**
* 获取锁成功开始处理业务
* @param zookeeper
* @param path
* @throws InterruptedException
*/
private static void handler(ZooKeeper zookeeper, String path) throws InterruptedException {
System.err.println("--------------分布式锁获取成功,可以进行其他运算------------");
System.err.println("正在处理业务------------");
// Thread.sleep(1000);
// 释放锁
try {
zookeeper.delete(path, -1);
} catch (Exception e) {
System.err.println("--------------释放锁出现异常------------");
}
System.err.println("--------------业务处理成功,锁已经释放------------");
}
// 启动main函数
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
// 并发测试
for(int i = 0; i<count; i++){
new Thread(new testThread()).start();
}
}
}