Dubbo之ZookeeperRegistry源码分析

ZookeeperRegistry的作用

ZookeeperRegistry是dubbo中常用的注册中心实现,它主要作用通过Zookeeper的目录监听机制,让消费者能够实时得到在线的提供者列表。并且一些服务治理的功能也是通过zookeeper这个监听特性巧妙的完成。

在具体讲解ZookeeperRegistry的相关源码之前,先来分析下dubbo在zookeeper的目录结构以及dubbo如何利用这个特性

Zookeeper目录结构

dubbo在zookeeper建立的目录是基于接口的,大致如下

《Dubbo之ZookeeperRegistry源码分析》 image.png

针对每个接口节点会存在以下4个子节点

节点名作用子节点是否持久节点
consumers存储消费者节点url
configuators存储override或者absent url,用于服务治理
routers用于设置路由url,用于服务治理
providers存储在线提供者url

consumer节点存在的意义并不大,主要还是为了做监控
其他三个节点,都会设置被相应的监听器,发生改变时,会触发特定事件

Dubbo对Zookeeper监听机制的利用

Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作

《Dubbo之ZookeeperRegistry源码分析》 image.png

ZookeeperClient提供设置两种监听器的方法,对应子节点监听器和状态监听器,这里我们关注子节点监听器ChildListener

public interface ChildListener {

    /**
     *
     * @param path 监听的节点
     * @param children 监听的节点的所有子节点
     */
    void childChanged(String path, List<String> children);

}

ZookeeperClient有两种实现,第一种通过官方提供的jar包,第二个通过Apache的Curator框架,默认使用第二种,我们讲解的也是Curator的对应实现
添加子节点监听器的方法为addChildListener

public List<String> addChildListener(String path, final ChildListener listener) {
        //对listener做缓存,因为ChildListener是dubbo提供的监听器接口,需要转换为cruator的监听器接口
        ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
        if (listeners == null) {
            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
            listeners = childListeners.get(path);
        }
        TargetChildListener targetListener = listeners.get(listener);
        if (targetListener == null) {
            //createTargetChildListener会对监听器进行转换
            listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
            targetListener = listeners.get(listener);
        }
        return addTargetChildListener(path, targetListener);
    }

Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现
addTargetChildListener方法在添加监听器之后会返回监听path当前的所有的子节点

public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
        try {
            //添加监听,并且返回这个目录当前所有子节点
            //这种监听方式是一次性的,在listener实现中会再次执行监听逻辑
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

上述代码需要注意监听是一次性的,其实curator提供了TreeCache用作永久性的监听,这边不用到这个特性,应该是为了和官方API保持一致吧。
接下去看下Cruator监听器的封装

public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
        return new CuratorWatcherImpl(listener);
    }
private class CuratorWatcherImpl implements CuratorWatcher {

        private volatile ChildListener listener;

        public CuratorWatcherImpl(ChildListener listener) {
            this.listener = listener;
        }

        public void unwatch() {
            this.listener = null;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            if (listener != null) {
                String path = event.getPath() == null ? "" : event.getPath();
                listener.childChanged(path,
                        // if path is null, curator using watcher will throw NullPointerException.
                        // if client connect or disconnect to server, zookeeper will queue
                        // watched event(Watcher.Event.EventType.None, .., path = null).
                        StringUtils.isNotEmpty(path)
                                //再次设置监听,并且把监听path的所有子节点传入childChanged方法
                                ? client.getChildren().usingWatcher(this).forPath(path)
                                : Collections.<String>emptyList());
            }
        }
    }

可以看到listener的触发逻辑以及入参来源

源码分析

《Dubbo之ZookeeperRegistry源码分析》 image.png

通过ZookeeperRegistry的类继承图,逐上而下的分析源码

Registry接口

public interface Registry extends Node, RegistryService {
}

Registry继承Node和RegistryService两个接口,本身不提供接口方法

public interface Node {

    /**
     * get url.
     *
     * @return url.
     */
    URL getUrl();

    /**
     * is available.
     *
     * @return available.
     */
    boolean isAvailable();

    /**
     * destroy.
     */
    void destroy();

}

Node约束了三个生命周期相关的方法
getUrl用于获取当前组件的url配置
isAvailable检测组件是否可用
destroy用于销毁组件

public interface RegistryService {

   
    void register(URL url);

    void unregister(URL url);

    void subscribe(URL url, NotifyListener listener);

    void unsubscribe(URL url, NotifyListener listener);

    List<URL> lookup(URL url);

}

RegistryService规定了和注册中心相关的方法
register和unregister用于提供者向注册中心注册提供者url
subscribe和unsubscribe用于消费者向对应接口目录注册监听
lookup用于查找查找url,通过消费者url查找提供者url以及服务治理有关的url

AbstractRegistry

主要提供接口提供者本地缓存功能
以及基础register,unregister,subscribe,unsubscribe,notify,lookup,recover逻辑

register,unregister会(接触)注册提供者url,主要操作

private final Set<URL> registered = new ConcurrentHashSet<URL>();

subscribe,unsubscribe则会针对特定url提供监听,主要操作

private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

notify方法会缓存最近通知的url到notified以及触发listener回调

/**
     * 这个方法不会直接触发,被FailbackRegistry重载
     * FailbackRegistry增加failback逻辑后,还是会调用这个方法
     * @param url
     * @param listener
     * @param urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        //根据url的category进行分类
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        //下面操作notified缓存
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            //对notified内容进行覆盖,相当于会保存上一次的通知
            categoryNotified.put(category, categoryList);
            //每次通知后会刷新本地缓存
            saveProperties(url);
            //进行listener回调,每种category的url分别回调一次
            listener.notify(categoryList);
        }
    }

这个类的recover方法不分析,因为FailbackRegistry完全重写了这个方法

FailbackRegistry

FailbackRegistry重载了AbstractRegistry中的subscribe,unsubscribe,register,unregister,notify方法,在AbstractRegistry的基础上提供了失败重试机制,并且暴露模板方法doRegister,doUnregister,doSubscribe,doUnsubscribe让不同类型的注册中心实现。doNotify还是默认父类的逻辑。
同时也重载了recover方法,通过FailbackRegistry的重试机制实现recover

以registry方法作为样例看下添加的重试机制

/**
     * register行为,提供者使用
     * 在AbstractRegistry的基础上,增加失败重试机制
     * @param url
     */
    @Override
    public void register(URL url) {
        super.register(url);
        //这里成功,会删除failedRegistered,failedUnregistered中的url
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            //具体register逻辑交给子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            //如果注册中心或者提供者url的check为false的话,跳过抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            //如果是注册的时候,抛出这个异常,那么也会忽略,只打日志
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            //加入到失败重试集合
            failedRegistered.add(url);
        }
    }

注册失败后会把需要注册重试的url放入failedRegistered集合
然后在FailbackRegistry构造函数中起的定时任务会进行重试

public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        //重试的定时线程,使用future用于取消
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

retry方法的具体逻辑,就是循环遍历这些失败集合,然后调用doXXX方法进行重试

recover方法会在和Zookeeper重连时触发,在断连状态下,dubbo进程内的注册,订阅行为是会被缓存下来的,然后对所有缓存的url进行重新注册,订阅。
这边有个细节点,可以看到failedRegistered这些集合使用的都是线程安全的集合

private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

因为recover,retry这两个操作还是存在资源竞争的,但不仅限于这两个操作

ZookeeperRegistry

ZookeeperRegistry的工作就是通过Zookeeper API实现doRegister,doUnregister,doSubscribe,doUnsubscribe具体逻辑

首先来看下ZookeeperRegistry的构造函数,做的主要工作是初始化zk客户端

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //如果不进行配置,默认dubbo根目录就是/dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        //zookeeper添加重连回调,会触发recover方法,进行失败任务重试
        //为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

使用zookeeperTransporter扩展点加载zk客户端实现,默认为Curator框架

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

注册/取消注册实现

然后再来看doRegister和doUnregister方法,对于zk来说,就是创建目录呗

/**
     * 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)
     * 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}
     * DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover
     * @param url
     */
    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
/**
     * 取消注册,就是删除那个节点
     * @param url
     */
    @Override
    protected void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString},
group一般不配置的话为dubbo,
interfaceName对应具体接口,
category开始就讲过,分为consumers,configuators,routers,providers
url.toFullString就是我们的url配置

对于registry来讲category=providers

取消注册就是对应删除那个节点

订阅/取消订阅实现

订阅的行为对于消费者来讲,用于获取providers和routers,用于得到路由后的提供者
对于提供者来讲,订阅configuators,通过新的配置重新暴露
在ZookeeperRegistry,我们只关注如何进行订阅,具体监听器的作用,在用到的模块再讲
doSubscribe方法支持订阅全局和订阅特定接口
如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调
如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调

@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                                    //这里是用来对/dubbo下面提供者新增时的回调,相当于增量
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                //添加监听器会返回子节点集合
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                        //这里的逻辑只执行一次,一次全量
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                //这边是针对明确interface的订阅逻辑
                List<URL> urls = new ArrayList<URL>();
                //针对每种category路径进行监听
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //封装回调逻辑
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //创建节点
                    zkClient.create(path, false);
                    //增加回调
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法
                //意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性

取消订阅没什么好讲的,删除订阅数据即可

讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可

/**
     * 查找消费者url 对应 提供者url实现
     * 这边的url为消费者url
     * @param url
     * @return
     */
    @Override
    public List<URL> lookup(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("lookup url == null");
        }
        try {
            List<String> providers = new ArrayList<String>();
            //返回inteface下面所有category的url
            for (String path : toCategoriesPath(url)) {
                List<String> children = zkClient.getChildren(path);
                if (children != null) {
                    providers.addAll(children);
                }
            }
            //返回匹配的url
            return toUrlsWithoutEmpty(url, providers);
        } catch (Throwable e) {
            throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

总结

1.Zookeeper监听器的妙用,在Elasticjob也是使用到了这个特性,进行任务触发
2.通过zookeeperTransporter以及ZookeeperClient对Zookeeper操作进行抽象,进而支持两种zookeeper客户端框架。包括在remoting模块也是采用这种设计模式,和底层框架解耦。
3.Zookeeper默认的监听是一次性的,Curator框架实现了永久监听,但是dubbo没用到Curator这个特性。
4.写完这部分,Dirctory模块就比较容易写下去了,东西太多,有些地方的理解肯定存在偏差,希望读者能多多交流

最后

希望大家关注下我的公众号

《Dubbo之ZookeeperRegistry源码分析》 image

    原文作者:土豆肉丝盖浇饭
    原文地址: https://www.jianshu.com/p/d89023a266be
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞