深入理解Spring cloud源码篇之Eureka源码

1.eureka功能分析

       首先,eureka在springcloud中充当服务注册功能,相当于dubbo+zk里面得zk,但是比zk要简单得多,zk可以做得东西太多了,包括分布式锁,分布式队列都是基于zk里面得四种节点加watch机制通过长连接来实现得,但是eureka不一样,eureka是基于HTTPrest来实现的,就是把服务的信息放到一个ConcurrentHashMap中,然后服务启动的时候去读取这个map,来把所有服务关联起来,然后服务器之间调用的时候通过信息,进行http调用。eureka包括两部分,一部分就是服务提供者(对于eureka来说就是客户端),一部分是服务端,客户端需要每个读取每个服务的信息,然后注册到服务端,很明显了,这个服务端就是接受客户端提供的自身的一些信息。

2.eureka客户端源码分析

       如果看spring的源码的话我们一般会找到Spring 源码包里面的META-INF文件夹下面的spring.handlers文件,然后直接找到XXXHandler的源码文件,紧着着就会分析springxml里面的各种标签解析。在看cloud源码的时候,我们则是找到META-INF文件下的spring.factories,找到里面的类去分析功能。
       我们根据上面的描述首先找到eureka-client(1.4.0)包下面的spring.factories文件中的EurekaClientAutoConfiguration配置类。我们知道一个eureka客户端最重要的功能也就是四点:

  • 2.1读取该项目的ip,instance_id,端口号,注册到服务端
  • 2.2服务下架
  • 2.3心跳机制
  • 2.4获取其他服务器信息

2.1服务注册

       基于这个思想,我们先找到第一个配置就是在哪读取的application.properties文件,我们看到eurekaInstanceConfigBean()方法,就是读取配置文件到EurekaInstanceConfigBean对象中,并且有@bean注册到ioc的容器中。EurekaInstanceConfigBean对象就包括客户端的ip,instance_id,端口号等等信息。我们看到以下代码是对EurekaInstanceConfigBean的一个包装:

        @Bean
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {//上文说的eurekaInstanceConfigBean是EurekaInstanceConfig的实现类
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

       接着就是服务注册了:

        @Bean(destroyMethod = "shutdown")
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
            manager.getInfo(); // force initialization
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

       我们直接看到super里面的方法,在initScheduledTasks();之上就是创建一些线程池,initScheduledTasks里面开启了一个线程heartbeat,我们看到了:

private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

       在renew方法里,如果返回为404的话,则会调用register()方法去注册,这个发送心跳的时间间隔也可配置,在配置源码的定时器里可以找到,跟读源码的时候发现调用这个register方法除了renew还有InstanceInfoReplicator线程里面的run方法,这个定时器的时间间隔是40秒,在服务启动的时候也会去设置条件合适去执行定时器,这个定时器的作用就是当配置信息改变的时候去调用register,当初次启动的时候也会去调用一下,因为调用了refreshInstanceInfo(),所以isInstanceInfoDirty的值就变成了true,所以,初次注册的时候也会注册到这里,之后除了特殊情况其他的的都不会走register().特殊情况包括:IP的改变,某些配置文件参数的改变,从下面代码可以看出来:

 public void refreshDataCenterInfoIfRequired() {
        String existingAddress = instanceInfo.getHostName();

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
            instanceInfo.setIsDirty();//设置isInstanceInfoDirty为true,lastDirtyTimestamp为当前时间
        }
    }

    public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {//配置参数变了
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }

       以上就是eureka客户端的注册。

2.2服务下架

       我们看EurekaClient接口,里面有个shutdown,我们看到@PreDestroy当servlet关闭的时候就会触发。

  if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();//关闭心跳,服务替换,缓存刷新等定时器

            // If APPINFO was registered
            if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);//设置状态为down
                unregister();//通知服务端客户端下线
            }

            logger.info("Completed shut down of DiscoveryClient");
        }

2.3心跳机制

       2.1我们分析了服务注册,设计到了renew()当返回404的时候是服务注册,200的时候就是发送心跳的机制默认30秒发送一次。

2.4服务获取

       当eureka客户端启动的时候会注册到eureka服务端上,其他客户端也需要感知该eureka启动,从而读取配置信息,服务之间的信息获取也是通过定时器获取的,在initScheduledTasks();方法中,我们看到启动了一个CacheRefreshThread线程,时间间隔默认为30秒,我们直接看该线程里面的fetchRegistry(boolean forceFullRegistryFetch);方法,这里有两种拉取,一种是全量拉取,一种是增量拉取。全量拉取方法为getAndStoreFullRegistry()代码:

  private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())//rest请求服务器获得实例信息
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));//存放到DiscoveryClient对象的localRegionApps的AtomicReference对象中
        } else {   
        }
    }

3.eureka服务端源码分析

        分析eureka客户端功能的时候我们发现客户端是通过httprest请求来注册/拉取信息的,那么eureka服务端一定是一个类似spring MVC的项目结构。找到EurekaServerAutoConfiguration类,看到jerseyApplication()方法,在容器中存放了一个jerseyApplication对象,jerseyApplication()方法里的东西和Spring源码里扫描@Component逻辑类似,扫描@Path和@Provider标签,然后封装成beandefinition,封装到Application的set容器里。通过filter过滤器来过滤url进行映射到对象的Controller。

    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();//核心是一个filter
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));//拦截/eureka开头的所有请求

        return bean;
    }

以上是对jersey的初步介绍,通过分析eureka客户端,我们大概知道客户端有这几个功能

  • 服务接受请求(认识jersey)
  • 接受客户端注册/心跳/下架请求并处理
  • 服务剔除

以下是eureka服务端自身高可用层面的功能点

  • 自我保护
  • 服务之间的信息同步

3.1服务怎么接受请求

       上面介绍了jersey和eureka怎么集成jersey,这里就不多说。

3.2接受客户端注册/心跳/下架请求并处理

服务端接受客户端的注册

       在eurekawiki上https://github.com/Netflix/eureka/wiki/Eureka-REST-operations我们我们知道注册到服务端是调用的POST /eureka/v2/apps/appID 接口,找到了ApplicationsResource类中调用了ApplicationResource的addInstance()方法,找到register()方法

@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;//默认有效时长90m
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);//实例注册,下面具体看这个
        //同步到其他服务
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
 private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();//线程安全的一个服务实例map,name为cloud项目中的实例名字,嵌套里面的map是以key为instanceId,Lease对象为value的一个map
 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());//appname就为cloud配置里的spring.application.name
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);//如果第一个实例注册进来的时候会给registryput进去一个空的lease
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());//这个id就是instanceId

                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                //put了一个lease
                gMap.put(registrant.getId(), lease);

            } finally {
                read.unlock();
            }
        }

服务端接受客户端的续约(心跳)

       接口在InstanceResource#renewLease()。服务续约其实就是维护实例状态,更新一下最后更新时间,然后同步到其他服务端。直接看renew()方法

  public boolean renew(String appName, String id, boolean isReplication) {
            RENEW.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//得到实例对应的lease对象
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id);//得到实例
            }
            if (leaseToRenew == null) {//error
                return false;
            } else {
                InstanceInfo instanceInfo = leaseToRenew.getHolder();
                if (instanceInfo != null) {
                    // touchASGCache(instanceInfo.getASGName());
                    InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                            instanceInfo, leaseToRenew, isReplication);
                    ...
                    if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                        Object[] args = {
                                instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId()
                        };
                        instanceInfo.setStatus(overriddenInstanceStatus);//修改实例状态
                    }
                }
                renewsLastMin.increment();
                leaseToRenew.renew();//更新组后更新时间
                return true;
            }
        }

服务端接受客户端要下架请求

       服务下架接口在InstanceResource#cancelLease()方法,直接看internalCancel()方法

protected boolean internalCancel(String appName, String id, boolean isReplication) {
            try {
                Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);//得到所有实例
                Lease<InstanceInfo> leaseToCancel = null;
                if (gMap != null) {
                    leaseToCancel = gMap.remove(id);//从map中移除掉下架实例
                }
                synchronized (recentCanceledQueue) {
                    recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
                }
                InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
                if (instanceStatus != null) {
                    logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
                }
                if (leaseToCancel == null) {
                    return false;
                } else {
                    leaseToCancel.cancel();
                    InstanceInfo instanceInfo = leaseToCancel.getHolder();
                    String vip = null;
                    String svip = null;
                    if (instanceInfo != null) {
                        instanceInfo.setActionType(ActionType.DELETED);
                        recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                        instanceInfo.setLastUpdatedTimestamp();
                        vip = instanceInfo.getVIPAddress();
                        svip = instanceInfo.getSecureVipAddress();
                    }
                    invalidateCache(appName, vip, svip);
                    logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                    return true;
                }
            } finally {
            }
        }

3.3服务剔除

       当客户端长时间(默认90秒)没有给服务端发送请求的时候,就说明客户端down了,看过Spring源码得都明白,Spring源码比较重要得方法就在AbstractApplicationContext#refresh()方法,里面从扫描了xml/java文件到扫描注解,到进行DI到ioc容器然后再到销毁bean,最后有一个finishRefresh();方法,这是Spring所有工作做完之后调用得方法,一直调到了DefaultLifecycleProcessor#onRefresh()下得#startBeans(true);下的#start();下的doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);下的start()方法。在这里会调用到实现了Lifecycle接口的所有的start()方法,而在EurekaServerAutoConfiguration类中,我们看到import了一个实现了Lifecycle接口的EurekaServerInitializerConfiguration类,在start方法里初始化了一个单独的EurekaServerContext的上下文。在initEurekaServerContext()方法中,
执行了registry.openForTraffic(applicationInfoManager, registryCount);最后一句调用了AbstractInstanceRegistry#postInit()方法,在此方法里开启了一个每60秒调用一次EvictionTask#evict()的定时器。

  public void evict(long additionalLeaseMs) {
          if (!isLeaseExpirationEnabled()) {//如果开启自我保护,则不自动剔除。默认开启
            logger.debug("DS: lease expiration is currently disabled.");
            return;
            }

            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);//如果过期了,加入到expiredLeases的list中
                        }
                    }
                }
            }
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;

            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);

                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    internalCancel(appName, id, false);//移除服务器缓存,同步其他服务器
                }
            }
        }

3.4服务自我保护模式

       客户端长时间不发送续约(心跳),服务端默认每一分钟会进行一次服务剔除,3.3里又一个isLeaseExpirationEnabled()方法:

     /** * 期望 最大 每分钟 续租 次数。 计算公式 当前注册的应用实例数 x 2 */
     protected volatile int expectedNumberOfRenewsPerMin ;
     /** * 期望 最小 每分钟 续租 次数。 计算公式 expectedNumberOfRenewsPerMin * 续租百分比( eureka.renewalPercentThreshold ) */
     protected volatile int numberOfRenewsPerMinThreshold ;
  @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {//默认打开自我保护,false则关闭自我保护
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;//每分钟心跳数大于期望最小每分钟续租次数代表这个实例还活着
    }

3.5服务之间信息同步

       上面说到的服务注册,服务剔除,服务续约等功能的时候在修改完本地业务之后会调用PeerAwareInstanceRegistryImpl#replicateToPeers()方法,同步到其他服务器。

 private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {//
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
 /** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

       Eureka二次传播问题:当eureka A配置到B,B配置到C的时候,客户端注册到服务器A,这个时候服务器A,B会有客户端信息,C则没有。代码分析结果如下:
《深入理解Spring cloud源码篇之Eureka源码》
当客户端注册到服务端A的时候A上有客户端信息,这个时候会同步一遍B服务端,则,B同步到C的时候isReplication则为false,就不会同步过去了。

    原文作者:Spring Cloud
    原文地址: https://blog.csdn.net/lgq2626/article/details/80288992
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞