通过我之前的RestTemplate简单分析可以知道,虽然spring cloud 之中定义了LoadBalancerClient作为负载均衡的通用接口,并且对于Ribbon有了具体的实现RibbonLoadBalancerClient,但是,在具体使用客户端负载均衡时是使用Ribbon的ILoadBalancer接口实现的,现在我就根据ILoadBalancer接口的实现类来具体的分析分析。
AbstractLoadBalancer
AbstractLoadBalancer是ILoadBalancer的抽象实现,定义了关于服务器分组的枚举类ServerGroup,三种不同类型
public enum ServerGroup{
ALL,//所有服务实例
STATUS_UP,//正常服务的实例
STATUS_NOT_UP//停止服务的实例
}
还有一个chooseServer()方法,即调用接口的chooseServer(Object key)方法,表示在选择服务实例的时候忽略key条件判断。除此之外,还定义了两个抽象函数
- List<Server> getServerList(ServerGroup serverGroup):根据服务分组获取对应的服务实例列表
- LoadBalancerStats getLoadBalancerStats():定义了获取LoadBalancerStats的方法
下面,贴出具体源码
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
public Server chooseServer() {
return chooseServer(null);
}
public abstract List<Server> getServerList(ServerGroup serverGroup);
public abstract LoadBalancerStats getLoadBalancerStats();
}
具体的类结构
首先看 BaseLoadBalancer
BaseLoadBalancer是Ribbon负载均衡的基础实现类,定义了很多的基础概念
- addServers(List<Server> newServers) 实现,向负载均衡中增加新的 服务实例列表;将所有的服务实例假如到newServers中,然后调用setServersList方法对newList处理,负载均衡的扩展类一般都是通过对setServersList方法重写来实现的
public void addServers(List<Server> newServers) { if (newServers != null && newServers.size() > 0) { try { ArrayList<Server> newList = new ArrayList<Server>(); newList.addAll(allServerList); newList.addAll(newServers); setServersList(newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e); } } }
- markServerDown(Server server):标记服务处于暂停状态
public void markServerDown(Server server) { if (server == null || !server.isAlive()) { return; } logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId()); server.setAlive(false); // forceQuickPing(); notifyServerStatusChangeListener(singleton(server)); }
- getReachableServers():获取可用的服务列表
public List<Server> getReachableServers() { return Collections.unmodifiableList(upServerList); }
- List<Server> getAllServers():获取所有服务列表
public List<Server> getAllServers() { return Collections.unmodifiableList(allServerList); }
- 两个维护服务实例server对象的列表,一个用于存储所有服务实例的清单,另一个用于存储正常服务的实例清单
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>()); @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> upServerList = Collections .synchronizedList(new ArrayList<Server>());
定义了心跳检测的IPing对象,默认为null,在构造方法里传入
protected IPing ping = null;
定义了心跳检测的执行策略对象IPingStrategy,默认使用的是SerialPingStrategy,该策略使用的是线性遍历ping服务实例的方式检测。当IPing的速度慢,或是Server列表过大的时候,就会影响性能了。
private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ try { if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; } }
定义了负载均衡的处理规则IRule,默认使用RoundRobinRule,在BaseLoadBalancer的Server chooseServer(Object key) 方法中可以看到,实际选择服务实例是BaseLoadBalancer委托给具体的IRule进行选择的。这里的RoundRobinRule使用的是线性负载均衡规则
public Server chooseServer(Object key) {// if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { return rule.choose(key);//委托给IRule执行 } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }
BaseLoadBalancer到这就分析完了
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer继承于BaseLoadBalancer,是对基础负载均衡的扩展。在DynamicServerListLoadBalancer中,实现了服务实例清单在运行期动态更新的功能,具备对服务清单进行过滤的功能。
首先来看 volatile ServerList<T> serverListImpl
ServerList是一个接口,定义如下
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
}
定义了两个抽象方法,getInitialListOfServers用于获取初始化的服务列表,getUpdatedListOfServers获取更新的服务列表,该类的实现类有多个,但在DynamicServerListLoadBalancer里默认的实现类是啥呢?其实既然要实现服务列表的动态更新,那么Ribbon就一定要有访问Eureka的能力,在之前的配置类EurekaRibbonClientConfiguration 中,可以看到如下代码
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
创建了一个DomainExtractingServerList实例,从源码里可以看到,内部定义了一个ServerList<DiscoveryEnabledServer> list,同时,getInitialListOfServers和getUpdatedListOfServers都委托给了list实现
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
}
从setZones方法可以看到,DiscoveryEnabledServer是DomainExtractingServer,而DomainExtractingServer是通过构造函数传入的DiscoveryEnabledNIWSServerList实现的。那DiscoveryEnabledNIWSServerList有时如何获取服务实例的呢,查看源码可以看到getInitialListOfServers和getUpdatedListOfServers都委托给了一个私有方法 obtainServersViaDiscovery,而obtainServersViaDiscovery主要时依靠EurekaClient从服务注册中心获取到具体的列表List<InstanceInfo>,这里要传入的东西vipAddresses在之前就已经讲过,可以理解为服务的逻辑名,获取到List<InstanceInfo>进行遍历,将状态为UP的服务实例转化为DiscoveryEnabledServer,最后返回DiscoveryEnabledServer的列表
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break;
}
}
}
return serverList;
}
通过DiscoveryEnabledNIWSServerList返回最新的服务列表后,就到了DomainExtractingServerList里,DomainExtractingServerList会调用setZone方法对其进行设置一个必要的属性,代码去前面贴过,这里就不贴了。
看到这里,我们已经知道了Ribbon和Eureka整合后,如何从Eureka获取服务实例列表,下面我们来看看时怎么触发去获取服务列表的。在DynamicServerListLoadBalancer我们可以看到有个东西叫ServerListUpdater,具体代码如下
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
可以看到这里定义了一个ServerListUpdater.UpdateAction的实现类,即调用updateListOfServers方法,首先看下ServerListUpdater的定义吧
public interface ServerListUpdater {
public interface UpdateAction {
void doUpdate();
}
void start(UpdateAction updateAction);//启动ServerListUpdater
void stop();//停止ServerListUpdater
String getLastUpdate();//获取最近的更新时间戳
long getDurationSinceLastUpdateMs();//获取上一次到现在的更新世间间隔
int getNumberMissedCycles();//获取未跟新的周期数
int getCoreThreads();//获取核心线程数
}
ServerListUpdater的实现类不多,就两个
- PollingServerListUpdater:默认的动态服务列表更新策略, DynamicServerListLoadBalancer中默认的实现就时它,它时通过定时任务的方式进行服务跟新
- EurekaNotificationServerListUpdater:通过Eureka大的事件监听器来进行服务更新
下面,来看看默认的PollingServerListUpdater。先从它用于启动的start方法看起。从它的start方法可以看到,以定时任务的方式进行服务列表的更新。首先创建了Runnable的实现,在调用updateAction.doUpdate(),最后在启动一个定时任务来执行
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
在定时任务里,initialDelayMs和refreshIntervalMs默认定义为 1000和30000,。就是说更行服务实例实在初始化后延迟1秒开始执行,然后以30秒为周期重复执行。
更新实例的启动已经说完了,回到之前的ServerListUpdater.UpdateAction中,调用的是updateListOfServers方法,具体实现如下
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
可以看到,首先是从Eureka Server里获取最新的服务实例列表,然后又引入了一个新的对象filter,filter的类型是ServerListFilter,ServerListFilter的定义很简单,只有一个方法
public List<T> getFilteredListOfServers(List<T> servers)
主要是实现对服务实例列表进行过滤,通过传入的列表,根据一些规则过滤后返回新的服务实例。
下面是继承结构
除了ZonePreferenceServerListFilter外,其他的都是Ribbon的原生实现类,下面逐个看看每个类又什么特点
- AbstractServerListFilter:抽象过滤器,定义了过滤时需要用到的LoadBalancerStats,代码很简单,就不贴了
- ZoneAffinityServerListFilter:基于Zone的方式实现服务过滤的实例,Zone时在你的application.YML里面定义的
public List<T> getFilteredListOfServers(List<T> servers) { if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){ List<T> filteredServers = Lists.newArrayList(Iterables.filter( servers, this.zoneAffinityPredicate.getServerOnlyPredicate())); if (shouldEnableZoneAffinity(filteredServers)) { return filteredServers; } else if (zoneAffinity) { overrideCounter.increment(); } } return servers; }
从上面的代码可以看到,是通过Iterables.filter来进行服务列表过滤的,是根据zoneAffinityPredicate和消费者的Zone比较,过滤完后,也不会立即返回,而是通过shouldEnableZoneAffinity方法进行判断是否要开启 ZoneAffinity 功能
- DefaultNIWSServerListFilter:完全继承ZoneAffinityServerListFilter
- ServerListSubsetFilter:该过滤器适用于大规模集群的系统。它主要是产生一个‘ZoneAffinity’结果的子集列表,同时它会通过比较服务实例的额通信失败数量和并发连接数来判断选择那些实例,主要分为三步:
1. 获取‘ZoneAffinity’的过滤结果
2.从当前列表中移除不够强壮的实例
3.完成后,从候选列表里随机挑选出一批实例加入到列表里,用来保持服务实例的子集和原来的数量一致,默认的子集数量为30 配置参数 <clientName>.<nameSpace>.ServerListSubseetFilter.size - ZonePreferenceServerListFilter:整合spring cloud新增的过滤器,整合spring cloud 默认会使用该过滤器。功能为 通过配置或Eureka实例元数据的Zone来过滤相同Zone的服务实例。从它的源码可以看出,首先通过父类ZoneAffinityServerListFilter获取‘ZoneAffinity’的服务实例列表,然后遍历,取出消费者配置设置的Zone来进行过滤
public List<Server> getFilteredListOfServers(List<Server> servers) { List<Server> output = super.getFilteredListOfServers(servers); if (this.zone != null && output.size() == servers.size()) { List<Server> local = new ArrayList<>(); for (Server server : output) { if (this.zone.equalsIgnoreCase(server.getZone())) { local.add(server); } } if (!local.isEmpty()) { return local; } } return output; }
到这里DynamicServerListLoadBalancer就已经讲完了。
接下来再来看看ZoneAwareLoadBalancer。
从源码可以看到,ZoneAwareLoadBalancer重写了chooseServer(Object key)和setServerListForZones(Map<String, List<Server>> zoneServersMap)方法。setServerListForZones并不是接口中定义的方法,而是在DynamicServerListLoadBalancer中定义的方法,位于setServersList(List lsrv)的最后,ZoneAwareLoadBalancer对它的重写如下
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
可以看到创建了一个ConcurrentHashMap类型的balancers对象,用来存储每个Zone对应的LoanBalancer,而LoanBalancer实在第一个遍历里通过getLoadBalancer方法来完成的,在创建LoanBalancer的同时会创建它的规则,创建完LoanBalancer后会马上调用setServersList设置对应Zone的实例清单。第二个循环是对Zone进行检查,看出是否有Zone已经没有实例了,没有的话就将Zone对应的实例列表清空。
接下来看看ZoneAwareLoadBalancer具体是如何实现选择服务实例的
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
从源码可以看到,只有当Zone的个数大于1的时候才会执行,否则使用父类的chooose方法。当大于1时,具体的实现步骤如下
- 调用ZoneAvoidanceRule的静态方法Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats),创建一个快照,用于后续的算法
- 调用ZoneAvoidanceRule的静态方法Set<String> getAvailableZones,获取可用的Zone集合,通过快照中的统计数据来实现可用Zone的筛选
- 当可用Zone不为空,个数小于Zone总数,就随机选择一个Zone
- 确定了Zone后,获取对应的LoadBalancer,调用LoadBalancer的chooseServer方法挑选出具体的服务实例
到这,LoadBalancer就已经全部讲完了,负载均衡还有个具体的东西,就是负载均衡的策略,IRule接口,以后再做详细的分析