实现一个分布式调度系统-服务注册

上一篇主要介绍了调度模块的核心Quartz:
https://www.imooc.com/article/272332
因为我们最终要实现一个分布式的调度,所以这次我们会介绍服务的注册

前提

本次我们选用zookeeper来实现服务的注册
引入相关的依赖
    <curator.version>2.12.0</curator.version>
    <kryo-shaded.version>4.0.1</kryo-shaded.version>
<!-- ZK-->
      <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>${curator.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>${curator.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>${curator.version}</version>
      </dependency>
      <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo-shaded</artifactId>
        <version>${kryo-shaded.version}</version>
      </dependency>

原理

简单来说利用的是zk可以创建临时节点,临时节点的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。

主要代码实现

1-客户端初始化
  public void init() {

        if (client != null) {
            return;
        }

        //启动zk客户端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        client = CuratorFrameworkFactory.builder().connectString(zkProperties.getServer())
                .sessionTimeoutMs(10000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();

        client.start();

        try {
            // 判断在admin命名空间下是否有jobRegister节点 /job-register 后续注册操作在此下面
            if (client.checkExists().forPath(zkProperties.getPath()) == null) {
                /** * 对于zk来讲,有两种类型的节点: * 持久节点: 当你创建一个节点的时候,这个节点就永远存在,除非你手动删除 * 临时节点: 你创建一个节点之后,会话断开,会自动删除,当然也可以手动删除 */
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)        // 节点类型:持久节点
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)            // acl:匿名权限
                        .forPath(zkProperties.getPath());
            }
            log.info("zookeeper服务器状态:{}", client.getState());
            if(zkProperties.isOpen()){
                addChildWatch(zkProperties.getPath());
            }

            ShutDownHook.registerShutdownHook(this);//加入到hook事件
        } catch (Exception e) {
            log.error("zookeeper客户端连接、初始化错误...");
            e.printStackTrace();
        }
    }

2-监听事件
  /** * @描述 监听事件 * 永久监听指定节点下的节点,只能监听指定节点下一级节点的变化,可以监听到的事件:节点创建、节点数据的变化、节点删除等 */
    private void addChildWatch(String registerPath) throws Exception {
        pcache = new PathChildrenCache(client, registerPath, true);
        pcache.start();
        pcache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {

                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {

                    String path = event.getData().getPath();
                    log.info("{}注册新的执行器:{}", Constants.LOG_PREFIX, path);

                    addChildsWatch(path);

                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {

                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED)) {
                    log.info("{}重新启动zk", Constants.LOG_PREFIX);
                }

            }
        });
    }

    private PathChildrenCache cache = null;

    private void addChildsWatch(String registerPath) throws Exception {
        cache = new PathChildrenCache(client, registerPath, true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {

                String data = new String(event.getData().getData());
                String executorName=registerPath.substring(registerPath.lastIndexOf(Constants.JOIN_SYMBOL) + 1,registerPath.length());
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {

                    log.info("{}执行器:{},有新服务加入:{}", Constants.LOG_PREFIX, executorName, data);
                    ServerCache.addCache(executorName,data);
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {

                    log.info("{}执行器:{},有服务退出", Constants.LOG_PREFIX, registerPath);
                    ServerCache.removeCache(executorName,data);
                }
            }
        });
    }

3-创建持久节点
public void createPersistentNode(String path) {

        try {
            if (client.checkExists().forPath(path) == null) {
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)        // 节点类型:持久节点
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)            // acl:匿名权限
                        .forPath(path);
                log.info("{}create createPersistentNode:{}", Constants.LOG_PREFIX, path);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("zookeeper创建持久节点失败...{}", path);
        }

    }

4-创建临时顺序节点(和持久节点不同的是 , 临时节点的生命周期和客户端会话绑定 。 也就是说 , 如果客户端会话失效 , 那么这个节点就会自动被清除掉)
public void createPhemeralEphemeralNode(String path, String address) {

        try {
            if (client.checkExists().forPath(path) == null) {
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)        // 临时顺序节点
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)            // acl:匿名权限
                        .forPath(path, address.getBytes());
                log.info("{}create createePhemeralEphemeralNode:{}", Constants.LOG_PREFIX, path);

            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("zookeeper创建临时顺序节点失败...{}", path);
        }

    }

5-获取值
public String getData(String path) {
        String dataPath = path;
        try {
            byte[] result = client.getData().forPath(dataPath);
            return new String(result);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

和SpringBoot的结合

如果在SpringBoot中我们想把服务注册作为一个模块,当其他项目引入时候自动可以启动,
我们可以把其做成一个AutoConfiguration的类
类似:
@Configuration
@EnableConfigurationProperties(ZkProperties.class)
public class ServiceRegistryAutoConfiguration {

    @Autowired
    private ZkProperties zkProperties;

    @Bean
    @ConditionalOnMissingBean(name = "serviceRegistry")
    @ConditionalOnProperty(value = {"faya-job.register.server", "faya-job.register.path"})
    public ServiceRegistry serviceRegistry() {

        log.info("init default register");

        ZKCuratorClient zkCuratorClient = new ZKCuratorClient(zkProperties);
        zkCuratorClient.init();
        ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry(zkCuratorClient, zkProperties);
        return zkServiceRegistry;

    }
}

关于上面使用的注解,大家可以查阅一下,比如ConditionalOnProperty是指项目中配置了指定的配置参数,此类才会生效。
关于SpringBoot的autoconfig自动配置后面会单独介绍,比如实现一个springboot-redis模块。

总结

利用zk实现一个服务注册很方便。
完整代码:https://github.com/lizu18xz/faya-job
点赞