使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)

一、使用ZooKeeper实现Java跨JVM的分布式锁

二、使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)

三、使用ZooKeeper实现Java跨JVM的分布式锁(读写锁)

说明:这篇文章是基于 使用ZooKeeper实现Java跨JVM的分布式锁 的,没有阅读的朋友请先阅读前面的文章后在阅读本文。

上一篇文章中介绍了如何使用分布式锁,并且对原来的公平锁进行了扩展,实现了非公平锁,已经能够满足大部分跨进程(JVM)锁的需求了。

问题:我们都知道在单个JVM内部实现锁的机制很方便,Java也提供了很丰富的API可以实现,例如Synchronized关键字, ReentrantLock等等,但是在集群环境中,都是多个JVM协同工作,当需要一些全局锁时就要用到上面介绍的分布式锁了,但是这种锁的缺点在于每次客户端(这里说的客户端可以理解为不同JVM当中的线程)需要获取锁时都需要与zook服务端交互,创建znode,等待着自己获取锁,这种网络通信无疑会给服务器带来一定的压力,那么我们有没有什么办法来减少这种压力呢?

场景:有一种很常见的场景就是更新缓存,那么我们一般的处理逻辑如下。

1、 首选根据key获取资源,如果资源存在,用之。

2、如果不存在,则申请获取锁(使用共享锁)。

3、获取到锁以后,再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。

4、释放锁。

假设现在有10(1-10)个线程同时执行上诉逻辑,如果资源不存在,那么它们全部会执行第(2)步获取锁,在同一时刻,只会有1个线程获取锁,其它9个线程阻塞,等待获取锁。现在我们假设线程1获取到锁,开始执行(3-4)步动作,在第(3步)当中,再次判断资源是否存在,(肯定不存在因为它是第一个进去的),所以它负责加载资源放入缓存,然后释放锁, 再说其它线程(2-10)它们依次获取到锁,然后执行(3,4)动作,再次判断资源是否存在(已经存在了,因为1号线程已经放进去了),所以他们直接退出,释放锁。由此可见只有1号线程获取锁是有意义的,但是它们都需要与zook进行网络通讯,因此会给网络带来压力。

如果说我们有A,B 二台服务器进行集群,这10个线程获取锁的请求分别来自这2台服务器,例如(1-5)号线程来自A服务器, (6-10)号线程来自B服务器,那么它们与zk交互了10次,创建10个znode来申请锁,但是如果我们进行一定的优化,它们只要与zk交互2次就够了,我们来把上面的逻辑改造一下。

1、 首选根据key获取资源,如果资源存在,用之。

2、如果不存在,则申请获取锁(JVM进程内的锁)。

3、获取到锁(JVM进程内的锁),再次判断资源是否存在,如果资源存在就退出,没啥好所的。

4、如果资源不存在,则申请获取锁(分布式锁)。

5、获取到锁(分布式锁)再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。

6、释放分布式锁,释放JVM进程内的锁。

代码:我把实现逻辑都放在一起了,方便给大家演示,如果有逻辑错误欢迎大家留言指正。

package com.framework.code.demo.zook;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.framework.code.demo.zook.lock.NoFairLockDriver;

public class Main {
	
	//我们用一个static的map模拟一个第三方独立缓存
	public static Map<String, Object> redis = new HashMap<String, Object>();
	public static final String key = "redisKey";
	
	public static void main(String[] args) throws InterruptedException {
		//创建俩个对象分别模拟2个进程
		RedisProcess processA = new RedisProcess();
		RedisProcess processB = new RedisProcess();
		
		//每个进程别分用50个线程并发请求
		ExecutorService service = Executors.newFixedThreadPool(100);
		for (int i = 0; i < 50; i++) {
			service.execute(processA);
			service.execute(processB);
		}
		
		service.shutdown();
		service.awaitTermination(30, TimeUnit.SECONDS);
	}
	
	public static class RedisProcess implements Runnable {
		CuratorFramework client;
		//ZK分布式锁
		InterProcessMutex distributedLock;
		//JVM内部锁
		ReentrantLock jvmLock;
		
		public RedisProcess() {
			client = CuratorFrameworkFactory.newClient("192.168.1.18:2181", 
					new ExponentialBackoffRetry(1000,3));
			client.start();
			distributedLock = new InterProcessMutex(client,"/mylock", new NoFairLockDriver());
			jvmLock = new ReentrantLock();
		}

		@Override
		public void run() {
			//(1)首先判断缓存内资源是否存在
			if(redis.get(key) == null) {
				try {
					
					//这里延时1000毫秒的目的是防止线程过快的更新资源,那么其它线程在步骤(1)处就返回true了.
					Thread.sleep(1000);
					
					//获取JVM锁(同一进程内有效)
					jvmLock.lock();
					
					//(2)再次判断资源是否已经存在
					if(redis.get(key) == null) {
						System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 准备获取ZK锁");
						
						//这里延时500毫秒的目的是防止线程过快更新资源,其它线程在步骤(2)就返回true了。
						Thread.sleep(500);
						try {
							//获取zk分布式锁
							distributedLock.acquire();
							System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 获取到了ZK锁");

							//再次判断,如果为空这时可以更新资源
							if(redis.get(key) == null) {
								redis.put(key, Thread.currentThread() + "更新了缓存");
								System.out.println("线程:" + Thread.currentThread() + "更新了缓存");
							} else {
								System.out.println("线程:" + Thread.currentThread() + "当前资源已经存在,不需要更新");
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							//释放ZK锁
							try {
								distributedLock.release();
							} catch (Exception e) {
								e.printStackTrace();
							}
						}
					} else {
						System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)不为空," + redis.get(key));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					//释放JVM锁
					jvmLock.unlock();
				}
			} else {
				System.out.println(redis.get(key));
			}
		}
	}

}

线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁
线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁
线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁
线程:Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-7,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-1,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-5,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-9,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-23,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-19,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-11,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-31,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-35,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-15,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-27,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-25,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-33,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-37,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-13,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-17,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁
线程:Thread[pool-5-thread-2,5,main]当前资源已经存在,不需要更新

线程:Thread[pool-5-thread-21,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-29,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-55,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-59,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-41,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-67,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-39,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-43,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-57,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-47,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-51,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-63,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-8,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-69,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-4,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-6,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-10,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-22,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-16,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-20,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-45,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-24,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-32,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-36,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-49,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-28,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-12,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-14,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-26,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-53,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-18,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-61,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-30,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-65,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-34,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-97,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-40,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-91,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-64,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-42,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-46,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-50,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-87,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-85,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-44,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-75,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-48,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-71,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-77,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-52,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-99,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-93,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-56,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-60,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-95,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-89,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-81,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-73,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-68,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-58,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-62,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-66,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-38,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-54,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-94,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-83,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-96,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-79,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-92,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-90,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-80,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-82,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-72,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-78,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-100,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-70,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-88,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-84,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-98,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-86,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-76,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

线程:Thread[pool-5-thread-74,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

我们通过观察日志,发现只有2个次需要获取分布式锁,其它的都被JVM锁给阻挡在外面了,在这种情况下可以大大的提高锁的性能。

    原文作者:java锁
    原文地址: https://blog.csdn.net/nimasike/article/details/51578163
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞