上一篇主要介绍了调度模块的核心Quartz:
https://www.imooc.com/article/272332
因为我们最终要实现一个分布式的调度,所以这次我们会介绍服务的注册
前提
本次我们选用zookeeper来实现服务的注册
引入相关的依赖
<curator.version>2.12.0</curator.version>
<kryo-shaded.version>4.0.1</kryo-shaded.version>
<!-- ZK-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>${kryo-shaded.version}</version>
</dependency>
原理
简单来说利用的是zk可以创建临时节点,临时节点的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。
主要代码实现
1-客户端初始化
public void init() {
if (client != null) {
return;
}
//启动zk客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder().connectString(zkProperties.getServer())
.sessionTimeoutMs(10000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();
client.start();
try {
// 判断在admin命名空间下是否有jobRegister节点 /job-register 后续注册操作在此下面
if (client.checkExists().forPath(zkProperties.getPath()) == null) {
/** * 对于zk来讲,有两种类型的节点: * 持久节点: 当你创建一个节点的时候,这个节点就永远存在,除非你手动删除 * 临时节点: 你创建一个节点之后,会话断开,会自动删除,当然也可以手动删除 */
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) // 节点类型:持久节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // acl:匿名权限
.forPath(zkProperties.getPath());
}
log.info("zookeeper服务器状态:{}", client.getState());
if(zkProperties.isOpen()){
addChildWatch(zkProperties.getPath());
}
ShutDownHook.registerShutdownHook(this);//加入到hook事件
} catch (Exception e) {
log.error("zookeeper客户端连接、初始化错误...");
e.printStackTrace();
}
}
2-监听事件
/** * @描述 监听事件 * 永久监听指定节点下的节点,只能监听指定节点下一级节点的变化,可以监听到的事件:节点创建、节点数据的变化、节点删除等 */
private void addChildWatch(String registerPath) throws Exception {
pcache = new PathChildrenCache(client, registerPath, true);
pcache.start();
pcache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
String path = event.getData().getPath();
log.info("{}注册新的执行器:{}", Constants.LOG_PREFIX, path);
addChildsWatch(path);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED)) {
log.info("{}重新启动zk", Constants.LOG_PREFIX);
}
}
});
}
private PathChildrenCache cache = null;
private void addChildsWatch(String registerPath) throws Exception {
cache = new PathChildrenCache(client, registerPath, true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
String data = new String(event.getData().getData());
String executorName=registerPath.substring(registerPath.lastIndexOf(Constants.JOIN_SYMBOL) + 1,registerPath.length());
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
log.info("{}执行器:{},有新服务加入:{}", Constants.LOG_PREFIX, executorName, data);
ServerCache.addCache(executorName,data);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
log.info("{}执行器:{},有服务退出", Constants.LOG_PREFIX, registerPath);
ServerCache.removeCache(executorName,data);
}
}
});
}
3-创建持久节点
public void createPersistentNode(String path) {
try {
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) // 节点类型:持久节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // acl:匿名权限
.forPath(path);
log.info("{}create createPersistentNode:{}", Constants.LOG_PREFIX, path);
}
} catch (Exception e) {
e.printStackTrace();
log.error("zookeeper创建持久节点失败...{}", path);
}
}
4-创建临时顺序节点(和持久节点不同的是 , 临时节点的生命周期和客户端会话绑定 。 也就是说 , 如果客户端会话失效 , 那么这个节点就会自动被清除掉)
public void createPhemeralEphemeralNode(String path, String address) {
try {
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 临时顺序节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // acl:匿名权限
.forPath(path, address.getBytes());
log.info("{}create createePhemeralEphemeralNode:{}", Constants.LOG_PREFIX, path);
}
} catch (Exception e) {
e.printStackTrace();
log.error("zookeeper创建临时顺序节点失败...{}", path);
}
}
5-获取值
public String getData(String path) {
String dataPath = path;
try {
byte[] result = client.getData().forPath(dataPath);
return new String(result);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
和SpringBoot的结合
如果在SpringBoot中我们想把服务注册作为一个模块,当其他项目引入时候自动可以启动,
我们可以把其做成一个AutoConfiguration的类
类似:
@Configuration
@EnableConfigurationProperties(ZkProperties.class)
public class ServiceRegistryAutoConfiguration {
@Autowired
private ZkProperties zkProperties;
@Bean
@ConditionalOnMissingBean(name = "serviceRegistry")
@ConditionalOnProperty(value = {"faya-job.register.server", "faya-job.register.path"})
public ServiceRegistry serviceRegistry() {
log.info("init default register");
ZKCuratorClient zkCuratorClient = new ZKCuratorClient(zkProperties);
zkCuratorClient.init();
ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry(zkCuratorClient, zkProperties);
return zkServiceRegistry;
}
}
关于上面使用的注解,大家可以查阅一下,比如ConditionalOnProperty是指项目中配置了指定的配置参数,此类才会生效。
关于SpringBoot的autoconfig自动配置后面会单独介绍,比如实现一个springboot-redis模块。
总结
利用zk实现一个服务注册很方便。
完整代码:https://github.com/lizu18xz/faya-job