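Before looking at the source code, let's first go over the three concepts mentioned in the title:
1. Service registration:
After a service provider (a Eureka client) starts, if eureka.client.register-with-eureka is true, it registers itself with the service registry. Registering means sending its own metadata to the registry, which stores the received metadata in a registry list. This list is a two-level Map: Map<service name, Map<instance ID, service instance>>.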
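To make the two-level structure concrete, here is a minimal Java sketch of a registry keyed first by service name and then by instance ID. InstanceMeta and RegistrySketch are hypothetical names for illustration only; in the Eureka server source the inner value is a lease object wrapping InstanceInfo rather than the bare instance used here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the metadata a client sends when it registers.
final class InstanceMeta {
    final String appName;
    final String instanceId;
    final String hostAndPort;

    InstanceMeta(String appName, String instanceId, String hostAndPort) {
        this.appName = appName;
        this.instanceId = instanceId;
        this.hostAndPort = hostAndPort;
    }
}

// Hypothetical sketch of the registry shape: Map<service name, Map<instance ID, instance>>.
class RegistrySketch {
    private final Map<String, Map<String, InstanceMeta>> registry = new ConcurrentHashMap<>();

    // Store the instance under its service name, replacing an older entry with the same ID.
    void register(InstanceMeta meta) {
        registry.computeIfAbsent(meta.appName, k -> new ConcurrentHashMap<>())
                .put(meta.instanceId, meta);
    }

    // Remove one instance; returning null drops the service entry once it has no instances.
    void cancel(String appName, String instanceId) {
        registry.computeIfPresent(appName, (k, instances) -> {
            instances.remove(instanceId);
            return instances.isEmpty() ? null : instances;
        });
    }

    Map<String, InstanceMeta> instancesOf(String appName) {
        return registry.getOrDefault(appName, Map.of());
    }
}
```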
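2. Service renewal:
After registering, a Eureka service (a Eureka client) keeps sending a heartbeat to tell the registry "I'm still alive", so that the registry does not evict this instance from the registry list. Two configuration parameters govern this heartbeat mechanism:
eureka.instance.lease-renewal-interval-in-seconds (default 30): the heartbeat interval
eureka.instance.lease-expiration-duration-in-seconds (default 90): the lease expiration duration, i.e. how long the registry waits after the last heartbeat before treating the instance as expired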
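The two parameters combine into a simple rule: an instance stays in the list as long as a heartbeat arrives within the expiration duration. The sketch below models that rule with a hypothetical LeaseSketch class (not Eureka's own Lease class or its exact expiry check). With the defaults of 30 and 90 seconds, an instance can miss about three heartbeats before it counts as expired. Times are passed in explicitly so the logic is easy to test.

```java
// Hypothetical, simplified lease: last renewal time plus a fixed expiration duration.
class LeaseSketch {
    private final long durationMillis;
    private volatile long lastRenewalMillis;

    LeaseSketch(long durationSeconds, long nowMillis) {
        this.durationMillis = durationSeconds * 1000L;
        this.lastRenewalMillis = nowMillis; // registering counts as the first renewal
    }

    // Called whenever a heartbeat arrives.
    void renew(long nowMillis) {
        lastRenewalMillis = nowMillis;
    }

    // Expired once no heartbeat has arrived within the expiration duration.
    boolean isExpired(long nowMillis) {
        return nowMillis > lastRenewalMillis + durationMillis;
    }
}
```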
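3. Service fetching:
The first two concepts concern service providers, while service fetching concerns service consumers (which are also Eureka clients): only the calling side needs the service list, so that it can choose which instance to call. After a consumer starts, it requests a service list from the registry; this list records the instances registered with the registry. The request is not limited to startup: because the consumer must call correct, healthy instances, it re-fetches the list periodically. The interval is set by:
eureka.client.registry-fetch-interval-seconds (default 30)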
Now let's take a brief look at the source code:
First, the most important implementation class on the Eureka client side is org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient.
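This class wraps Eureka's discovery service. Spring Cloud Eureka is itself a wrapper around Netflix Eureka, so EurekaDiscoveryClient holds a reference to a com.netflix.discovery.EurekaClient object (a composition relationship; the concrete implementation is Netflix Eureka's DiscoveryClient class). Spring Cloud also has an abstraction of the common service-discovery methods, the org.springframework.cloud.client.discovery.DiscoveryClient interface, which EurekaDiscoveryClient implements (an implementation relationship). Their relationship is roughly as shown in the figure below:
The two types on the left belong to Spring Cloud, and the two on the right belong to Netflix Eureka.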
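The same relationships can be sketched in code. All four types below are simplified, hypothetical stand-ins (the real interfaces have more and different methods); the point is only the shape: the Spring-side client implements the Spring abstraction and delegates to a Netflix-side client that it holds by composition.

```java
import java.util.List;
import java.util.Map;

// Stand-in for Spring Cloud's DiscoveryClient abstraction (hypothetical, simplified).
interface SpringDiscoveryClientSketch {
    List<String> getServices();
    List<String> getInstances(String serviceId);
}

// Stand-in for Netflix's EurekaClient interface (hypothetical, simplified).
interface NetflixEurekaClientSketch {
    Map<String, List<String>> getApplications();
}

// Plays the role of Netflix's DiscoveryClient: owns the locally cached registry.
class NetflixDiscoveryClientSketch implements NetflixEurekaClientSketch {
    private final Map<String, List<String>> localCache;

    NetflixDiscoveryClientSketch(Map<String, List<String>> localCache) {
        this.localCache = localCache;
    }

    @Override
    public Map<String, List<String>> getApplications() {
        return localCache;
    }
}

// Plays the role of EurekaDiscoveryClient: implements the Spring interface, delegates to Netflix.
class EurekaDiscoveryClientSketch implements SpringDiscoveryClientSketch {
    private final NetflixEurekaClientSketch eurekaClient; // composition, not inheritance

    EurekaDiscoveryClientSketch(NetflixEurekaClientSketch eurekaClient) {
        this.eurekaClient = eurekaClient;
    }

    @Override
    public List<String> getServices() {
        return List.copyOf(eurekaClient.getApplications().keySet());
    }

    @Override
    public List<String> getInstances(String serviceId) {
        return eurekaClient.getApplications().getOrDefault(serviceId, List.of());
    }
}
```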
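Since the service-discovery logic lives mainly in Netflix's DiscoveryClient class, we can look at that class's Javadoc, which lists its main responsibilities: registering the Eureka client, renewing the lease, cancelling the lease (on shutdown), and fetching services.
So that is the class to study first. Its constructor calls initScheduledTasks(); as the name suggests, this is an initialization method, so it is a good place to start. The source is: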
/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) { // #1
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    if (clientConfig.shouldRegisterWithEureka()) { // #2
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS); // #3
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator( // #4
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
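This code contains two important if checks. First, the check at #1:
if (clientConfig.shouldFetchRegistry()) reads, as the name suggests, the configuration parameter eureka.client.fetch-registry (whether to fetch the service list). It defaults to true, and a service consumer needs it to be true. Inside the block, the code first reads a parameter from the configuration:
registryFetchIntervalSeconds: corresponds to eureka.client.registry-fetch-interval-seconds, the interval for fetching the service list from the registry (default 30). The code then schedules the CacheRefreshThread task. Why is it called CacheRefreshThread? Because the client caches the service list it holds; once the interval (the eureka.client.registry-fetch-interval-seconds above) has elapsed, it fetches a new list from the registry and refreshes the cache, hence a class name starting with "Cache". Note that scheduler.schedule(...) fires only once: it is the TimedSupervisorTask wrapper that schedules the next run after each execution, which is what makes the refresh periodic. The CacheRefreshThread class is: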
/**
 * The task that fetches the registry information at specified intervals.
 *
 */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
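The self-rescheduling behaviour of the TimedSupervisorTask wrapper used at #1 can be sketched as follows. This is a simplified, hypothetical re-implementation of the idea, not Netflix's class: each run submits the real task to a worker pool with a timeout, doubles the next delay (capped at timeout × expBackOffBound) when the run times out, resets it after a successful run, and then schedules itself again. Milliseconds are used instead of seconds so the example runs quickly.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified take on the idea behind TimedSupervisorTask (not Netflix's class).
class SupervisedTaskSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService worker;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delayMillis;

    SupervisedTaskSketch(ScheduledExecutorService scheduler, ExecutorService worker,
                         long timeoutMillis, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.worker = worker;
        this.task = task;
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * expBackOffBound;
        this.delayMillis = new AtomicLong(timeoutMillis); // base delay = the configured interval
    }

    // Backoff rule: double (capped) after a timeout, reset to the base interval after success.
    static long nextDelay(long current, boolean timedOut, long base, long max) {
        return timedOut ? Math.min(max, current * 2) : base;
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = worker.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            delayMillis.set(nextDelay(delayMillis.get(), false, timeoutMillis, maxDelayMillis));
        } catch (TimeoutException e) {
            delayMillis.set(nextDelay(delayMillis.get(), true, timeoutMillis, maxDelayMillis));
            future.cancel(true); // give up on the slow run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | RejectedExecutionException e) {
            // the task failed or the worker was shut down: keep the current delay
        } finally {
            if (!scheduler.isShutdown()) {
                try {
                    scheduler.schedule(this, delayMillis.get(), TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException ignored) {
                    // scheduler shut down concurrently: stop rescheduling
                }
            }
        }
    }
}
```

Keeping the backoff rule in a pure static method makes it testable on its own, separate from the threading.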
run() calls refreshRegistry(), whose code is:
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes.toString());
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
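I haven't studied this method in detail… The core call is fetchRegistry(remoteRegionsModified); most of the remaining code handles remote-region configuration. When the fetch succeeds, the method records the registry size and lastSuccessfulRegistryFetchTimestamp, and any exception is caught and logged instead of being propagated out of the scheduled task.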
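Stripped of the remote-region handling, the pattern in refreshRegistry() is: fetch, swap in the new snapshot only on success, record the success timestamp, and never let an exception escape the scheduled task. A minimal sketch under those assumptions: LocalRegistryCacheSketch is a hypothetical class, the fetcher Supplier stands in for the REST call, and the map value is a simplified list of host:port strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of a client-side registry cache (not Netflix's implementation).
class LocalRegistryCacheSketch {
    // service name -> "host:port" strings; replaced as a whole on every successful refresh
    private final AtomicReference<Map<String, List<String>>> apps = new AtomicReference<>(Map.of());
    private volatile long lastSuccessfulFetchMillis = -1;

    // fetcher stands in for the REST call to the registry and may throw
    boolean refresh(Supplier<Map<String, List<String>>> fetcher, long nowMillis) {
        try {
            Map<String, List<String>> fetched = fetcher.get();
            apps.set(Map.copyOf(fetched)); // readers see either the old or the new snapshot
            lastSuccessfulFetchMillis = nowMillis;
            return true;
        } catch (RuntimeException e) {
            // like refreshRegistry(), swallow the failure and keep serving the previous snapshot
            return false;
        }
    }

    Map<String, List<String>> snapshot() {
        return apps.get();
    }

    long lastSuccessfulFetchMillis() {
        return lastSuccessfulFetchMillis;
    }
}
```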
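Next, the if check at #2:
if (clientConfig.shouldRegisterWithEureka()) checks eureka.client.register-with-eureka (whether to register with the registry, default true). If it is true, two things need to happen: service registration and service renewal. Inside the #2 block, look first at #3. This is clearly a scheduled task, and the variable renewalIntervalInSecs shows it is the heartbeat task: renewalIntervalInSecs holds the value of eureka.instance.lease-renewal-interval-in-seconds (default 30), the heartbeat interval, and HeartbeatThread is the task that performs the renewal.
The HeartbeatThread class is: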
/**
 * The heartbeat task that renews the lease in the given intervals.
 */
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) { // #3.1
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
At #3.1 it calls renew():
/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
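This method sends a REST request (the heartbeat) to the registry. If the registry responds with 404, meaning it no longer knows this instance, the client calls register(), which registers its metadata with the registry again (its code is shown further down, at #4.1). Otherwise the renewal counts as successful only on a 200 response, and any exception makes it return false.
Next, #4 creates an InstanceInfoReplicator object, which also runs a scheduled task. Its run() method is: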
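The decision logic of HeartbeatThread and renew() fits in a few lines. In this sketch HeartbeatTransport and RenewSketch are hypothetical names; the transport just returns HTTP status codes so that the 200, 404 and failure paths can be exercised without a server.

```java
// Hypothetical transport: each call returns the HTTP status code the registry answered with.
interface HeartbeatTransport {
    int sendHeartbeat(String appName, String instanceId);
    int register(String appName, String instanceId);
}

// Sketch of the renew() decision logic and the HeartbeatThread bookkeeping.
class RenewSketch {
    private final HeartbeatTransport transport;
    private long lastSuccessfulHeartbeatMillis = -1;

    RenewSketch(HeartbeatTransport transport) {
        this.transport = transport;
    }

    boolean renew(String appName, String instanceId) {
        try {
            int status = transport.sendHeartbeat(appName, instanceId);
            if (status == 404) {
                // the registry no longer knows this instance: register it again
                return transport.register(appName, instanceId) == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // a failed heartbeat is reported, not thrown
        }
    }

    // Mirrors HeartbeatThread: only a successful renew updates the timestamp.
    void heartbeatTick(String appName, String instanceId, long nowMillis) {
        if (renew(appName, instanceId)) {
            lastSuccessfulHeartbeatMillis = nowMillis;
        }
    }

    long lastSuccessfulHeartbeatMillis() {
        return lastSuccessfulHeartbeatMillis;
    }
}
```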
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register(); // #4.1
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
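The check in run() relies on a "dirty with timestamp" pattern: the flag is cleared only if no newer change happened after the snapshot that was just registered, so a change that arrives while register() is in flight is not lost. The class below is a hypothetical sketch of that pattern, written from the calls isDirtyWithTime() and unsetIsDirty(dirtyTimestamp) above rather than copied from InstanceInfo.

```java
// Hypothetical sketch of the "dirty with timestamp" pattern (not InstanceInfo itself).
class DirtyTrackedInfo {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Any local change (status, metadata, ...) marks the info dirty and stamps the time.
    synchronized void markDirty(long nowMillis) {
        dirty = true;
        lastDirtyTimestamp = nowMillis;
    }

    // Timestamp of the latest change, or null when there is nothing to (re-)register.
    synchronized Long isDirtyWithTime() {
        if (!dirty) {
            return null;
        }
        return lastDirtyTimestamp;
    }

    // Clear only if no change happened after the snapshot that was just registered.
    synchronized void unsetIsDirty(long registeredTimestamp) {
        if (lastDirtyTimestamp <= registeredTimestamp) {
            dirty = false;
        }
    }
}
```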
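Clearly, register() at #4.1 is the service registration method. Note that run() only calls it when the instance info is dirty; in the Eureka versions I have read, InstanceInfoReplicator.start() (called on the last line of the #2 block) marks the instance info dirty before the first run, which is what triggers the initial registration, and the finally block reschedules run() every replicationIntervalSeconds. The code of register() is: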
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo); // #4.1.1
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
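The call at #4.1.1 performs the actual registration: it sends a REST request to the registry, passing the instanceInfo object. As mentioned earlier, registration sends the client's own metadata to the registry, and this instanceInfo object is exactly that metadata. register() treats a 204 response as success.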
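To see what "sending a REST request" means on the wire, the sketch below builds (but does not send) the three requests with the JDK's java.net.http.HttpRequest. It assumes the resource layout described in Eureka's REST operations documentation (POST apps/{app} to register, PUT apps/{app}/{instanceId} to renew, GET apps to fetch) and a Spring Cloud server at http://localhost:8761/eureka. The base URL, the JSON body and the class name are assumptions for illustration; the real client sends a full InstanceInfo payload and may add query parameters.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: the HTTP requests behind register(), renew() and fetching, built but not sent.
class EurekaRequestsSketch {
    private final String baseUrl; // assumed, e.g. "http://localhost:8761/eureka"

    EurekaRequestsSketch(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // Registration: POST {base}/apps/{appName} with the instance metadata as the body.
    HttpRequest register(String appName, String instanceJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    // Renewal: PUT {base}/apps/{appName}/{instanceId}; no request body in this sketch.
    HttpRequest heartbeat(String appName, String instanceId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appName + "/" + instanceId))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Fetching: GET {base}/apps returns the whole registry.
    HttpRequest fetchAll() {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```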
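So #3 and #4 implement service renewal and service registration respectively. Both require eureka.client.register-with-eureka to be true, so they share one if block, while service fetching requires eureka.client.fetch-registry to be true and therefore sits in its own if block.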
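The overall shape of initScheduledTasks() can be summarized as a mapping from the two flags to the periodic tasks they enable. A tiny sketch with a hypothetical ScheduledTasksPlan helper: "cacheRefresh" and "heartbeat" are the task names from the source, while "instanceInfoReplicator" is just an illustrative label.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical summary of initScheduledTasks(): which periodic tasks each flag turns on.
class ScheduledTasksPlan {
    static List<String> tasksFor(boolean fetchRegistry, boolean registerWithEureka) {
        List<String> tasks = new ArrayList<>();
        if (fetchRegistry) {                      // eureka.client.fetch-registry (#1)
            tasks.add("cacheRefresh");            // CacheRefreshThread: fetch the registry
        }
        if (registerWithEureka) {                 // eureka.client.register-with-eureka (#2)
            tasks.add("heartbeat");               // HeartbeatThread: renew the lease (#3)
            tasks.add("instanceInfoReplicator");  // register when the instance info is dirty (#4)
        }
        return tasks;
    }
}
```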