Spring Cloud 源码学习之 Hystrix Metrics 收集

欢迎访问陈同学博客原文

文中源码基于 Spring Cloud Finchley.SR1、Spring Boot 2.0.6.RELEASE.

Hystrix 其他文章:Spring Cloud 源码学习之 Hystrix 入门Spring Cloud 源码学习之 Hystrix 工作原理Spring Cloud 之 Hystrix 跨线程传递数据

在 Hystrix Command 执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。

本文学习了 Metrics 收集的源码,并整理成下图。由于 Hystrix 发出的事件种类很多,本文仅以命令结束执行事件作为学习实例。

《Spring Cloud 源码学习之 Hystrix Metrics 收集》

Subject简述

Hystrix 基于 RxJava,本文涉及到 Subject 概念,这里提一下 rx.subjects.Subject

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}

Subject 继承Observable,因此可作为被观察者、数据源,也就是一个数据发射器;

实现了接口 Observer,因此可作为观察者,可以订阅其他Observable,处理Observable发射出的数据。

因此,Subject既可以发射数据,也可以接收数据。类比于菜鸟驿站,可以收、发快递

Metrics 收集流程

整个过程分成以下三步:

1.使用HystrixCommandMetrics记录metrics

每个Command的构造器中会获取一个HystrixCommandMetrics工具,用来记录metrics。

// 构造器利用HystrixCommandMetrics获取命令key对应的对象
HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
// HystrixCommandMetrics 中存储HystrixCommandMetrics的数据结构
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics;

也就是说,每个CommandKey会拥有一个对应的HystrixCommandMetrics工具。

例如:A服务利用Feign远程调用B服务,那下面的 service-B 会自动作为命令的key。

@FeignClient(name = "service-B")

下面是利用HystrixCommandMetrics工具发射 标记命令结束 的事件代码:

void markCommandDone(...) {
    HystrixThreadEventStream.getInstance().executionDone(...);
}

2.Per-Thread 事件处理者

HystrixCommandMetrics提供了基础工具方法给Command使用,而HystrixCommandMetrics的实际使用的是HystrixThreadEventStream: Per-thread event stream

它是线程级别的数据处理者,每个线程拥有自己的HystrixThreadEventStream,HystrixThreadEventStream.getInstance() 是从ThreadLocal中获取对象。

它包含了很多Subject<事件,事件>,用来接收和发射数据。下面是HystrixThreadEventStream类:

public class HystrixThreadEventStream {
    // Per-thread 的HystrixThreadEventStream
    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams;
   	// 用来接收和发射HystrixCommandCompletion事件的Subject
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
}

HystrixCommandCompletion是事件(HystrixCommandEvent)的一种,writeOnlyCommandCompletionSubject这个Subject的初始化方式如下:

// 创建为一个数据发射器
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject
        .onBackpressureBuffer()
    	// 绑定发射数据时的处理者
        .doOnNext(writeCommandCompletionsToShardedStreams)
        .unsafeSubscribe(Subscribers.empty());

writeCommandCompletionsToShardedStreams会怎么处理数据呢?下面是它的定义:

// 它是一个可执行的实体,没有返回值,可以传入一个参数; 和 Runnable很像
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
    // 当接收到数据时, 又将数据发送给了command级别的处理者
    @Override
    public void call(HystrixCommandCompletion commandCompletion) {
        // 获取CommandKey对应的HystrixCommandCompletionStream
        HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
        // 写入数据
        commandStream.write(commandCompletion);
		...
    }
};

现在再回过来看HystrixThreadEventStream这个Per-thread的工具发射 标记命令结束事件 的代码:

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    // 构建命令结束的数据对象
    HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
    // 利用上面的Subject发射数据, onNext()就是发射一条数据。
    writeOnlyCommandCompletionSubject.onNext(event);
}

由于writeOnlyCommandCompletionSubject绑定了数据处理者(上面的writeCommandCompletionsToShardedStreams这个Action1)。它会利用command级别的工具来发射数据。

3.Per-Command 事件处理者

通过上一步知道,每个线程有自己的工具(HystrixThreadEventStream)来处理数据,最终这个工具利用了命令级别的工具。上面的HystrixCommandCompletionStream 属于 HystrixEventStream 的一种,HystrixEventStream专门用于处理command级别的数据,它有如下几个子类:

HystrixCommandCompletionStream
HystrixCommandStartStream
HystrixThreadPoolCompletionStream
HystrixThreadPoolStartStream
HystrixCollapserEventStream

这几个子类都是用来处理特定类型事件的工具,以HystrixCommandCompletionStream为例子,这些子类的结构都很类似,可以接收数据,并将数据提供给其他消费者。

public class HystrixCommandCompletionStream {
    // 一个用于接收和发射结束事件的Subject
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
    // 一个Observable,将接收到的数据作为数据源发射给其他消费者
	private final Observable<HystrixCommandCompletion> readOnlyStream;
}

先看看这个Per-Command 的对象是怎么创建的?

// 存储结构
private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();

// 单例模式拿到HystrixCommandCompletionStream,以命令的key为索引存储在ConcurrentMap中
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
    HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
    if (initialStream != null) {
        return initialStream;
    } else {
        synchronized (HystrixCommandCompletionStream.class) {
            ...
        }
    }
}

下面是它的构造函数:

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
    this.commandKey = commandKey;
	// 创建可以发射数据的Subject
    this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
    // readOnlyStream是一个Observable, share()方法可以将上面Subject发射的数据全部广播给readOnlyStream,相当于拷贝了一份一模一样的数据
    this.readOnlyStream = writeOnlySubject.share();
}

这个类提供了很重要的两个方法:

// 提供了接收数据的方法,其他工具(如HystrixThreadEventStream)可以将数据写进来
public void write(HystrixCommandCompletion event) {
    writeOnlySubject.onNext(event);
}

// 实现HystrixEventStream的observe(方法), 其他消费者可以利用observe()拿到这个数据源,然后订阅它,处理它发射的所有数据
@Override
public Observable<HystrixCommandCompletion> observe() {
    return readOnlyStream;
}

小结

通过上面三步,数据流向就很清楚了:

  • Command直接使用HystrixCommandMetrics来记录命令开始、结束等事件
  • HystrixCommandMetrics利用线程级别的HystrixThreadEventStream的来接收数据
  • HystrixThreadEventStream完成各种事件的封装(如将结束事件封装成HystrixCommandCompletion),再利用command级别的HystrixEventStream来接收数据(HystrixEventStream有不同的子类来处理不同的事件)
  • 最终消费者通过HystrixEventStream的observe()方法,拿到这个数据源,然后订阅它,从而源源不断的拿到Command发射出的各种数据

谁在最终消费数据?

通过上述步骤,将Hystrix Command执行过程的各种信息转化成了特定数据结构的事件,然后提供了一个Observable作为数据源。如果需要使用这些数据,各观察者只需要订阅Observable就可以拿到这些已经分门别类且结构化的数据了。

例如:断路器就是利用这些信息,然后统计分析数据,最终提供断路器的功能。

本文不深入断路器,仅关注各项事件的收集过程中的数据流向。下一遍文章将分享断路器是如何利用这些基础数据,如何使用滑动窗口的原理来处理数据,感兴趣可以关注奥。

附录

HystrixEvent

HystrixEvent是一个事件标记接口,其子类都是些特定数据结构的数据对象。像HystrixThreadEventStream会封装这个事件。

《Spring Cloud 源码学习之 Hystrix Metrics 收集》

HystrixEventStream

HystrixEventStream各子类提供了write()方法供其他对象写入HystrixEvent,然后再提供observe()方法,供其他消费者来消费这些数据。

《Spring Cloud 源码学习之 Hystrix Metrics 收集》

欢迎关注陈同学的公众号,一起学习,一起成长

《Spring Cloud 源码学习之 Hystrix Metrics 收集》

    原文作者:Spring Cloud
    原文地址: https://blog.csdn.net/myle69/article/details/84995338
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞