前言
本文我们来分析一下CounterWriter,GaugeWriter接口的实现.前面的几篇文章,我们介绍了spring boot中的CounterService,GaugeService.数据有了,总得有输出的地方.那么本文就为你介绍一番.关于这部分的类图如下:
这里面有些实现是不会进行详解的,有很多实现都是依赖第三方实现的.
解析
CounterWriter
CounterWriter–> 计数器的简单writer接口.代码如下:
public interface CounterWriter {
// 增加当前metric的值(或者减少,如果Delta 是负数的话). Delta 中指定的name指定了要增加的metric的名字
void increment(Delta<?> delta);
// 重置,通常会置为0.该操作是可选的(一些实现可能无法实现该契约--> 什么也没有做)
void reset(String metricName);
}
GaugeWriter
GaugeWriter–>代码如下:
public interface GaugeWriter {
void set(Metric<?> value);
}
OpenTsdbGaugeWriter
这是使用OpenTSDB来实现的,OpenTSDB –> 基于Hbase的分布式的,可伸缩的时间序列数据库。主要用途,就是做监控系统;譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询。关于这方面的内容可以参考如下链接:
MetricWriter
MetricWriter–> 继承自GaugeWriter,CounterWriter接口.没有声明其他的方法,只是1个合并接口。如下:
public interface MetricWriter extends GaugeWriter, CounterWriter {
}
StatsdMetricWriter
StatsdMetricWriter–> 依靠StatsD来实现MetricWriter. .Statsd 有计数器和gauge(测量)的概念,但是gauges只支持Long类型的数据.所以其值将被截断,接近零。Metrics 中含有timer. 名称的(不是gauge. 或者counter.)的,会被视作执行次数(在statsd的概念中)任何递增的被视作计数器,任何拥有快照值的被视作统计器.
关于StatsD的可以参考如下链接:
使用 StatsD + Grafana + InfluxDB 搭建 Node.js 监控系统
MessageChannelMetricWriter
MessageChannelMetricWriter–> 实现了MetricWriter接口,1个发布metric 更新消息通过MessageChannel的MetricWriter.发送的消息将Delta和Metric 作为消息体,并且带上了消息头–>metricName,说明是那个metric 需要处理.需要spring-messaging的依赖.
该类的字段,构造器如下:
// 此时注入的是id为metricsChannel的MessageChannel private final MessageChannel channel; public MessageChannelMetricWriter(MessageChannel channel) { this.channel = channel; }
该类方法实现如下:
public void increment(Delta<?> delta) { this.channel.send(MetricMessage.forIncrement(delta)); } public void set(Metric<?> value) { this.channel.send(MetricMessage.forSet(value)); } public void reset(String metricName) { this.channel.send(MetricMessage.forReset(metricName)); }
可以发现都是向d为metricsChannel的MessageChannel中发送消息.消息是由MetricMessage来创建的.
MetricMessage的字段,构造器如下:
// 构造Message中添加的头部 private static final String METRIC_NAME = "metricName"; // 构造Reset类型的消息时的消息体 private static final String DELETE = "delete"; private final Message<?> message; MetricMessage(Message<?> message) { this.message = message; }
方法如下:
public static Message<?> forIncrement(Delta<?> delta) { return forPayload(delta.getName(), delta); } public static Message<?> forSet(Metric<?> value) { return forPayload(value.getName(), value); } public static Message<?> forReset(String metricName) { return forPayload(metricName, DELETE); }
发现最终都是调用forPayload–> 根据消息的类型,metricName 构建出对应的Message.代码如下:
private static Message<?> forPayload(String metricName, Object payload) { MessageBuilder<Object> builder = MessageBuilder.withPayload(payload); builder.setHeader(METRIC_NAME, metricName); return builder.build(); }
自动装配:
在MetricsChannelAutoConfiguration中进行了装配,代码如下:
@Configuration @ConditionalOnClass(MessageChannel.class) @ConditionalOnBean(name = "metricsChannel") @AutoConfigureBefore(MetricRepositoryAutoConfiguration.class) public class MetricsChannelAutoConfiguration { @Bean @ExportMetricWriter @ConditionalOnMissingBean public MessageChannelMetricWriter messageChannelMetricWriter( @Qualifier("metricsChannel") MessageChannel channel) { return new MessageChannelMetricWriter(channel); } }
当满足以下条件时会进行装配:
- @ConditionalOnClass(MessageChannel.class)–> 在当前的类路径下存在MessageChannel.class时生效
- @ConditionalOnBean(name = “metricsChannel”) –> BeanFactory中存在id为metricsChannel的bean时生效
- @ConditionalOnMissingBean –> BeanFactory中不存在MessageChannelMetricWriter类型的bean时生效
使用案例:
关于如何使用,我们后续有文章会进行分析.
JmxMetricWriter
关于这个后续有文章会进行分析
CompositeMetricWriter
CompositeMetricWriter–> 组合其他MetricWriter的实现,在方法的实现过程中会依次调用其持有的MetricWriter进行处理.组合模式.代码如下:
public class CompositeMetricWriter implements MetricWriter, Iterable<MetricWriter> {
private final List<MetricWriter> writers = new ArrayList<MetricWriter>();
public CompositeMetricWriter(MetricWriter... writers) {
Collections.addAll(this.writers, writers);
}
public CompositeMetricWriter(List<MetricWriter> writers) {
this.writers.addAll(writers);
}
@Override
public Iterator<MetricWriter> iterator() {
return this.writers.iterator();
}
@Override
public void increment(Delta<?> delta) {
for (MetricWriter writer : this.writers) {
writer.increment(delta);
}
}
@Override
public void set(Metric<?> value) {
for (MetricWriter writer : this.writers) {
writer.set(value);
}
}
@Override
public void reset(String metricName) {
for (MetricWriter writer : this.writers) {
writer.reset(metricName);
}
}
}
MetricRepository,InMemoryMetricRepository
我们在spring boot 源码解析37-CounterService详解中详细记录,这里就不在赘述了.
RedisMetricRepository
RedisMetricRepository–> 1个使用redis 来实现MetricRepository. Metric的值被存储到zset中,时间戳存储到string中,其key是由前缀(默认是spring.metrics.)加metric名组成的. 如果你有许多的RedisMetricRepository使用同一个Redis的实例,可能需要改变其前缀使其唯一(除非你希望它们对同一前缀的metrics)
字段如下:
// redis中存储key的默认前缀 private static final String DEFAULT_METRICS_PREFIX = "spring.metrics."; private static final String DEFAULT_KEY = "keys.spring.metrics"; // 前缀 private String prefix = DEFAULT_METRICS_PREFIX; // redis中的zset的名称 private String key = DEFAULT_KEY; private BoundZSetOperations<String, String> zSetOperations; private final RedisOperations<String, String> redisOperations;
构造器如下:
public RedisMetricRepository(RedisConnectionFactory redisConnectionFactory) { this(redisConnectionFactory, null); } public RedisMetricRepository(RedisConnectionFactory redisConnectionFactory, String prefix) { this(redisConnectionFactory, prefix, null); } public RedisMetricRepository(RedisConnectionFactory redisConnectionFactory, String prefix, String key) { if (prefix == null) { prefix = DEFAULT_METRICS_PREFIX; if (key == null) { key = DEFAULT_KEY; } } else if (key == null) { key = "keys." + prefix; } Assert.notNull(redisConnectionFactory, "RedisConnectionFactory must not be null"); this.redisOperations = RedisUtils.stringTemplate(redisConnectionFactory); if (!prefix.endsWith(".")) { prefix = prefix + "."; } this.prefix = prefix; if (key.endsWith(".")) { key = key.substring(0, key.length() - 1); } this.key = key; this.zSetOperations = this.redisOperations.boundZSetOps(this.key); }
最终都会调用其RedisMetricRepository(RedisConnectionFactory redisConnectionFactory,String prefix, String key)构造器,处理逻辑如下:
如果prefix等于null,则使用默认的前缀–>spring.metrics.
- 如果key等于null,则使用默认的key–>keys.spring.metrics
如果前缀指定了,key没有指定,则key等于在指定的prefix前加上keys.
实例化RedisOperations
- 如果prefix不是以.结尾的,则为其加上. 并赋值给prefix.
- 如果key是以.结尾的,则进行截取,并设置给key
- 根据给的key 创建BoundZSetOperations,默认为keys.spring.metrics
其方法实现如下:
set,代码如下:
public void set(Metric<?> value) { // 1. 将Metric的名字加上前缀 String name = value.getName(); String key = keyFor(name); // 2. 加入到zset中,value 为key,评分为value值 trackMembership(key); this.zSetOperations.add(key, value.getValue().doubleValue()); // 3. 将Metric的时间戳存在redis中,key--> Metric的名字加上前缀,value-->时间戳的字符串格式 String raw = serialize(value); this.redisOperations.opsForValue().set(key, raw); }
将Metric的名字加上前缀.代码如下:
private String keyFor(String name) { return this.prefix + name; }
首先在zset中添加value为metric的名字加上前缀,评分为0,然后将其评分设置为value值
private void trackMembership(String redisKey) { this.zSetOperations.incrementScore(redisKey, 0.0D); }
将Metric的时间戳存在redis中,key–> Metric的名字加上前缀,value–>时间戳的字符串格式.代码如下:
private String serialize(Metric<?> entity) { return String.valueOf(entity.getTimestamp().getTime()); }
reset–>从zset和String结构中进行删除.代码如下:
public void reset(String metricName) { String key = keyFor(metricName); if (this.zSetOperations.remove(key) == 1) { this.redisOperations.delete(key); } }
- 对给定的metricName加上前缀
- 从zset和String结构中进行删除
increment,代码如下:
public void increment(Delta<?> delta) { String name = delta.getName(); String key = keyFor(name); trackMembership(key); double value = this.zSetOperations.incrementScore(key, delta.getValue().doubleValue()); String raw = serialize(new Metric<Double>(name, value, delta.getTimestamp())); this.redisOperations.opsForValue().set(key, raw); }
- 对指定Delta的名字加上前缀
- 如果zset中存在对应的key,则为其增长0,如果不存在,就加入,评分为0
- 对指定的值增长评分
- 实例化1个Metric后进行序列化,加入到String 结构中,key–> Delta的名字加上前缀,value->时间戳的字符串格式
count–>返回zset中的数量.代码如下:
public long count() { return this.zSetOperations.size(); }
findOne–> 根据给定的metricName获得对应的Metric.代码如下:
public Metric<?> findOne(String metricName) { // 1. 对metricName加上前缀生成rediskey String redisKey = keyFor(metricName); // 2. 获得对应的时间戳 String raw = this.redisOperations.opsForValue().get(redisKey); // 3. 反序列化 return deserialize(redisKey, raw, this.zSetOperations.score(redisKey)); }
- 对metricName加上前缀生成rediskey
- 获得对应的时间戳
获得在zset中存储得到对应的评分(value值),然后进行反序列化.代码如下:
private Metric<?> deserialize(String redisKey, String v, Double value) { if (redisKey == null || v == null || !redisKey.startsWith(this.prefix)) { return null; } Date timestamp = new Date(Long.valueOf(v)); return new Metric<Double>(nameFor(redisKey), value, timestamp); }
findAll,代码如下:
public Iterable<Metric<?>> findAll() { // This set is sorted // 1. 获得zset存储的所有的value Set<String> keys = this.zSetOperations.range(0, -1); Iterator<String> keysIt = keys.iterator(); List<Metric<?>> result = new ArrayList<Metric<?>>(keys.size()); // 2. 根据zset中存储的value获得存储在string结构中的值(时间戳) List<String> values = this.redisOperations.opsForValue().multiGet(keys); // 3. 依次遍历values for (String v : values) { // 3.1 获得存储在zset中的评分,然后进行反序列化为Metric. String key = keysIt.next(); Metric<?> value = deserialize(key, v, this.zSetOperations.score(key)); // 3.2 如果不为null,则加入到result中 if (value != null) { result.add(value); } } return result; }
- 获得zset存储的所有的value
- 根据zset中存储的value获得存储在string结构中的值(时间戳)
依次遍历values
- 获得存储在zset中的评分,然后进行反序列化为Metric.
- 如果不为null,则加入到result中
思考:为什么能够这样实现?
因为首先是通过zet zrange 0 -1 获得zset中所有的value,按照从小到大的顺序返回.然后通过MGET 命令获得对应的值,2者都是有序的,因此,可以这样实现
RichGaugeRepository,InMemoryRichGaugeRepository
RichGaugeRepository,InMemoryRichGaugeRepository的实现我们在后续的文章中会进行分析.