【Spring Cloud】源码-Eureka客户端的服务注册、服务获取与服务续约

在看源码之前,先说一下标题中提到的三个概念:

1. 服务注册:

    服务提供者(eureka客户端)在启动后,如果参数eureka.client.register-with-eureka为true,那么会将自己注册到服务注册中心中,注册的动作会将自己的元数据发送给注册中心,注册中心将接受的元数据保存在一个注册列表中,该列表是一个双层Map结构,具体为:Map<服务名, Map<实例名,服务实例>>

2. 服务续约:

    成功注册的eureka服务(eureka客户端)会在注册之后维护一个心跳来告诉注册中心“我还活着”,这样注册中心就不会从注册列表中将这个服务实例剔除,关于这个心跳机制涉及到两个配置参数:

    eureka.instance.lease-renewal-interval-in-seconds(默认30):心跳间隔时间

    eureka.instance.lease-expiration-duration-in-seconds(默认90):定义服务失效时间

3. 服务获取:

    前两个概念是针对服务提供者,而服务获取是针对服务消费者(也属于eureka客户端),即调用服务方才需要获取服务列表以便选择调用哪一个服务实例。在服务消费者启动后,会向服务注册中心请求一份服务清单,该清单记录了已经注册到服务中心的服务实例。该请求动作不会仅限于启动的时候,因为消费者需要访问正确的、健康的服务实例,因此会定时发送请求。间隔时间通过配置参数:

    eureka.instance.registry-fetch-interval-seconds(默认30)

 

那么接下来我们来简单看一下源码:

首先eureka客户端最重要的功能实现类就是org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

这个类是对Eureka发现法务的封装,而SpringCloudEureka本身就是对NetflixEureka的功能封装,因此,EurekaDiscoveryClient类会持有一个com.netflix.discovery.EurekaClient.EurekaClient对象引用(为组合关系,具体实现是NetflixEureka的DiscoveryClient类),而SpringCloudEureka本身有一个对发现服务的常用方法的抽象,这就是org.springframework.cloud.netflix.eureka.DiscoveryClient接口,EurekaDiscoveryClient实现了该接口(为继承关系),他们的关系大概如下图所示:

《【Spring Cloud】源码-Eureka客户端的服务注册、服务获取与服务续约》

其中左边的两个是SpringCloudEureka的,右面的两个是NetflixEureka的。

既然服务发现的方法主要在Netflix的DiscoveryClient类中,可以看一下这个类的注释,主要告诉我们DiscoveryClient的主要功能:Eureka客户端的注册、续约、取消租约(服务关闭)、获取服务。

因此首先研究的就是它,在构造器中,DiscoveryClient会调用一个initScheduledTasks()方法,从命名就可以看出这是一个初始化方法,那么我们可以从这个方法入手,源码如下:

/**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {    // #1
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {    //#2
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);    // #3

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(    // #4
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

 

在这段代码中,出现两个重要的if判断,首先看#1处的if判断:

 

if(clientConfig.shouldFetchRegistry()),顾名思义,获取配置中参数eureka.client.fetch-registery(是否获取服务列表)的值,如果是服务消费者,那么这个值就为true,进入判断代码块后,首先会从配置中获取一个参数:

registeryFetchIntervalSeconds:对应配置参数eureka.client.registry-fetch-interval-seconds,即从服务注册中心获取服务列表的间隔时间,默认是30。之后会执行定时任务CacheRefreshThread(),为什么叫CacheRefreshThread呢?因为客户端所持有的服务列表会缓存起来,到了一定时间(即上面的eureka.client.registry-fetch-interval-seconds)后会更新缓存并重新从注册中心获取新的服务列表,因此有一个“Cache”开头的方法。这个方法的代码如下:

/**
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

run()方法中执行refreshRegistry(),方法代码如下:

@VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            if (logger.isDebugEnabled()) {
                StringBuilder allAppsHashCodes = new StringBuilder();
                allAppsHashCodes.append("Local region apps hashcode: ");
                allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                allAppsHashCodes.append(", is fetching remote regions? ");
                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                    allAppsHashCodes.append(", Remote region: ");
                    allAppsHashCodes.append(entry.getKey());
                    allAppsHashCodes.append(" , apps hashcode: ");
                    allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                }
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                        allAppsHashCodes.toString());
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }        
    }

该段代码没研究…

 

 

 

接下来我们看#2处的if判断:

 

if(clientConfig.shouldRegisterWithEureka()),可以看出,此处判断参数eureka.client.register-with-eureka(是否将自己注册到服务注册中心,默认true),如果为true,那么就意味着需要做两件事:服务注册与服务续约。那么我们看代码进入#2的if判断之后,首先看#3处的代码,很明显这是一个定时任务,从renewalIntervalInSecs变量可以看出,这是心跳机制的定时任务,renewalIntervalInSecs变量则是从配置文件参数eureka.instance.lease-renewal-interval-in-seconds的值(默认为30),含义是心跳间隔时间,而HeartbeatThread()方法就是续约方法,

HeartbeatThread()方法代码如下:

/**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {    // #3.1
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

#3.1调用方法renew():

/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

该方法就是发送Rest请求给注册中心,若返回404,则调用register()方法,这个方法就是将自己的元数据重新注册到服务中心,具体代码不贴上来了。

之后在#4处的代码会创建一个InstanceInfoReplicator对象,这个对象也会做一个定时任务,具体run()方法代码如下:

public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();    // #4.1
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

很明显,#4.1处的代码行,register()方法正是服务注册方法,其实代码如下:

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    // #4.1.1
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

#4.1.1处的代码就是具体的注册动作,该方法会发送一个Rest请求给服务注册中心,同时传递一个instanceInfo对象,之前我们说过,注册动作会将客户端自己的元数据传递给服务注册中心,那么这个instanceInfo对象就是客户端的元数据。

至此,#3#4两处代码分别是 服务的注册与续约功能实现。由于服务注册与续约均需要参数eureka.client.register-with-eureka为true,因此两个功能写入一个if判断中,而服务发现需要参数eureka.client.fetch-registery为true,因此单独在一个if判断中。

 

    原文作者:Spring Cloud
    原文地址: https://blog.csdn.net/candy_rainbow/article/details/80809024
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞