eureka怎么接口服务注册请求
1 配置初始化
分析每个功能,通常都避免不了找初始化入口,eureka server 初始化的入口与erueka client的初始入口一样,都是有spring-factories文件中,只不过server的配置在server包中
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
就一配置类
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
//略过部分代码
@Configuration
protected static class EurekaServerConfigBeanConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
//eureka服务端信息
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
}
//略去一波代码
@Bean
//eureka节点同步注册器(作用于信息同步)
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
@Bean
@ConditionalOnMissingBean
//eureka节点管理(保存差多个eureka节点信息)
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new PeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
@Bean
//eureka 当前服务上下文
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
@Bean
//被EurekaServerInitializerConfiguration调用,
//好像主要服务启动服剔除定时器
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
@Bean
//jersey过滤器
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
/** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */
@Bean
//构建一个jersey应用,相于spring-mvc配合tomcat,构建rest接口
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
//扫描指定包下@Path.clas和@Provider注释标记的类
//(这里相当于spring-mvc的Requestmapping)
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
//
Set<Class<?>> classes = new HashSet<Class<?>>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
//
Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
@Bean
public FilterRegistrationBean traceFilterRegistration(
@Qualifier("webRequestLoggingFilter") Filter filter) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(filter);
bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
return bean;
}
}
2 处理注册过程
由上一编,我们知道spring-cloud的服务注册是通过http rest请求的,上面源码提到的jersey 接口,打开断点我们可以看到有哪些rest接口被扫描到
虽然有9个接口类,但我们这里只关注ApplicationResource 类,因为接收注册请求的接口应在这个类身上
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
//校验注册信息
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
//注册服务信息
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
进入PeerAwareInstanceRegistryImpl
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//通过父及保存注册服务信息
super.register(info, leaseDuration, isReplication);
//同步服务信息到其它eureka节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
进入AbstractInstanceRegistry 保存服务注册信息
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
//已注册的服务保存到ConcurrentHashMap里
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
//如果还未保存过,则新建一个map并应名称作为key保存进去
//因为一个应用可能有多个实例,以用一个map来保存某个应用的
//所有实例
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//通过某应用的实例id,获取某实例注册信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
//比较续约时间,替换最新的续约
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
//以某应用实例id为key保存注册信息
gMap.put(registrant.getId(), lease);
//略去一波代码
} finally {
read.unlock();
}
}
3 同步服务注册信息
上面把某服务信息注册上来了,但是这个服务信息只是保存到当前eureka服务节点;如果系统有多个eureka服务节点,那么当前eureka节点是如何把当前注册的服务信息同步同其它节点?带着疑问回到PeerAwareInstanceRegistryImpl 的
regiest方法:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//通过父及保存注册服务信息
super.register(info, leaseDuration, isReplication);
//同步服务信息到其它eureka节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
复制服务信息到其它节点
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
//如果没有其它节点或服务信息为复制过来的,再传播复,
//防止循环复制
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
//遍历所有eureka节点,把服务信息复制到所有节点
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
在下面的代码到,其实会有多个事件信息都会被制复到其它节点,但这里只跟注册,其它应该差不多
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
进入PeerEurekaNode,到这里就把信息通过http同步到某个eureka节点上去了
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
4 剔除服务
上面只是分析服务信息注册,但是服务如果过期或失效是怎么剔除的?还是回到配置初始化那里,注册流程我们仅更多关注jersey的初始化信息,但其它但配置信息并没有分析,但只是简单提示了EurekaServerBootstrap 的作用。另作在配置类中还发现了另外一个类EurekaServerInitializerConfiguration,该类实现了SmartLifecycle 接口,那么其 start将被spring容器调用
EurekaServerInitializerConfiguration 该类比较简单
@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
//去掉一波代码
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
//调起eurekaServerBootstrap 初始化方法
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
//再去掉一波代码
}
进去看看 EurekaServerBootstrap
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
//开启交通处理(不知怎么翻译)
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
进入PeerAwareInstanceRegistryImpl
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
//省去一波不太关注代码
super.postInit();
}
AbstractInstanceRegistry
protected void postInit() {
//开启心跳阈值测量
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
//开启定时剔除任务
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
剔除服务方法
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
//是否开启过期检测
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
//先收集所有过期服务信息,然后随机剔除
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}