Motan源码分析(二).服务提供与注册

Motan源码分析(二).服务提供与注册

1.简介

motan是新浪微博开源的服务治理框架,本系列的文章将分析它的底层源码,分析的源码版本为:1.1.0。源码地址:https://github.com/weibocom/motan
第一篇文章将以服务的发布和注册开始,注册服务使用zookeeper来分析。

2. 深入分析

本文涉及到的主要类和接口:MotanApiExportDemo、MotanDemoService、MotanDemoServiceImpl、ServiceConfig、RegistryConfig、ProtocolConfig、DefaultProvider、ZookeeperRegistryFactory、ZookeeperRegistry、SimpleConfigHandler、ProtocolFilterDecorator等。

2.1 从MotanApiExportDemo引出服务发布

首先来看demo源码:MotanApiExportDemo
demo中先后创建了ServiceConfig、RegistryConfig和ProtocolConfig相关的对象,其中ServiceConfig是我们提供服务的相关配置(每个服务一个配置,例如一个服务接口一个配置,本文中的具体服务是:MotanDemoServiceImpl)、RegistryConfig是注册中心相关的配置信息、ProtocolConfig是应用协议相关的配置(在客户端还负责集群相关的配置)。

ServiceConfig<MotanDemoService> motanDemoService = new ServiceConfig<MotanDemoService>();
// 设置接口及实现类
motanDemoService.setInterface(MotanDemoService.class);//设置服务接口,客户端在rpc调用时,会在协议中传递接口名称,从而实现与具体实现类一一对应
motanDemoService.setRef(new MotanDemoServiceImpl());//设置接口实现类,实际的业务代码

// 配置服务的group以及版本号
motanDemoService.setGroup("motan-demo-rpc");//服务所属的组
motanDemoService.setVersion("1.0");

// 配置ZooKeeper注册中心
RegistryConfig zookeeperRegistry = new RegistryConfig();
zookeeperRegistry.setRegProtocol("zookeeper");//使用zookeeper作为注册中心
zookeeperRegistry.setAddress("127.0.0.1:2181");//zookeeper的连接地址
motanDemoService.setRegistry(zookeeperRegistry);

// 配置RPC协议
ProtocolConfig protocol = new ProtocolConfig();
protocol.setId("motan");//使用motan应用协议
protocol.setName("motan");
motanDemoService.setProtocol(protocol);

motanDemoService.setExport("motan:8002");//本服务的监控端口号是8002
motanDemoService.export();//发布及在zookeeper上注册此服务

从上面的代码可知ServiceConfig类是服务的发布及注册的核心是motanDemoService.export()方法,我们来看一下此方法的实现细节:

public synchronized void export() {
    if (exported.get()) {
        LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
        return;
    }

    checkInterfaceAndMethods(interfaceClass, methods);

    List<URL> registryUrls = loadRegistryUrls();//加载注册中心的url,支持多个注册中心
    if (registryUrls == null || registryUrls.size() == 0) {
        throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
    }

    Map<String, Integer> protocolPorts = getProtocolAndPort();//通过设置的export来获取协议和端口
    for (ProtocolConfig protocolConfig : protocols) { //protocols为配置的rpc协议

        Integer port = protocolPorts.get(protocolConfig.getId());
        if (port == null) {
            throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
                    protocolConfig.getId()));
        }//判断一下需要发布的服务是否在有对应配置的rpc协议服务
        doExport(protocolConfig, port, registryUrls);//发布服务
    }

    afterExport();
}

方法中调用了doexport和afterExport方法:

private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registryURLs) {
    String protocolName = protocolConfig.getName(); //获取设置的协议名称,此处为motan
    if (protocolName == null || protocolName.length() == 0) {
        protocolName = URLParamType.protocol.getValue();//名称为空,取默认值motan
    }

    String hostAddress = host;
    if (StringUtils.isBlank(hostAddress) && basicService != null) {
        hostAddress = basicService.getHost();
    }
    if (NetUtils.isInvalidLocalHost(hostAddress)) {
        hostAddress = getLocalHostAddress(registryURLs);//根据注册中心获取本机地址
    }

    Map<String, String> map = new HashMap<String, String>();

    map.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_SERVICE);//表明是提供服务
    map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));

    collectConfigParams(map, protocolConfig, basicService, extConfig, this);
    collectMethodConfigParams(map, this.getMethods());
//根据url规则生成服务url:motan://192.168.25.237:8002/com.weibo.motan.demo.service.MotanDemoService?group=motan-demo-rpc
    URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);

    if (serviceExists(serviceUrl)) { //判断服务之前是否已经加载过

        LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", interfaceClass.getName(),
                serviceUrl.getIdentity()));
        throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ",
                interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//抛出同名服务异常
    }

    List<URL> urls = new ArrayList<URL>();

    // injvm 协议只支持注册到本地,其他协议可以注册到local、remote
    if (MotanConstants.PROTOCOL_INJVM.equals(protocolConfig.getId())) {
        URL localRegistryUrl = null;
        for (URL ru : registryURLs) {
            if (MotanConstants.REGISTRY_PROTOCOL_LOCAL.equals(ru.getProtocol())) {
                localRegistryUrl = ru.createCopy();
                break;
            }
        }
        if (localRegistryUrl == null) {
            localRegistryUrl =
                    new URL(MotanConstants.REGISTRY_PROTOCOL_LOCAL, hostAddress, MotanConstants.DEFAULT_INT_VALUE,
                            RegistryService.class.getName());
        }

        urls.add(localRegistryUrl);
    } else {
        for (URL ru : registryURLs) {
            urls.add(ru.createCopy());
        }
    }

    for (URL u : urls) {
        u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
        registereUrls.add(u.createCopy());
    }
    //使用spi机制加载SimpleConfigHandler
    ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

    exporters.add(configHandler.export(interfaceClass, ref, urls));//调用SimpleConfigHandler的export方法
}

再来看一下SimpleConfigHandler的export方法

public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {

    String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
    URL serviceUrl = URL.valueOf(serviceStr);

    // export service
    // 利用protocol decorator来增加filter特性
    String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());//此处protacolName为motan
    Protocol orgProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName);//根据SPI机制加载到DefaultRpcProtocol
    Provider<T> provider = getProvider(orgProtocol, ref, serviceUrl, interfaceClass);//服务的代理提供者,包装ref的服务

    Protocol protocol = new ProtocolFilterDecorator(orgProtocol); //对于Protocol对象增强filter

    Exporter<T> exporter = protocol.export(provider, serviceUrl);//发布服务,将代理对象provider与具体的serviceUrl关联

    // register service
    register(registryUrls, serviceUrl);//在注册中心注册服务

    return exporter;
}

其中getProvider方法如下:

protected <T> Provider<T> getProvider(Protocol protocol, T proxyImpl, URL url, Class<T> clz){
    if (protocol instanceof ProviderFactory){
        return ((ProviderFactory)protocol).newProvider(proxyImpl, url, clz);
    } else{
        return new DefaultProvider<T>(proxyImpl, url, clz);//服务的代理提供者,包装ref的服务
    }
}

下面我们来看一下,motan如何对filter进行相应的增强处理

public class ProtocolFilterDecorator implements Protocol {//实现Protocol的接口,联系到上文中使用此类对实际的Protocol进行包装

    private Protocol protocol;

    public ProtocolFilterDecorator(Protocol protocol) {
        if (protocol == null) {
            throw new MotanFrameworkException("Protocol is null when construct ProtocolFilterDecorator",
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }
        this.protocol = protocol;//给实际的Protocol进行赋值
    }

    @Override
    public <T> Exporter<T> export(Provider<T> provider, URL url) {
        return protocol.export(decorateWithFilter(provider, url), url);//发布服务时,调用filter增强处理方法
    }

   
    private <T> Provider<T> decorateWithFilter(final Provider<T> provider, URL url) {
        List<Filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE);//获取实际需要增强的filter

        if (filters == null || filters.size() == 0) {
            return provider;
        }
        Provider<T> lastProvider = provider;
        for (Filter filter : filters) {//对于代理对象provider进行包装,包装成一个provider链,返回最后一个provider
            final Filter f = filter;
            if (f instanceof InitializableFilter) {
                ((InitializableFilter) f).init(lastProvider);
            }
            final Provider<T> lp = lastProvider;
            lastProvider = new Provider<T>() {
                @Override
                public Response call(Request request) {
                    return f.filter(lp, request);
                }

                @Override
                public String desc() {
                    return lp.desc();
                }

                @Override
                public void destroy() {
                    lp.destroy();
                }

                @Override
                public Class<T> getInterface() {
                    return lp.getInterface();
                }

                @Override
                public Method lookupMethod(String methodName, String methodDesc) {
                    return lp.lookupMethod(methodName, methodDesc);
                }

                @Override
                public URL getUrl() {
                    return lp.getUrl();
                }

                @Override
                public void init() {
                    lp.init();
                }

                @Override
                public boolean isAvailable() {
                    return lp.isAvailable();
                }

            @Override
            public T getImpl() {
               return provider.getImpl();
            }
            };
        }
        return lastProvider;
    }

    /**
     * <pre>
    * 获取方式:
    * 1)先获取默认的filter列表;
    * 2)根据filter配置获取新的filters,并和默认的filter列表合并;
    * 3)再根据一些其他配置判断是否需要增加其他filter,如根据accessLog进行判断,是否需要增加accesslog
    * </pre>
     *
     * @param url
     * @param key
     * @return
     */
    private List<Filter> getFilters(URL url, String key) {

        // load default filters
        List<Filter> filters = new ArrayList<Filter>();
        List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);//使用spi机制初始化filer对象
        if (defaultFilters != null && defaultFilters.size() > 0) {
            filters.addAll(defaultFilters);
        }

        // add filters via "filter" config
        String filterStr = url.getParameter(URLParamType.filter.getName());
        if (StringUtils.isNotBlank(filterStr)) {
            String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
            for (String fn : filterNames) {
                addIfAbsent(filters, fn);
            }
        }

        // add filter via other configs, like accessLog and so on
        boolean accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue());
        if (accessLog) {
            addIfAbsent(filters, AccessLogFilter.class.getAnnotation(SpiMeta.class).name());
        }

        // sort the filters
        Collections.sort(filters, new ActivationComparator<Filter>());
        Collections.reverse(filters);
        return filters;
    }

继续查看 Exporter<T> exporter = protocol.export(provider, serviceUrl);这段,上文提到demo中获取的protocol为
DefaultRpcProtocol,来到父类AbstractProtocol,可以看到export方法

public <T> Exporter<T> export(Provider<T> provider, URL url) {
    if (url == null) {
        throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: url is null",
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }

    if (provider == null) {
        throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: provider is null, url=" + url,
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }

    String protocolKey = MotanFrameworkUtil.getProtocolKey(url);// protocol://host:port/group/interface/version


    synchronized (exporterMap) {
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(protocolKey);

        if (exporter != null) {
            throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: service already exist, url=" + url,
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//判断是否已经发布过,发布过则抛存在的异常
        }

        exporter = createExporter(provider, url);
        exporter.init();

        exporterMap.put(protocolKey, exporter);

        LoggerUtil.info(this.getClass().getSimpleName() + " export Success: url=" + url);

        return exporter;
    }


}

回到子类看createExporter方法

@Override
protected <T> Exporter<T> createExporter(Provider<T> provider, URL url) {
    return new DefaultRpcExporter<T>(provider, url, this.ipPort2RequestRouter, this.exporterMap);
}

继续进入DefaultRpcExporter方法:

public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter,
                          ConcurrentHashMap<String, Exporter<?>> exporterMap) {
    super(provider, url);
    this.exporterMap = exporterMap;
    this.ipPort2RequestRouter = ipPort2RequestRouter;

    ProviderMessageRouter requestRouter = initRequestRouter(url);
    endpointFactory =
            ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                    url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
// 通过spi根据接口EndpointFactory获取到实现类NettyEndpointFactory(注意此配置不在motanc-core模块中,而在motan-transport相关模块中)
    server = endpointFactory.createServer(url, requestRouter);
}

以此跟踪NettyEndpointFactory类,从其父类AbstractEndpointFactory中可以看到createServer的代码
com.weibo.api.motan.transport.support.AbstractEndpointFactory.java

public Server createServer(URL url, MessageHandler messageHandler) {
    messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);

    synchronized (ipPort2ServerShareChannel) {
        String ipPort = url.getServerPortStr();
        String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

        boolean shareChannel =
                url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());

        if (!shareChannel) { // 独享一个端口
            LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);

            // 如果端口已经被使用了,使用该server bind 会有异常
            return innerCreateServer(url, messageHandler);
        }

        LoggerUtil.info(this.getClass().getSimpleName() + " create share_channel server: url={}", url);

        Server server = ipPort2ServerShareChannel.get(ipPort);

        if (server != null) {
            // can't share service channel
            if (!MotanFrameworkUtil.checkIfCanShallServiceChannel(server.getUrl(), url)) {
                throw new MotanFrameworkException(
                        "Service export Error: share channel but some config param is different, protocol or codec or serialize or maxContentLength or maxServerConnection or maxWorkerThread or heartbeatFactory, source="
                                + server.getUrl() + " target=" + url, MotanErrorMsgConstant.FRAMEWORK_EXPORT_ERROR);
            }

            saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

            return server;
        }

        url = url.createCopy();
        url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空

        server = innerCreateServer(url, messageHandler);//不存在server的时候创建server

        ipPort2ServerShareChannel.put(ipPort, server);
        saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

        return server;
    }
}

在具体实现类中可以看到:

protected Server innerCreateServer(URL url, MessageHandler messageHandler) {
    return new NettyServer(url, messageHandler);
}

在NettySever中可以看到通过netty创建服务的代码,此处就不继续跟踪了。

2.2.注册服务到注册中心

回到刚才 com.weibo.api.motan.config.handler.SimpleConfigHandler中的export服务,可以看到最后一句 register(registryUrls, serviceUrl);
进入该方法:


private void register(List<URL> registryUrls, URL serviceUrl) {

    for (URL url : registryUrls) {//循环便利多个注册中心的信息
        // 根据check参数的设置,register失败可能会抛异常,上层应该知晓
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());//根据SPI获取注册方法,此处是ZookeeperRegistryFactory
        if (registryFactory == null) {
            throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
                    "register error! Could not find extension for registry protocol:" + url.getProtocol()
                            + ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
        }
        Registry registry = registryFactory.getRegistry(url);//获取registry
        registry.register(serviceUrl);//将服务注册到zookeeper,也就是把节点信息写入到zookeeper中
    }
}

我们来看一下zookeeper注册中心的工厂类:每个Registry都需要独立维护一个ZkClient与zookeeper的链接

@SpiMeta(name = "zookeeper")
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    @Override
    protected Registry createRegistry(URL registryUrl) {
        try {
            int timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
            int sessionTimeout =
                    registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
                            URLParamType.registrySessionTimeout.getIntValue());
            ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout);//创建zookeeper的客户端
            return new ZookeeperRegistry(registryUrl, zkClient);//创建zookeeper的客户端
        } catch (ZkException e) {
            LoggerUtil.error("[ZookeeperRegistry] fail to connect zookeeper, cause: " + e.getMessage());
            throw e;
        }
    }
}

我们再来分析ZookeeperRegistry中的代码

public ZookeeperRegistry(URL url, ZkClient client) {
    super(url);
    this.zkClient = client;
    IZkStateListener zkStateListener = new IZkStateListener() {
        @Override
        public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
            // do nothing
        }

        @Override
        public void handleNewSession() throws Exception {
            LoggerUtil.info("zkRegistry get new session notify.");
            reconnectService();//重新注册服务
            reconnectClient();
        }
    };
    zkClient.subscribeStateChanges(zkStateListener);
    ShutDownHook.registerShutdownHook(this);
}
private void reconnectService() {
    Collection<URL> allRegisteredServices = getRegisteredServiceUrls();
    if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
        try {
            serverLock.lock();
            for (URL url : getRegisteredServiceUrls()) {
                doRegister(url);//注册
            }
            LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);

            for (URL url : availableServices) {
                if (!getRegisteredServiceUrls().contains(url)) {
                    LoggerUtil.warn("reconnect url not register. url:{}", url);
                    continue;
                }
                doAvailable(url);//标识服务可以提供服务
            }
            LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
        } finally {
            serverLock.unlock();
        }
    }
}
protected void doRegister(URL url) {
    try {
        serverLock.lock();
        // 防止旧节点未正常注销
        removeNode(url, ZkNodeType.AVAILABLE_SERVER);
        removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
    } catch (Throwable e) {
        throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
    } finally {
        serverLock.unlock();
    }
}
protected void doAvailable(URL url) {
    try{
        serverLock.lock();
        if (url == null) {
            availableServices.addAll(getRegisteredServiceUrls());
            for (URL u : getRegisteredServiceUrls()) {
                removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                createNode(u, ZkNodeType.AVAILABLE_SERVER);//创建节点
            }
        } else {
            availableServices.add(url);
            removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.AVAILABLE_SERVER);
        }
    } finally {
        serverLock.unlock();
    }
}
private void createNode(URL url, ZkNodeType nodeType) {
    String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
    if (!zkClient.exists(nodeTypePath)) {
        zkClient.createPersistent(nodeTypePath, true);//对于服务的标识信息,创建持久化节点
    }
    zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());//对于服务的ip和端口号信息使用临时节点,当服务断了后,zookeeper自动摘除目标服务器
}

3.总结

本文分析了motan的服务发布及注册到zookeeper的流程相关的源码,主要涉及到的知识点:
1.利用相关的配置对象进行信息的存储及传递;
2.利用provider对具体的业务类进行封装代理;
3.利用filter链的结构,来包装实际的provider,把所有的过滤器都处理完毕后,最后调用实际的业务类,大家可以想象一下aop相关的原理,有些类似;
4.代码中大量使用jdk的标准spi技术进行类的加载;
5.支持多个注册中心,也就是同一个服务可以注册到不同的注册中心上,每个registry对应一个具体的zkclient;
6.利用了zookeeper的临时节点来维护服务器的host和port信息;
7.支持多个服务发布到同一个端口,在本文中并没分析netty使用相关的代码,后面会分析到。

4 .参考文章:

http://www.cnblogs.com/mantu/p/5872793.html

欢迎大家扫码关注我的微信公众号,与大家一起分享技术与成长中的故事。

《Motan源码分析(二).服务提供与注册》 我的微信公众号.jpg

    原文作者:shunyang
    原文地址: https://www.jianshu.com/p/8d26b969c9d8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞