基于Zookeeper构建分布式RPC框架

新年伊始,为了让自己快速地进入工作状态,决定自己动手构建一套分布式RPC框架。

由于是独立的RPC框架,所以采用Zookeeper做注册中心,使用Netty做服务处理。由于Netty是异步的NIO框架,要在发出网络请求之后等待结果返回,着实需要一番大改动。

  • 注册中心

    基于Zookeeper做注册中心的实现其实是比较简单的。

    具体的实现逻辑:在服务启动时扫描带有RpcService注解的类,取出注解的name属性作为服务名,注册为Zookeeper节点;再拿到该服务所在机器(或者容器)的IP,将“IP:端口”以临时节点的形式注册到该服务节点之下。同时客户端启动时监听Zookeeper节点变更事件,并及时刷新可用的服务列表。

    代码实现

    IRegisterCenterProvider //服务端逻辑

package one.bugu.zookeeper.rpc.framework.service.zookeeper;

import java.util.List;

/**
 * Server-side contract for publishing RPC service providers to the registry.
 *
 * Author: LangK
 * Created: 2019/2/14 14:30
 */
public interface IRegisterCenterProvider {

    /**
     * Registers a service provider under the given service name.
     *
     * NOTE(review): the previous comment described a returned map
     * (key: provider interface, value: provider IP list), but this method
     * returns nothing; it only accepts the name and the address list.
     *
     * @param serverName name of the service being registered
     * @param ips        addresses at which the service can be reached
     */
    void registerProvider(String serverName, List<String> ips);

    /**
     * Updates the provider information held on the server side.
     */
    void updateProvider();
}


RegisterCenterProviderImpl //服务端逻辑实现

package one.bugu.zookeeper.rpc.framework.service.zookeeper;

import one.bugu.zookeeper.rpc.framework.annotations.RpcService;
import one.bugu.zookeeper.rpc.framework.service.RpcServiceConfiguration;
import one.bugu.zookeeper.rpc.framework.service.socket.ServiceSocket;
import one.bugu.zookeeper.rpc.framework.util.SpringContextUtil;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.*;

/**
 * Server-side registry client: publishes every {@link RpcService}-annotated Spring bean to
 * ZooKeeper and makes sure the RPC socket server thread is running.
 *
 * <p>Nodes are written as {@code <configured path>/<service name>/<ip:port>}; the service node is
 * created with {@code createNode} and each address node with {@code createTempNode}.
 *
 * <p>{@link #updateProvider()} is invoked both by external callers and by the ZooKeeper watcher
 * callback, so state touched by both sides is published through {@code volatile} fields.
 *
 * Author: LangK
 * Created: 2019/2/14 14:31
 */
public class RegisterCenterProviderImpl implements IRegisterCenterProvider {

    private static final Logger logger = LoggerFactory.getLogger(RegisterCenterProviderImpl.class);

    /**
     * Service name to handling bean, used to find the bean quickly when a client request arrives.
     *
     * <p>The map is never mutated in place by this class: each refresh builds a new map and swaps
     * the reference, so a concurrent reader sees either the old or the new complete mapping.
     */
    public static volatile Map<String, Object> serverBean = new HashMap<>();

    /** ZooKeeper connection settings (URL and root path). */
    private final ZookeeperConfiguration zookeeperConfiguration;

    /** RPC server settings (provides the listening port). */
    private final RpcServiceConfiguration rpcServiceConfiguration;

    /** ZooKeeper connector; reset to {@code null} by the watcher when the connection drops. */
    private volatile ZooKeeperHelper zooKeeperHelper;

    /** Thread that runs the socket (Netty) server. Guarded by {@code this}. */
    private Thread socketThread;

    /**
     * @param zookeeperConfiguration  ZooKeeper connection settings
     * @param rpcServiceConfiguration RPC server settings
     */
    public RegisterCenterProviderImpl(ZookeeperConfiguration zookeeperConfiguration, RpcServiceConfiguration rpcServiceConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
        this.rpcServiceConfiguration = rpcServiceConfiguration;
    }

    /**
     * Creates the service node if it is missing and then one temporary child node per address.
     *
     * @param serverName service name; becomes the node name below the configured root path
     * @param ips        addresses to publish; {@code null} is treated as an empty list
     */
    @Override
    public void registerProvider(String serverName, List<String> ips) {
        // Work on a local reference: the watcher thread may null the field at any moment,
        // which previously could cause a NullPointerException halfway through registration.
        ZooKeeperHelper helper = zooKeeperHelper;
        if (helper == null) {
            helper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ServerNodeChangeWatcher());
            zooKeeperHelper = helper;
        }
        String path = zookeeperConfiguration.getPath() + "/" + serverName;
        if (!helper.existsNode(path)) {
            String resultPath = helper.createNode(path, null);
            if (StringUtils.isEmpty(resultPath)) {
                logger.error("RPC服务注册失败,服务名:{}", serverName);
                return;
            }
        }
        if (ips == null) {
            return;
        }
        for (String ip : ips) {
            String node = path + "/" + ip;
            if (!helper.existsNode(node)) {
                String resultPath = helper.createTempNode(node, null);
                if (StringUtils.isEmpty(resultPath)) {
                    logger.error("RPC服务注册失败,服务名:{},IP地址:{}", serverName, ip);
                }
            }
        }
    }

    /**
     * Rebuilds {@link #serverBean} from the Spring context, registers every service with the
     * local addresses and ensures the socket server thread is running. Does nothing when the
     * context contains no {@link RpcService} beans.
     */
    @Override
    public void updateProvider() {
        Map<String, Object> beansWithAnnotation = SpringContextUtil.getApplicationContext().getBeansWithAnnotation(RpcService.class);
        if (beansWithAnnotation == null || beansWithAnnotation.isEmpty()) {
            return;
        }
        Map<String, Object> refreshed = new HashMap<>();
        for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
            // Resolve the annotation through the bean factory: bean.getClass() may be a proxy
            // subclass on which a non-inherited annotation is not visible.
            RpcService rpcService = SpringContextUtil.getApplicationContext().findAnnotationOnBean(entry.getKey(), RpcService.class);
            if (rpcService == null) {
                logger.warn("RpcService annotation not resolvable on bean '{}', skipping registration", entry.getKey());
                continue;
            }
            refreshed.put(rpcService.name(), entry.getValue());
        }
        // Publish the beans before announcing them in ZooKeeper, so a service is never
        // discoverable while its handler bean is still missing from the lookup map.
        serverBean = refreshed;

        List<String> ips = getIp();
        for (String serverName : refreshed.keySet()) {
            registerProvider(serverName, ips);
        }
        ensureSocketServerStarted();
    }

    /**
     * Starts the socket server thread if it is not running yet.
     *
     * <p>A {@link Thread} cannot be started twice, so a terminated thread is replaced by a new
     * one instead of being restarted (which would throw {@link IllegalThreadStateException}).
     * NOTE(review): this assumes {@code ServiceSocket.run()} blocks for the lifetime of the
     * server — confirm.
     */
    private synchronized void ensureSocketServerStarted() {
        if (socketThread == null || socketThread.getState() == Thread.State.TERMINATED) {
            socketThread = new Thread(new ServiceSocket(rpcServiceConfiguration));
        }
        if (socketThread.getState() == Thread.State.NEW) {
            socketThread.start();
        }
    }

    /**
     * Collects the site-local (and neither loopback nor link-local) addresses of this host.
     *
     * @return entries formatted as {@code ip:port}, using the configured RPC port; empty when no
     *         address qualifies or the interfaces cannot be enumerated
     */
    public List<String> getIp() {
        List<String> addresses = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                // Guard: Collections.list(null) would throw a NullPointerException.
                return addresses;
            }
            for (NetworkInterface networkInterface : Collections.list(interfaces)) {
                for (InetAddress addr : Collections.list(networkInterface.getInetAddresses())) {
                    if (!addr.isLoopbackAddress() && !addr.isLinkLocalAddress() && addr.isSiteLocalAddress()) {
                        addresses.add(addr.getHostAddress() + ":" + rpcServiceConfiguration.getPort());
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("获取IP地址异常", e);
        }
        return addresses;
    }

    /**
     * Reacts to ZooKeeper connection state changes: re-registers on (re)connect and drops the
     * helper on disconnect so the next registration creates a fresh one.
     */
    class ServerNodeChangeWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                logger.info("Watch received SyncConnected event");
                updateProvider();
            }
            if (event.getState() == Event.KeeperState.Disconnected) {
                logger.info("Watch received Disconnected event");
                // NOTE(review): the dropped helper is not closed here; whether ZooKeeperHelper
                // exposes a close operation is not visible from this class — confirm to avoid
                // leaking connections.
                zooKeeperHelper = null;
            }
        }
    }
}

IRegisterCenterInvoker //客户端逻辑

package one.bugu.zookeeper.rpc.framework.client.zookeeper;

import java.util.List;
import java.util.Map;

/**
 * Consumer-side contract for reading service provider information from the registry.
 *
 * Author: LangK
 * Created: 2019/2/14 14:27
 */
public interface IRegisterCenterInvoker {

    /**
     * Initialises the consumer's local cache of service provider information.
     */
    void initProviderMap();

    /**
     * Refreshes the consumer's local cache of service provider information.
     */
    void updateProviderMap();

    /**
     * Returns the service provider information known to the consumer.
     *
     * @return provider lists keyed by a string identifier
     */
    Map<String, List<String>> getProviderMap();

}

RegisterCenterInvokerImpl //客户端逻辑实现

package one.bugu.zookeeper.rpc.framework.client.zookeeper;

import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import com.google.gson.Gson;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Consumer-side registry client: mirrors the service tree stored in ZooKeeper into a local map
 * of service name to provider addresses and refreshes it when the watcher reports a
 * {@code NodeChildrenChanged} event.
 *
 * Author: LangK
 * Created: 2019/2/14 15:34
 */
public class RegisterCenterInvokerImpl implements IRegisterCenterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(RegisterCenterInvokerImpl.class);

    private final Gson gson = new Gson();

    /** ZooKeeper connection settings (URL and root path). */
    private final ZookeeperConfiguration zookeeperConfiguration;

    /**
     * Service name to the list of provider addresses (ip:port) of the socket servers.
     * Stays {@code null} until {@link #initProviderMap()} has been called.
     */
    private volatile Map<String, List<String>> providerMap;

    /** ZooKeeper connector; created by {@link #initProviderMap()}. */
    private volatile ZooKeeperHelper zooKeeperHelper;

    /**
     * @param zookeeperConfiguration ZooKeeper connection settings
     */
    public RegisterCenterInvokerImpl(ZookeeperConfiguration zookeeperConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
    }

    /**
     * Creates the local cache and the ZooKeeper connection, then loads the provider list once.
     */
    @Override
    public void initProviderMap() {
        // Create the map before the helper: the helper carries the watcher, and a watcher
        // callback must never find the cache uninitialised.
        providerMap = new ConcurrentHashMap<>();
        zooKeeperHelper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ClientNodeChangeWatcher());
        updateProviderMap();
    }

    /**
     * Re-reads all services and their providers from ZooKeeper and synchronises the local cache.
     *
     * <p>The new state is collected into a temporary map first and only then merged into the
     * live map, so readers of {@link #getProviderMap()} never observe the cache emptied halfway
     * through a refresh. Services without any provider are left out of the cache.
     *
     * @throws IllegalStateException if called before {@link #initProviderMap()}
     */
    @Override
    public void updateProviderMap() {
        ZooKeeperHelper helper = zooKeeperHelper;
        Map<String, List<String>> cache = providerMap;
        if (helper == null || cache == null) {
            throw new IllegalStateException("initProviderMap() must be called before updateProviderMap()");
        }
        synchronized (this) {
            String rootPath = zookeeperConfiguration.getPath();
            Map<String, List<String>> fresh = new ConcurrentHashMap<>();
            // NOTE(review): ZooKeeper watches fire only once; this relies on
            // ZooKeeperHelper.getChildren re-registering the watch on every read — confirm.
            List<String> serverList = helper.getChildren(rootPath);
            if (serverList != null) {
                for (String server : serverList) {
                    List<String> providerList = helper.getChildren(rootPath + "/" + server);
                    if (providerList != null && !providerList.isEmpty()) {
                        fresh.put(server, providerList);
                    }
                }
            }
            // Drop services that disappeared, then add or overwrite the current ones in place,
            // keeping the same map instance for callers that hold a reference to it.
            cache.keySet().retainAll(fresh.keySet());
            cache.putAll(fresh);
            if (logger.isInfoEnabled()) {
                logger.info("update socket server finish. server list is:{}", gson.toJson(cache));
            }
        }
    }

    /**
     * @return the live cache of service name to provider addresses, or {@code null} if
     *         {@link #initProviderMap()} has not been called yet
     */
    @Override
    public Map<String, List<String>> getProviderMap() {
        return providerMap;
    }

    /**
     * Refreshes the local cache whenever the children of a watched node change.
     */
    class ClientNodeChangeWatcher implements Watcher {

        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                logger.info("Watch received NodeChildrenChanged event");
                updateProviderMap();
            }
        }
    }
}
  • Netty处理

Netty服务则是在检测到项目中包含带有@RpcService注解的类时启动。当检测到正在执行带有@RpcClient注解的类的方法时,拦截该方法:如果还未和服务端建立连接,则先建立长连接,再进行RPC通信(通信完成后连接不断开,考虑到是服务端RPC通信,此处长连接更适合),调用服务端代码。如果服务端返回了正确结果,则用它替换客户端方法的返回值;出现异常时,则沿用客户端方法自身的返回结果。

代码实现

ClientAspect //客户端拦截器

package one.bugu.zookeeper.rpc.framework.aspect;

import one.bugu.zookeeper.rpc.framework.annotations.RpcClient;
import one.bugu.zookeeper.rpc.framework.client.socket.ClientRequestPool;
import com.google.gson.Gson;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * Client-side interceptor: wraps every method of a class annotated with {@link RpcClient},
 * sends the call to the remote service and returns the remote result in place of the local one.
 *
 * Author: LangK
 * Created: 2019/2/12 18:28
 */
@Aspect
@Component
public class ClientAspect {

    private static final Logger logger = LoggerFactory.getLogger(ClientAspect.class);

    @Autowired
    private ClientRequestPool clientRequestPool;

    private final Gson gson = new Gson();

    /**
     * Intercepts all methods of types annotated with {@link RpcClient}.
     *
     * <p>The remote call is sent first, then the local method is still executed. When the
     * remote side produced a result, it is converted from JSON and returned; otherwise, or when
     * the conversion fails, the local method's own return value is returned.
     * NOTE(review): {@code ClientRequestPool.send} appears to return the JSON-encoded result,
     * or {@code null} when none is available — confirm.
     *
     * @param pjp the intercepted method invocation
     * @return the converted remote result, or the local method's return value as a fallback
     * @throws Throwable whatever the remote send or the local method throws
     */
    @Around("@within(one.bugu.zookeeper.rpc.framework.annotations.RpcClient)")
    public Object doSocket(ProceedingJoinPoint pjp) throws Throwable {
        RpcClient rpcClient = findRpcClient(pjp);
        if (rpcClient == null) {
            // Without the annotation there is no target service; run the local method only.
            logger.warn("RpcClient annotation not found for {}, executing local method only", pjp.getSignature());
            return pjp.proceed();
        }
        String serverName = rpcClient.serverName();
        String ip = rpcClient.serverIp();
        String method = pjp.getSignature().getName();
        String remoteJson;
        if (StringUtils.isEmpty(ip)) {
            remoteJson = clientRequestPool.send(serverName, method, pjp.getArgs());
        } else {
            remoteJson = clientRequestPool.send(serverName, ip, method, pjp.getArgs());
        }
        Object localResult = pjp.proceed();
        if (remoteJson == null) {
            return localResult;
        }
        Type resultType = resolveResultType(pjp, localResult);
        if (resultType == null || resultType == void.class) {
            // Nothing to convert into (void method, or the type could not be determined).
            return localResult;
        }
        try {
            return gson.fromJson(remoteJson, resultType);
        } catch (RuntimeException e) {
            logger.warn("RPC接收结果转换异常:{}", remoteJson, e);
            return localResult;
        }
    }

    /**
     * Looks up {@link RpcClient} on the method's declaring type and, if it is absent there
     * (for example the method is inherited from an unannotated superclass), on the target class.
     */
    private RpcClient findRpcClient(ProceedingJoinPoint pjp) {
        Class<?> declaringType = pjp.getSignature().getDeclaringType();
        RpcClient annotation = declaringType == null ? null : declaringType.getAnnotation(RpcClient.class);
        if (annotation == null && pjp.getTarget() != null) {
            annotation = pjp.getTarget().getClass().getAnnotation(RpcClient.class);
        }
        return annotation;
    }

    /**
     * Chooses the type the remote JSON is converted to.
     *
     * <p>The runtime class of the local result is kept whenever it is usable. The method's
     * declared generic return type is used when the local method returned {@code null}
     * (previously this led to a swallowed NullPointerException and a lost remote result) or when
     * the return type is parameterized, so that element types such as {@code List<Foo>} survive.
     *
     * @return the conversion target, or {@code null} if it cannot be determined
     */
    private Type resolveResultType(ProceedingJoinPoint pjp, Object localResult) {
        Type declaredType = null;
        if (pjp.getSignature() instanceof MethodSignature) {
            declaredType = ((MethodSignature) pjp.getSignature()).getMethod().getGenericReturnType();
        }
        if (localResult != null && !(declaredType instanceof ParameterizedType)) {
            return localResult.getClass();
        }
        return declaredType;
    }

}

大体上的思路就是这样,具体代码当然少不了封装。这里就不过多的贴代码了,有兴趣的伙伴可以去LangK开源 / RpcFramework下载项目。

通过该项目,熟悉了使用Zookeeper做注册中心的关键功能,也对Netty框架和NIO有了更深刻的理解,算是给2019年开了个好头。

当然如果你有好的设计思想,可以一起参与完善该项目。
    原文作者:LangK
    原文地址: https://www.jianshu.com/p/32f39d973d74
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞