新年伊始,为了让自己快速地进入工作状态,决定自己动手构建一套分布式RPC框架。
由于是独立的RPC框架,所以采用Zookeeper做注册中心,使用Netty做服务处理。由于Netty是NIO框架,在处理网络请求等待结果返回的时候着实需要一番大改动。
注册中心
基于Zookeeper做注册中心的实现其实是比较简单的。
具体的实现逻辑:服务启动时扫描带有RpcService注解的类,取出该注解的name属性作为服务名,注册为Zookeeper节点;再拿到该服务所在机器(或者容器)的IP,将“IP:端口”以临时节点的形式注册到该服务节点之下。同时客户端启动时监听Zookeeper节点改变事件,并及时刷新可用的服务列表。
代码实现
IRegisterCenterProvider //服务端逻辑
package one.bugu.zookeeper.rpc.framework.service.zookeeper;
import java.util.List;
/**
 * Server-side contract for publishing RPC services to the register center.
 *
 * Author: LangK. Created 2019/2/14 14:30.
 */
public interface IRegisterCenterProvider {
/**
 * Registers a service together with the addresses that provide it.
 *
 * The method returns nothing; callers get no direct success/failure signal from it.
 *
 * @param serverName name of the service being registered
 * @param ips addresses that offer the service
 *            (NOTE(review): the implementation published alongside this interface
 *            passes "ip:port" strings — confirm other callers do the same)
 */
void registerProvider(String serverName, List<String> ips);
/**
 * Refreshes the provider information held by the server side.
 */
void updateProvider();
}
RegisterCenterProviderImpl //服务端逻辑实现
package one.bugu.zookeeper.rpc.framework.service.zookeeper;
import one.bugu.zookeeper.rpc.framework.annotations.RpcService;
import one.bugu.zookeeper.rpc.framework.service.RpcServiceConfiguration;
import one.bugu.zookeeper.rpc.framework.service.socket.ServiceSocket;
import one.bugu.zookeeper.rpc.framework.util.SpringContextUtil;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.*;
/**
 * Server-side register-center client.
 *
 * <p>Publishes every {@link RpcService}-annotated Spring bean to ZooKeeper (one node per service
 * name, one temporary child node per "ip:port" address) and makes sure the {@link ServiceSocket}
 * thread that answers RPC requests is running.
 *
 * <p>{@link #updateProvider()} can be entered from the ZooKeeper watcher callback as well as from
 * ordinary callers, so the state shared between those threads is kept thread-safe.
 *
 * Author: LangK. Created 2019/2/14 14:31.
 */
public class RegisterCenterProviderImpl implements IRegisterCenterProvider {

    private static final Logger logger = LoggerFactory.getLogger(RegisterCenterProviderImpl.class);

    /**
     * Service name -> bean that handles requests for that service.
     *
     * <p>Read while requests are being dispatched and rewritten by {@link #updateProvider()},
     * hence a concurrent map. Entries are replaced in place; the map is never emptied during a
     * refresh, so a lookup never misses a service that is still registered.
     */
    public static Map<String, Object> serverBean = new java.util.concurrent.ConcurrentHashMap<>();

    /** ZooKeeper settings (connection URL and root path). */
    private final ZookeeperConfiguration zookeeperConfiguration;

    /** RPC settings (listening port and socket configuration). */
    private final RpcServiceConfiguration rpcServiceConfiguration;

    /** ZooKeeper connection; created lazily and dropped when the session expires. */
    private volatile ZooKeeperHelper zooKeeperHelper;

    /** Thread running the socket server. */
    private Thread socketThread;

    public RegisterCenterProviderImpl(ZookeeperConfiguration zookeeperConfiguration, RpcServiceConfiguration rpcServiceConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
        this.rpcServiceConfiguration = rpcServiceConfiguration;
    }

    /**
     * Creates the service node (if missing) and one temporary child node per address.
     *
     * @param serverName service name; becomes the node name below the configured root path
     * @param ips        addresses ("ip:port") to publish for this service; {@code null} or empty
     *                   publishes the service node only
     */
    @Override
    public void registerProvider(String serverName, List<String> ips) {
        // Work on a local reference: the watcher thread may reset the field at any moment.
        ZooKeeperHelper helper = zooKeeperHelper;
        if (helper == null) {
            helper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ServerNodeChangeWatcher());
            zooKeeperHelper = helper;
        }

        String path = zookeeperConfiguration.getPath() + "/" + serverName;
        if (!helper.existsNode(path)) {
            String resultPath = helper.createNode(path, null);
            // Another provider of the same service may have created the node between the
            // existence check and the create call; that is not a failure.
            if (StringUtils.isEmpty(resultPath) && !helper.existsNode(path)) {
                logger.error("RPC服务注册失败,服务名:{}", serverName);
                return;
            }
        }

        if (ips == null || ips.isEmpty()) {
            logger.warn("No address available to register for RPC service: {}", serverName);
            return;
        }
        for (String ip : ips) {
            String node = path + "/" + ip;
            if (helper.existsNode(node)) {
                continue;
            }
            String resultPath = helper.createTempNode(node, null);
            if (StringUtils.isEmpty(resultPath)) {
                logger.error("RPC服务注册失败,服务名:{},IP地址:{}", serverName, ip);
            }
        }
    }

    /**
     * Re-scans the Spring context for {@link RpcService} beans, refreshes {@link #serverBean},
     * registers every service in ZooKeeper and starts the socket server if it is not running.
     * Does nothing when the context holds no annotated bean.
     */
    @Override
    public void updateProvider() {
        Map<String, Object> annotatedBeans =
                SpringContextUtil.getApplicationContext().getBeansWithAnnotation(RpcService.class);
        if (annotatedBeans == null || annotatedBeans.isEmpty()) {
            return;
        }

        List<String> addresses = getIp();
        Set<String> activeNames = new HashSet<>();
        for (Map.Entry<String, Object> entry : annotatedBeans.entrySet()) {
            // Resolve the annotation through the bean factory: bean.getClass() may be a proxy
            // class on which the annotation is not directly present.
            RpcService annotation = SpringContextUtil.getApplicationContext()
                    .findAnnotationOnBean(entry.getKey(), RpcService.class);
            if (annotation == null) {
                logger.warn("Bean {} carries no resolvable @RpcService annotation, skipped", entry.getKey());
                continue;
            }
            String serverName = annotation.name();
            // Publish the handler before advertising the service in ZooKeeper.
            serverBean.put(serverName, entry.getValue());
            activeNames.add(serverName);
            registerProvider(serverName, addresses);
        }
        // Remove services that disappeared, without ever exposing an empty map.
        serverBean.keySet().retainAll(activeNames);

        ensureSocketStarted();
    }

    /**
     * Starts the socket server thread when it has not been started yet, and replaces it when the
     * previous one has terminated (a terminated {@link Thread} cannot be started again).
     */
    private synchronized void ensureSocketStarted() {
        if (socketThread == null || socketThread.getState() == Thread.State.TERMINATED) {
            socketThread = new Thread(new ServiceSocket(rpcServiceConfiguration));
        }
        if (socketThread.getState() == Thread.State.NEW) {
            socketThread.start();
        }
    }

    /**
     * Collects the addresses under which this machine can be reached.
     *
     * @return one "ip:port" string per site-local address that is neither loopback nor
     *         link-local; empty when none is found or the interfaces cannot be read
     */
    public List<String> getIp() {
        List<String> addresses = new ArrayList<String>();
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                // Some JDK versions report "no interfaces" as null.
                return addresses;
            }
            for (NetworkInterface networkInterface : Collections.list(interfaces)) {
                for (InetAddress addr : Collections.list(networkInterface.getInetAddresses())) {
                    if (!addr.isLoopbackAddress() && !addr.isLinkLocalAddress() && addr.isSiteLocalAddress()) {
                        addresses.add(addr.getHostAddress() + ":" + rpcServiceConfiguration.getPort());
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("获取IP地址异常", e);
        }
        return addresses;
    }

    /**
     * Reacts to connection-state changes of the ZooKeeper session.
     */
    class ServerNodeChangeWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            // Node events also report SyncConnected as their state; only events of type None
            // describe a change of the connection itself.
            if (event.getType() != Event.EventType.None) {
                return;
            }
            Event.KeeperState state = event.getState();
            if (state == Event.KeeperState.SyncConnected) {
                logger.info("Watch received SyncConnected event");
                updateProvider();
            } else if (state == Event.KeeperState.Disconnected) {
                // Transient: the client reconnects on its own and temporary nodes survive until
                // the session expires, so the connection is kept.
                logger.info("Watch received Disconnected event");
            } else if (state == Event.KeeperState.Expired) {
                // The session is gone together with its temporary nodes: drop the dead
                // connection and register again through a new one.
                logger.warn("Watch received Expired event, re-registering providers");
                zooKeeperHelper = null;
                updateProvider();
            }
        }
    }
}
IRegisterCenterInvoker //客户端逻辑
package one.bugu.zookeeper.rpc.framework.client.zookeeper;
import java.util.List;
import java.util.Map;
/**
 * Consumer-side view of the register center: keeps a local cache of service providers.
 *
 * Author: LangK. Created 2019/2/14 14:27.
 */
public interface IRegisterCenterInvoker {

    /**
     * Initialises the consumer's local cache of provider information.
     */
    void initProviderMap();

    /**
     * Refreshes the consumer's local cache of provider information.
     */
    void updateProviderMap();

    /**
     * Returns the provider information known to the consumer.
     *
     * @return provider lists keyed by name
     */
    Map<String, List<String>> getProviderMap();
}
RegisterCenterInvokerImpl //客户端逻辑实现
package one.bugu.zookeeper.rpc.framework.client.zookeeper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import com.google.gson.Gson;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Consumer-side register-center client.
 *
 * <p>Mirrors the ZooKeeper tree below the configured root path into a local map
 * (service name -> provider addresses) and refreshes it when a child-change event arrives.
 *
 * Author: LangK. Created 2019/2/14 15:34.
 */
public class RegisterCenterInvokerImpl implements IRegisterCenterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(RegisterCenterInvokerImpl.class);

    /** Used only to render the provider map in the log. */
    private final Gson gson = new Gson();

    /** ZooKeeper settings (connection URL and root path). */
    private final ZookeeperConfiguration zookeeperConfiguration;

    /**
     * Service name -> provider addresses as stored in ZooKeeper.
     *
     * <p>One instance for the lifetime of this object: it is never {@code null} and is updated
     * in place, so references handed out by {@link #getProviderMap()} stay current.
     */
    private final Map<String, List<String>> providerMap = new ConcurrentHashMap<>();

    /** ZooKeeper connection; set by {@link #initProviderMap()}, read by the watcher thread. */
    private volatile ZooKeeperHelper zooKeeperHelper;

    public RegisterCenterInvokerImpl(ZookeeperConfiguration zookeeperConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
    }

    /**
     * Opens the ZooKeeper connection and loads the provider map for the first time.
     */
    @Override
    public void initProviderMap() {
        zooKeeperHelper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ClientNodeChangeWatcher());
        updateProviderMap();
    }

    /**
     * Re-reads all services and their providers from ZooKeeper and reconciles the local map.
     *
     * <p>Entries are replaced first and stale ones removed afterwards, so concurrent readers
     * never see an empty map while providers exist.
     *
     * @throws IllegalStateException if {@link #initProviderMap()} has not been called yet
     */
    @Override
    public void updateProviderMap() {
        synchronized (this) {
            ZooKeeperHelper helper = zooKeeperHelper;
            if (helper == null) {
                throw new IllegalStateException("initProviderMap() must be called before updateProviderMap()");
            }

            String rootPath = zookeeperConfiguration.getPath();
            List<String> serverList = helper.getChildren(rootPath);
            if (serverList == null || serverList.isEmpty()) {
                // Nothing is registered any more.
                providerMap.clear();
                return;
            }

            Map<String, List<String>> fresh = new ConcurrentHashMap<>();
            for (String server : serverList) {
                List<String> providerList = helper.getChildren(rootPath + "/" + server);
                if (providerList != null && !providerList.isEmpty()) {
                    fresh.put(server, providerList);
                }
            }
            providerMap.putAll(fresh);
            providerMap.keySet().retainAll(fresh.keySet());

            if (logger.isInfoEnabled()) {
                logger.info("update socket server finish. server list is:{}", gson.toJson(providerMap));
            }
        }
    }

    /**
     * @return the live provider map (service name -> provider addresses); never {@code null},
     *         empty until {@link #initProviderMap()} has run
     */
    @Override
    public Map<String, List<String>> getProviderMap() {
        return providerMap;
    }

    /**
     * Refreshes the provider map whenever ZooKeeper reports changed children.
     */
    class ClientNodeChangeWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                logger.info("Watch received NodeChildrenChanged event");
                // NOTE(review): ZooKeeper watches fire once; this relies on
                // ZooKeeperHelper.getChildren re-arming the watch — confirm.
                updateProviderMap();
            }
        }
    }
}
- Netty处理
Netty服务则是在检测到项目中包含@RpcService注解的类时启动。当检测到正在执行带有@RpcClient注解的类的方法时拦截该方法:如果还未和服务端建立连接,则先建立长连接,再进行RPC通信(通信完成后连接不断开,考虑到是服务之间的RPC通信,此处长连接更适合)并调用服务端代码。如果服务端返回了正确结果,则用它替换客户端方法的返回值;出现异常时,则沿用客户端方法自身的返回结果。
代码实现
ClientAspect //客户端拦截器
package one.bugu.zookeeper.rpc.framework.aspect;
import one.bugu.zookeeper.rpc.framework.annotations.RpcClient;
import one.bugu.zookeeper.rpc.framework.client.socket.ClientRequestPool;
import com.google.gson.Gson;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Client-side interceptor: turns calls to methods of {@link RpcClient}-annotated classes into
 * remote calls and uses the local method body only as a fallback.
 *
 * Author: LangK. Created 2019/2/12 18:28.
 */
@Aspect
@Component
public class ClientAspect {

    private static final Logger logger = LoggerFactory.getLogger(ClientAspect.class);

    private final Gson gson = new Gson();

    @Autowired
    private ClientRequestPool clientRequestPool;

    /**
     * Intercepts every method of a class annotated with {@link RpcClient}.
     *
     * <p>The call is sent to the remote service named by the annotation (to a fixed address when
     * {@code serverIp} is set). A non-null response is deserialised into the intercepted
     * method's declared return type and returned. The local method is invoked, and its result
     * returned, only when there is no response or the response cannot be converted.
     *
     * @param pjp the intercepted invocation
     * @return the converted remote result, or the local method's result as fallback
     * @throws Throwable whatever the remote send or the local method throws
     */
    @Around("@within(one.bugu.zookeeper.rpc.framework.annotations.RpcClient)")
    public Object doSocket(ProceedingJoinPoint pjp) throws Throwable {
        RpcClient an = (RpcClient) pjp.getSignature().getDeclaringType().getAnnotation(RpcClient.class);
        if (an == null) {
            // Not an RPC client after all; behave like a plain method call.
            return pjp.proceed();
        }

        String serverName = an.serverName();
        String ip = an.serverIp();
        String method = pjp.getSignature().getName();

        String resultObject = StringUtils.isEmpty(ip)
                ? clientRequestPool.send(serverName, method, pjp.getArgs())
                : clientRequestPool.send(serverName, ip, method, pjp.getArgs());

        if (resultObject == null || !(pjp.getSignature() instanceof MethodSignature)) {
            return pjp.proceed();
        }

        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Class<?> rawReturnType = signature.getReturnType();
        if (rawReturnType == void.class || rawReturnType == Void.class) {
            // Nothing to convert for a method that returns no value.
            return null;
        }

        try {
            // Take the type from the method declaration, not from a locally computed value:
            // that value may be null, and its runtime class would drop generic parameters.
            return gson.fromJson(resultObject, signature.getMethod().getGenericReturnType());
        } catch (RuntimeException e) {
            logger.warn("RPC接收结果转换异常:{}", resultObject, e);
            return pjp.proceed();
        }
    }
}
大体上的思路就是这样,具体代码当然少不了封装。这里就不过多地贴代码了,有兴趣的伙伴可以去LangK开源 / RpcFramework下载项目。
通过该项目,熟悉了使用Zookeeper做注册中心的关键功能,也对Netty这个NIO框架有了更深刻的理解,算是给2019年开了个好头。