十、Spring cloud服务短路(Hystrix)之源码解析

  • Spring Cloud Hystrix 源码解读
  • Nertflix Hystrix 源码解读
  • RxJava 基础

一、Spring Cloud Hystrix 源码解读

1、@EnableCircuitBreaker

(1)职责:
激活 Circuit Breaker

(2)调用链路:

    @EnableCircuitBreaker 
    <!-- 通过 EnableCircuitBreaker 注解上的注解 @Import(EnableCircuitBreakerImportSelector.class) 可知 -->
    -> EnableCircuitBreakerImportSelector 
    <!-- EnableCircuitBreakerImportSelector 继承 SpringFactoryImportSelector<EnableCircuitBreaker> ,通过探寻 发现,SpringFactoryImportSelector 类下的 selectImports() 方法中的 List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); 可知,这里是以 EnableCircuitBreaker 全限定名为key,找到对应的默认实现。通过 搜索 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 找到 spring cloud 框架下的 spring.factories 文件中,找到了其默认实现: org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration -->
    -> HystrixCircuitBreakerConfiguration

2、HystrixCircuitBreakerConfiguration

(1)初始化组件

  • HystrixCommandAspect
  • HystrixShutdownHook
  • HasFeatures

二、Nertflix Hystrix 源码解读

1、HystrixCommandAspect

(1)依赖组件

  • MetaHolderFactory:生成拦截方法元信息
  • HystrixCommandFactory:生成 HystrixInvokable
  • HystrixInvokable
    • CommandCollapser
    • GenericObservableCommand
    • GenericCommand

2、Future 来实现超时熔断

/**
 * 通过 {@link Future} 实现 服务熔断
 * @author 咸鱼
 * @date 2018/11/14 20:12
 */
public class FutureCircuitBreakerDeo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //初始化线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        RandomCommand randomCommand = new RandomCommand();

        Future<String> future = executorService.submit(() -> {
            //获取 run 方法计算结果
            return randomCommand.run();
        });

        String result = null;
        // 100 ms 超时时间
        try {
            result = future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // fallback 方法调用
            result = randomCommand.fallback();
        }

        System.out.println(result);

        executorService.shutdown();
    }

    /**
     * 随即对象
     */
    private static final Random RANDOM = new Random();

    /**
     * 随机事件执行命令
     */
    static class RandomCommand implements Command<String> {
        @Override
        public String run() throws InterruptedException {
            long executeTime = RANDOM.nextInt(200);

            System.out.println("execute time : " + executeTime + "ms");

            Thread.sleep(executeTime);

            return "hello";
        }

        @Override
        public String fallback() {
            return "fallback";
        }
    }


    public static interface Command<T> {
        /**
         * 正常执行,并且返回结果
         * @return T
         */
        T run() throws InterruptedException;

        /**
         * 错误时,返回容错结果
         * @return T
         */
        T fallback();
    }
}

三、RxJava 基础

1、单数据:Single API

//仅能发布单个数据
        Single.just("Hello,World!")
                //在I/O线程执行
                .subscribeOn(Schedulers.io())
                //订阅并且消费数据
                .subscribe(RxJavaDemo::println);
                Thread.sleep(100);

2、多数据:Observable API

List<Integer> values = Arrays.asList(1,2,3,4,5,6,7,8);
        //发布多个数据
        Observable.from(values)
                .subscribeOn(Schedulers.computation())
                //订阅并且消费数据
                .subscribe(RxJavaDemo::println);
        Thread.sleep(100);

3、使用标准 Reactive 模式:

public static void demoStandardReactive() throws InterruptedException {
        List<Integer> values = Arrays.asList(1,2,3);
        //发布多个数据
        Observable.from(values)
                .subscribeOn(Schedulers.newThread())
                //subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
                //接口 Action1:需实现call(T t)
                //接口 Action0:需实现call()
                .subscribe(
                        //参数一:消费数据
                        value -> {
                            if (value > 2) {
                                throw new IllegalStateException("数据不容许大于2");
                            }
                            println("消费数据:" + value);
                        },
                        //参数二:当发生异常时,中断执行
                        e -> println("发生异常:" + e.getMessage()),
                        //参数三:当逻辑执行完毕时
                        () -> println("逻辑执行完毕"))
        ;
        //上面是异步执行,需要休眠等待其执行完毕
        Thread.sleep(100);
    }
    原文作者:Spring Cloud
    原文地址: https://blog.csdn.net/panchang199266/article/details/84072712
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞