前言
eureka是springcloud 常用的注册中心,这里简单介绍下,eureka client注册逻辑的实现。eureka-client是1.6.2版本。
eureka server api参考 https://blog.csdn.net/qq_30062125/article/details/83829357
这里,eureka client分析主要分三个步骤:
- eureka client 自动配置类EurekaClientAutoConfiguration是怎么找到的
- eureka客户端逻辑是怎么触发的
- eureka client 如何发起注册
1、 自动配置类是怎么找到的
spring boot项目启动类一般会加一个注解@SpringBootApplication,这个注解包含了@EnableAutoConfiguration。@EnableAutoConfiguration是springboot各种自动配置插件的入口。
EnableAutoConfiguration 代码清单
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import(EnableAutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";
/**
* Exclude specific auto-configuration classes such that they will never be applied.
* @return the classes to exclude
*/
Class<?>[] exclude() default {};
/**
* Exclude specific auto-configuration class names such that they will never be
* applied.
* @return the class names to exclude
* @since 1.3.0
*/
String[] excludeName() default {};
}
我们可以看到@Import(EnableAutoConfigurationImportSelector.class),进入它的父类AutoConfigurationImportSelector,可以看到引入bean逻辑:
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
if (!isEnabled(annotationMetadata)) {
return NO_IMPORTS;
}
try {
AutoConfigurationMetadata autoConfigurationMetadata = AutoConfigurationMetadataLoader
.loadMetadata(this.beanClassLoader);
AnnotationAttributes attributes = getAttributes(annotationMetadata);
// 获取需要加载到spring容器中的bean列表
List<String> configurations = getCandidateConfigurations(annotationMetadata,
attributes);
configurations = removeDuplicates(configurations);
configurations = sort(configurations, autoConfigurationMetadata);
Set<String> exclusions = getExclusions(annotationMetadata, attributes);
checkExcludedClasses(configurations, exclusions);
configurations.removeAll(exclusions);
configurations = filter(configurations, autoConfigurationMetadata);
fireAutoConfigurationImportEvents(configurations, exclusions);
return configurations.toArray(new String[configurations.size()]);
}
catch (IOException ex) {
throw new IllegalStateException(ex);
}
}
getCandidateConfigurations 逻辑如下:
protected List<String> getCandidateConfigurations(AnnotationMetadata metadata,
AnnotationAttributes attributes) {
List<String> configurations = SpringFactoriesLoader.loadFactoryNames(
getSpringFactoriesLoaderFactoryClass(), getBeanClassLoader());
Assert.notEmpty(configurations,
"No auto configuration classes found in META-INF/spring.factories. If you "
+ "are using a custom packaging, make sure that file is correct.");
return configurations;
}
这里会加载所有的META-INF/spring.factories文件,并且会循环找出来org.springframework.boot.autoconfigure.EnableAutoConfiguration对应的自动注册类,返回回去。
我们这里关心的是Eureka client相关的,路径在spring-cloud-netflix-eureka-client/1.3.6.RELEASE/spring-cloud-netflix-eureka-client-1.3.6.RELEASE.jar!/META-INF/spring.factories,其中内容为:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
这里加载了org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,这是一个比较重要的类,eureka client的很多逻辑都是从这入口的。
2、 eureka客户端逻辑是怎么触发的
开启EurekaClient需要配置@EnableDiscoveryClient或者@EnableEurekaClient,当前版本,这两个没啥区别。
上面我们分析了如何加载EurekaClientAutoConfiguration,不过如果细心看一下,我们会发现EurekaClientAutoConfiguration依旧需要一些条件才会加载。
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
public class EurekaClientAutoConfiguration {
......
}
其中关键的一点是 @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
我们来看下这个bean是怎么加载的。
这得从@EnableDiscoveryClient开始说起:
EnableDiscoveryClient代码:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
*/
boolean autoRegister() default true;
}
原理和上面类似,也是通过@Import注入bean。
我们看下EnableDiscoveryClientImportSelector的父类方法 org.springframework.cloud.commons.util.SpringFactoryImportSelector#selectImports:
@Override
public String[] selectImports(AnnotationMetadata metadata) {
if (!isEnabled()) {
return new String[0];
}
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
// 获取需要加载的bean列表
// Find all possible auto configuration classes, filtering duplicates
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
if (factories.isEmpty() && !hasDefaultFactory()) {
throw new IllegalStateException("Annotation @" + getSimpleName()
+ " found, but there are no implementations. Did you forget to include a starter?");
}
if (factories.size() > 1) {
// there should only ever be one DiscoveryClient, but there might be more than
// one factory
log.warn("More than one implementation " + "of @" + getSimpleName()
+ " (now relying on @Conditionals to pick one): " + factories);
}
return factories.toArray(new String[factories.size()]);
}
和上面自动注入逻辑类似,也是从META-INF/spring.factories中查找当前注解对应的加载类。
这里找的是
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
查看EurekaDiscoveryClientConfiguration代码
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@CommonsLog
public class EurekaDiscoveryClientConfiguration {
class Marker {}
@Bean
public Marker eurekaDiscoverClientMarker() {
return new Marker();
}
......
}
最上面就是Marker bean的声明,也就是前面@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)依赖的bean,所以只有加上@EnableDiscoveryClient,eureka客户端逻辑才会生效。
3、 eureka client 如何发起注册
首先看org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean,eureka client的客户端配置基本都是通过它来加载的。
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) {
RelaxedPropertyResolver relaxedPropertyResolver = new RelaxedPropertyResolver(env, "eureka.instance.");
String hostname = relaxedPropertyResolver.getProperty("hostname");
boolean preferIpAddress = Boolean.parseBoolean(relaxedPropertyResolver.getProperty("preferIpAddress"));
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(this.nonSecurePort);
instance.setInstanceId(getDefaultInstanceId(this.env));
instance.setPreferIpAddress(preferIpAddress);
if (this.managementPort != this.nonSecurePort && this.managementPort != 0) {
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = relaxedPropertyResolver.getProperty("statusPageUrlPath");
String healthCheckUrlPath = relaxedPropertyResolver.getProperty("healthCheckUrlPath");
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
String scheme = instance.getSecurePortEnabled() ? "https" : "http";
instance.setStatusPageUrl(scheme + "://" + instance.getHostname() + ":"
+ this.managementPort + instance.getStatusPageUrlPath());
instance.setHealthCheckUrl(scheme + "://" + instance.getHostname() + ":"
+ this.managementPort + instance.getHealthCheckUrlPath());
}
return instance;
}
由于EurekaInstanceConfigBean类上面配置了@ConfigurationProperties,所以生成bean的过程中,在ConfigurationPropertiesBindingPostProcessor逻辑中,会注入配置文件的配置参数。
@Data
@ConfigurationProperties("eureka.instance")
public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {
EurekaInstanceConfigBean这个bean为ApplicationInfoManager依赖,而ApplicationInfoManager被EurekaClient依赖。
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
eurekaClient bean是client的入口,注入逻辑
主要在
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient
和
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaClient
两个是针对不同场景的client注入。
EurekaClient bean 声明
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
CloudEurekaClient 构造函数
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config,
DiscoveryClientOptionalArgs args,
ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
查看CloudEurekaClient父类DiscoveryClient声明。
com.netflix.discovery.DiscoveryClient#DiscoveryClient
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
......
initScheduledTasks();
.....
}
注册主要通过initScheduledTasks实现
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
这里我们看下heartbeat定时任务,通过new HeartbeatThread()实现。
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
renew()逻辑,向注册中心发送心跳返回404,就会发起注册。
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
注册服务到注册中心
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
// 实际注册逻辑
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
最终调用com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
// 这个地方发起post请求,url类似http://localhost:8761/eureka/apps/SERVICES2
// 传递的对象是 InstanceInfo
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
到这里 服务已经注册到注册中心了。