- Spring Cloud Hystrix 源码解读
- Nertflix Hystrix 源码解读
- RxJava 基础
一、Spring Cloud Hystrix 源码解读
1、@EnableCircuitBreaker
(1)职责:
激活 Circuit Breaker
(2)调用链路:
@EnableCircuitBreaker
<!-- 通过 EnableCircuitBreaker 注解上的注解 @Import(EnableCircuitBreakerImportSelector.class) 可知 -->
-> EnableCircuitBreakerImportSelector
<!-- EnableCircuitBreakerImportSelector 继承 SpringFactoryImportSelector<EnableCircuitBreaker> ,通过探寻 发现,SpringFactoryImportSelector 类下的 selectImports() 方法中的 List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); 可知,这里是以 EnableCircuitBreaker 全限定名为key,找到对应的默认实现。通过 搜索 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 找到 spring cloud 框架下的 spring.factories 文件中,找到了其默认实现: org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration -->
-> HystrixCircuitBreakerConfiguration
2、HystrixCircuitBreakerConfiguration
(1)初始化组件
- HystrixCommandAspect
- HystrixShutdownHook
- HasFeatures
二、Nertflix Hystrix 源码解读
1、HystrixCommandAspect
(1)依赖组件
- MetaHolderFactory:生成拦截方法元信息
- HystrixCommandFactory:生成 HystrixInvokable
- HystrixInvokable
- CommandCollapser
- GenericObservableCommand
- GenericCommand
2、Future 来实现超时熔断
/**
* 通过 {@link Future} 实现 服务熔断
* @author 咸鱼
* @date 2018/11/14 20:12
*/
public class FutureCircuitBreakerDeo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//初始化线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
RandomCommand randomCommand = new RandomCommand();
Future<String> future = executorService.submit(() -> {
//获取 run 方法计算结果
return randomCommand.run();
});
String result = null;
// 100 ms 超时时间
try {
result = future.get(100, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// fallback 方法调用
result = randomCommand.fallback();
}
System.out.println(result);
executorService.shutdown();
}
/**
* 随即对象
*/
private static final Random RANDOM = new Random();
/**
* 随机事件执行命令
*/
static class RandomCommand implements Command<String> {
@Override
public String run() throws InterruptedException {
long executeTime = RANDOM.nextInt(200);
System.out.println("execute time : " + executeTime + "ms");
Thread.sleep(executeTime);
return "hello";
}
@Override
public String fallback() {
return "fallback";
}
}
public static interface Command<T> {
/**
* 正常执行,并且返回结果
* @return T
*/
T run() throws InterruptedException;
/**
* 错误时,返回容错结果
* @return T
*/
T fallback();
}
}
三、RxJava 基础
1、单数据:Single API
//仅能发布单个数据
Single.just("Hello,World!")
//在I/O线程执行
.subscribeOn(Schedulers.io())
//订阅并且消费数据
.subscribe(RxJavaDemo::println);
Thread.sleep(100);
2、多数据:Observable API
List<Integer> values = Arrays.asList(1,2,3,4,5,6,7,8);
//发布多个数据
Observable.from(values)
.subscribeOn(Schedulers.computation())
//订阅并且消费数据
.subscribe(RxJavaDemo::println);
Thread.sleep(100);
3、使用标准 Reactive 模式:
public static void demoStandardReactive() throws InterruptedException {
List<Integer> values = Arrays.asList(1,2,3);
//发布多个数据
Observable.from(values)
.subscribeOn(Schedulers.newThread())
//subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
//接口 Action1:需实现call(T t)
//接口 Action0:需实现call()
.subscribe(
//参数一:消费数据
value -> {
if (value > 2) {
throw new IllegalStateException("数据不容许大于2");
}
println("消费数据:" + value);
},
//参数二:当发生异常时,中断执行
e -> println("发生异常:" + e.getMessage()),
//参数三:当逻辑执行完毕时
() -> println("逻辑执行完毕"))
;
//上面是异步执行,需要休眠等待其执行完毕
Thread.sleep(100);
}