前言
本文来介绍MetricReader的实现,其类图如下:
其中如下实现在之前的文章中有介绍,本文不在对其进行说明:
- BufferMetricReader–> 在 spring boot 源码解析44-PrefixMetricReader,PrefixMetricWriter,MultiMetricRepository 中有说明
- MetricRepository,InMemoryMetricRepository–>在 spring boot 源码解析37-CounterService详解中有说明
- RedisMetricRepository–>在spring boot 源码解析41-CounterWriter,GaugeWriter解析中介绍
解析
MetricReader
MetricReader–> 个简单的读取数据的接口–>用来审查Metric.代码如下:
public interface MetricReader {
// 获得1个是指定name的Metric的实例(通常会返回最后1个记录的)Metric,如果没有对应的,则返回null
Metric<?> findOne(String metricName);
// 获得所有MetricReader持有的Metric
Iterable<Metric<?>> findAll();
// 获得MetricReader持有的Metric得到数量
long count();
}
CompositeMetricReader
CompositeMetricReader–> 组合模式,所有的方法都是调用其内部持有的MetricReader依次去处理.代码如下:
public class CompositeMetricReader implements MetricReader {
private final List<MetricReader> readers = new ArrayList<MetricReader>();
public CompositeMetricReader(MetricReader... readers) {
Collections.addAll(this.readers, readers);
}
@Override
public Metric<?> findOne(String metricName) {
for (MetricReader delegate : this.readers) {
Metric<?> value = delegate.findOne(metricName);
if (value != null) {
return value;
}
}
return null;
}
@Override
public Iterable<Metric<?>> findAll() {
List<Metric<?>> values = new ArrayList<Metric<?>>((int) count());
for (MetricReader delegate : this.readers) {
Iterable<Metric<?>> all = delegate.findAll();
for (Metric<?> value : all) {
values.add(value);
}
}
return values;
}
@Override
public long count() {
long count = 0;
for (MetricReader delegate : this.readers) {
count += delegate.count();
}
return count;
}
}
实例化地方如下:
关于这两处的调用点会在后续的文章中有介绍
MetricRegistryMetricReader
MetricRegistryMetricReader–> 实现了MetricReader, MetricRegistryListener,从MetricRegistry中读取metrics的MetricReader,Gauges 和 Counters 被反射成1个单一值,Timers,Meters and Histograms 被扩展到1个metrics的(包含所有Number的属性的)集合中.
字段,构造器如下:
private static final Log logger = LogFactory.getLog(MetricRegistryMetricReader.class); // key--> Histograms,Meter,Timer,Snapshot 对应的class, value --> 该class中属性是number类型的属性名 private static final Map<Class<?>, Set<String>> numberKeys = new ConcurrentHashMap<Class<?>, Set<String>>(); private final Object monitor = new Object(); // key-->metricName,value-->MetricRegistry 中对应的Metric 名 private final Map<String, String> names = new ConcurrentHashMap<String, String>(); // key--> MetricRegistry 中对应的Metric 名 value --> metricName private final MultiValueMap<String, String> reverse = new LinkedMultiValueMap<String, String>(); private final MetricRegistry registry; public MetricRegistryMetricReader(MetricRegistry registry) { this.registry = registry; registry.addListener(this); }
方法实现如下:
onGaugeAdded–>当Gauge添加到MetricRegistry时触发.代码如下:
public void onGaugeAdded(String name, Gauge<?> gauge) { this.names.put(name, name); synchronized (this.monitor) { this.reverse.add(name, name); } }
onCounterAdded–> 当Counter添加到MetricRegistry时触发.代码如下:
public void onCounterAdded(String name, Counter counter) { this.names.put(name, name); synchronized (this.monitor) { this.reverse.add(name, name); } }
onHistogramAdded–> 当Histogram添加到MetricRegistry时触发.代码如下:
public void onHistogramAdded(String name, Histogram histogram) { for (String key : getNumberKeys(histogram)) { String metricName = name + "." + key; this.names.put(metricName, name); synchronized (this.monitor) { this.reverse.add(name, metricName); } } for (String key : getNumberKeys(histogram.getSnapshot())) { String metricName = name + ".snapshot." + key; this.names.put(metricName, name); synchronized (this.monitor) { this.reverse.add(name, metricName); } } }
获得Histogram中属性是Number类型的属性名,此时获得的是count 依次处理之
- 将Number类型的属性名依次加上指定的name前缀生成metricName
- 添加到names,reverse中
获得Snapshot中属性是Number类型的属性名,依次处理之,将Number类型的属性名依次加上指定的name前缀生成metricName,添加到names,reverse中.此时获得的是75thPercentile, 98thPercentile, min, 95thPercentile, 99thPercentile, median, max, mean, 999thPercentile, stdDev.
getNumberKeys方法如下:
private static Set<String> getNumberKeys(Object metric) { // 1. 尝试从缓存中获取,如果获取不到的话,则进行初始化 Set<String> result = numberKeys.get(metric.getClass()); if (result == null) { result = new HashSet<String>(); } if (result.isEmpty()) { // 2. 如果result 等于null,意味着是第1次创建,则获得metric所有的PropertyDescriptor,遍历之 for (PropertyDescriptor descriptor : BeanUtils .getPropertyDescriptors(metric.getClass())) { // 2.1 如果属性的类型是Number类型的话,则加入到result中 if (ClassUtils.isAssignable(Number.class, descriptor.getPropertyType())) { result.add(descriptor.getName()); } } // 3. 加入到numberKeys中 numberKeys.put(metric.getClass(), result); } return result; }
onMeterAdded–>当Meter添加到MetricRegistry时触发.代码如下:
public void onMeterAdded(String name, Meter meter) { for (String key : getNumberKeys(meter)) { String metricName = name + "." + key; this.names.put(metricName, name); synchronized (this.monitor) { this.reverse.add(name, metricName); } } }
获得Meter中属性是Number类型的属性名,依次处理之,将Number类型的属性名依次加上指定的name前缀生成metricName,添加到names,reverse中.此时获得的是count,fifteenMinuteRate,fiveMinuteRate,meanRate,oneMinuteRate
onTimerAdded–> 当Timer添加到MetricRegistry时触发.代码如下:
public void onTimerAdded(String name, Timer timer) { for (String key : getNumberKeys(timer)) { String metricName = name + "." + key; this.names.put(metricName, name); synchronized (this.monitor) { this.reverse.add(name, metricName); } } for (String key : getNumberKeys(timer.getSnapshot())) { String metricName = name + ".snapshot." + key; this.names.put(metricName, name); synchronized (this.monitor) { this.reverse.add(name, metricName); } } }
获得Timer中属性是Number类型的属性名,依次处理之,将Number类型的属性名依次加上指定的name前缀生成metricName,添加到names,reverse中.此时获得的是count,fifteenMinuteRate,fiveMinuteRate,meanRate,oneMinuteRate
获得Snapshot中属性是Number类型的属性名,依次处理之,将Number类型的属性名依次加上指定的name前缀生成metricName,添加到names,reverse中.此时获得的是75thPercentile, 98thPercentile, min, 95thPercentile, 99thPercentile, median, max, mean, 999thPercentile, stdDev.
onGaugeRemoved,onCounterRemoved,onHistogramRemoved,onMeterRemoved,onTimerRemoved最终都会调用remove方法来处理.代码如下:
private void remove(String name) { List<String> keys; synchronized (this.monitor) { keys = this.reverse.remove(name); } if (keys != null) { for (String key : keys) { this.names.remove(name + "." + key); } } }
从reverse,names中删除
findOne,代码如下:
public Metric<?> findOne(String metricName) { // 1. 从names中获得指定metricName在MetricRegistry中注册的名字,如果不存在则返回null String name = this.names.get(metricName); if (name == null) { return null; } // 2. 从MetricRegistry中获得对应的Metric,如果获取不到,则返回null com.codahale.metrics.Metric metric = this.registry.getMetrics().get(name); if (metric == null) { return null; } // 3. 如果是Counter if (metric instanceof Counter) { Counter counter = (Counter) metric; return new Metric<Number>(metricName, counter.getCount()); } // 4. 如果是Gauge if (metric instanceof Gauge) { Object value = ((Gauge<?>) metric).getValue(); // 4.1 如果统计值是number类型的,则实例化Metric 返回,否则返回null if (value instanceof Number) { return new Metric<Number>(metricName, (Number) value); } if (logger.isDebugEnabled()) { logger.debug("Ignoring gauge '" + name + "' (" + metric + ") as its value is not a Number"); } return null; } // 5. 如果是抽样 if (metric instanceof Sampling) { // 5.1 如果metricName 含有.snapshot. if (metricName.contains(".snapshot.")) { // 5.2 获得对应的测量值 Number value = getMetric(((Sampling) metric).getSnapshot(), metricName); // 5.3 如果该metricName对应的metric为Timer,则将其值转换为毫秒 if (metric instanceof Timer) { // convert back to MILLISEC value = TimeUnit.MILLISECONDS.convert(value.longValue(), TimeUnit.NANOSECONDS); } // 5.4 返回 return new Metric<Number>(metricName, value); } } // 6. 获得对应的测量值 封装为Metric 后返回 return new Metric<Number>(metricName, getMetric(metric, metricName)); }
- 从names中获得指定metricName在MetricRegistry中注册的名字,如果不存在则返回null
- 从MetricRegistry中获得对应的Metric,如果获取不到,则返回null
- 如果是Counter,则直接返回对应的Metric
如果是Gauge
- 如果统计值是number类型的,则实例化Metric 返回,否则返回null
如果是Sampling的实例.
如果metricName 含有.snapshot.
- 获得对应的测量值
- 如果该metricName对应的metric为Timer,则将其值转换为毫秒
- 返回Metric
获得对应的测量值 封装为Metric 后返回.代码如下:
private static Number getMetric(Object metric, String metricName) { // 1. 获取metricName 最后1个点的后的字符串 String key = StringUtils.getFilenameExtension(metricName); // 2. 将metric 包装为BeanWrapperImpl,直接读取该key所对应的值 return (Number) new BeanWrapperImpl(metric).getPropertyValue(key); }
findAll,代码如下:
public Iterable<Metric<?>> findAll() { return new Iterable<Metric<?>>() { @Override public Iterator<Metric<?>> iterator() { Set<Metric<?>> metrics = new HashSet<Metric<?>>(); // 1. 遍历names for (String name : MetricRegistryMetricReader.this.names.keySet()) { // 2. 根据name 获得对应的Metric,如果不等于null,则添加到metrics中 Metric<?> metric = findOne(name); if (metric != null) { metrics.add(metric); } } // 3. 返回 return metrics.iterator(); } }; }
count.代码如下:
public long count() { return this.names.size(); }
自动装配:
在MetricsDropwizardAutoConfiguration中声明,如下:
@Configuration @ConditionalOnClass(MetricRegistry.class) @AutoConfigureBefore(MetricRepositoryAutoConfiguration.class) public class MetricsDropwizardAutoConfiguration { .... @Bean public MetricReaderPublicMetrics dropwizardPublicMetrics( MetricRegistry metricRegistry) { MetricRegistryMetricReader reader = new MetricRegistryMetricReader( metricRegistry); return new MetricReaderPublicMetrics(reader); } }
当满足如下条件时生效:
- ConditionalOnClass(MetricRegistry.class)–> 在类路径下存在MetricRegistry.class
MetricsEndpointMetricReader
后续文章分析
AggregateMetricReader
注意,该类没有自动装配
该类的字段,构造器如下:
private MetricReader source; private String keyPattern = "d.d"; // 应用到所有的输出metrics中. private String prefix = "aggregate."; public AggregateMetricReader(MetricReader source) { this.source = source; }
keyPattern 说明如下:
指定该类对于repository中的key,如何操作.这些key在repository中被假定为是.分隔的,且给定的格式也必须是同样的格式,比如:”d.d.k.d”.keyPattern 需要与repository中的keys进行匹配,并将应用如下规则:
- d–>丢弃这些key段(对全局前缀比如系统标识或者聚合keys a.k.a的物理标识 )
- k–> 保持原样不需要进行改变(对逻辑标识有用比如app的名字)
默认是d.d(假设有全局前缀其长度为2)
方法实现如下:
findOne,代码如下:
public Metric<?> findOne(String metricName) { // 1. 如果不是指定前缀开头的,返回null if (!metricName.startsWith(this.prefix)) { return null; } // 2 . 实例化InMemoryMetricRepository InMemoryMetricRepository result = new InMemoryMetricRepository(); // 3. 去除前缀 String baseName = metricName.substring(this.prefix.length()); // 4. 获取source中的所有Metric,遍历 for (Metric<?> metric : this.source.findAll()) { // 4.1 获得SourceKey String name = getSourceKey(metric.getName()); if (baseName.equals(name)) { update(result, name, metric); } } // 5. 查找 return result.findOne(metricName); }
- 如果不是指定前缀开头的,返回nul
- 实例化InMemoryMetricRepository
- 去除前缀
获取source中的所有Metric,遍历
获得SourceKey.代码如下:
private String getSourceKey(String name) { // 1. 将给定的name,本类中的keyPattern 通过.进行分割 String[] keys = StringUtils.delimitedListToStringArray(name, "."); String[] patterns = StringUtils.delimitedListToStringArray(this.keyPattern, "."); StringBuilder builder = new StringBuilder(); // 2. 依次遍历patterns,如果其值是k,则加入到builder中 for (int i = 0; i < patterns.length; i++) { if ("k".equals(patterns[i])) { builder.append(builder.length() > 0 ? "." : ""); builder.append(keys[i]); } } // 3. 此时发生在 keys.length > patterns.length 的情况,那么就需要依次的将keys中的数据加入到builder中 for (int i = patterns.length; i < keys.length; i++) { builder.append(builder.length() > 0 ? "." : ""); builder.append(keys[i]); } // 4. 返回 return builder.toString(); }
- 将给定的name,本类中的keyPattern 通过.进行分割
- 依次遍历patterns,如果其值是k,则加入到builder中
- 如果发生 keys.length > patterns.length 的情况,那么就需要依次的将keys中的数据加入到builder中
- 返回
如果SourceKey和baseName相同的话,则添加到result中.代码如下:
private void update(InMemoryMetricRepository result, String key, Metric<?> metric) { // 1. 将key加上前缀 String name = this.prefix + key; // 2. 从InMemoryMetricRepository 进行查找,如果找不到的话,则实例化1个,此时一般情况下都会实例化的 Metric<?> aggregate = result.findOne(name); if (aggregate == null) { aggregate = new Metric<Number>(name, metric.getValue(), metric.getTimestamp()); } else if (key.contains("counter.")) { // accumulate all values aggregate = new Metric<Number>(name, metric.increment(aggregate.getValue().intValue()).getValue(), metric.getTimestamp()); } else if (aggregate.getTimestamp().before(metric.getTimestamp())) { // sort by timestamp and only take the latest aggregate = new Metric<Number>(name, metric.getValue(), metric.getTimestamp()); } // 3. 添加 result.set(aggregate); }
- 将key加上前缀
- 从InMemoryMetricRepository 进行查找,如果找不到的话,则实例化1个,此时一般情况下都会实例化的
- 添加
查找
findAll,count的实现和findOne类似,如下:
@Override public Iterable<Metric<?>> findAll() { InMemoryMetricRepository result = new InMemoryMetricRepository(); for (Metric<?> metric : this.source.findAll()) { String key = getSourceKey(metric.getName()); if (key != null) { update(result, key, metric); } } return result.findAll(); } @Override public long count() { Set<String> names = new HashSet<String>(); for (Metric<?> metric : this.source.findAll()) { String name = getSourceKey(metric.getName()); if (name != null) { names.add(name); } } return names.size(); }
SpringIntegrationMetricReader
SpringIntegrationMetricReader–>基于spring-integration来实现的.
字段,构造器如下:
private final IntegrationManagementConfigurer configurer; public SpringIntegrationMetricReader(IntegrationManagementConfigurer configurer) { this.configurer = configurer; }
方法实现如下:
findOne–> 默认返回null.代码如下:
public Metric<?> findOne(String metricName) { return null; }
findAll,代码如下:
public Iterable<Metric<?>> findAll() { List<Metric<?>> result = new ArrayList<Metric<?>>(); // 1. 获得配置的Channel,Handler,Source的名字 String[] channelNames = this.configurer.getChannelNames(); String[] handlerNames = this.configurer.getHandlerNames(); String[] sourceNames = this.configurer.getSourceNames(); // 2.添加Channel的统计 addChannelMetrics(result, channelNames); // 3. 添加Handler的统计 addHandlerMetrics(result, handlerNames); // 3. 添加source的统计 addSourceMetrics(result, sourceNames); // 4. 添加Channel,Handler,Source的数量统计 result.add(new Metric<Integer>("integration.handlerCount", handlerNames.length)); result.add(new Metric<Integer>("integration.channelCount", channelNames.length)); result.add(new Metric<Integer>("integration.sourceCount", sourceNames.length)); return result; }
- 获得配置的Channel,Handler,Source的名字
添加Channel的统计.代码如下:
private void addChannelMetrics(List<Metric<?>> result, String[] names) { // 1. 遍历Channel的名字,依次调用addChannelMetrics添加 for (String name : names) { addChannelMetrics(result, name, this.configurer.getChannelMetrics(name)); } }
遍历Channel的名字,依次调用addChannelMetrics添加.代码如下:
private void addChannelMetrics(List<Metric<?>> result, String name, MessageChannelMetrics metrics) { // 1. 在Channel 名字前加上前缀 String prefix = "integration.channel." + name; // 2. 添加该Channel所对应的错误率(每秒) result.addAll(getStatistics(prefix + ".errorRate", metrics.getErrorRate())); // 3. 添加成功发送数量的统计 result.add(new Metric<Long>(prefix + ".sendCount", metrics.getSendCountLong())); // 4. 添加发送消息的速率统计(每秒) result.addAll(getStatistics(prefix + ".sendRate", metrics.getSendRate())); if (metrics instanceof PollableChannelManagement) { // 5. 如果传入的PollableChannelManagement的实例,则添加拉取数据的统计 result.add(new Metric<Long>(prefix + ".receiveCount", ((PollableChannelManagement) metrics).getReceiveCountLong())); } }
- 在Channel 名字前加上前缀
- 添加该Channel所对应的错误率(每秒)
- 添加成功发送数量的统计
- 添加发送消息的速率统计(每秒)
- 如果传入的PollableChannelManagement的实例,则添加拉取数据的统计
getStatistics代码如下:
private Collection<? extends Metric<?>> getStatistics(String name, Statistics stats) { List<Metric<?>> metrics = new ArrayList<Metric<?>>(); metrics.add(new Metric<Double>(name + ".mean", stats.getMean())); metrics.add(new Metric<Double>(name + ".max", stats.getMax())); metrics.add(new Metric<Double>(name + ".min", stats.getMin())); metrics.add(new Metric<Double>(name + ".stdev", stats.getStandardDeviation()));// 标准差 metrics.add(new Metric<Long>(name + ".count", stats.getCountLong())); return metrics; }
添加Handler的统计.代码如下:
private void addHandlerMetrics(List<Metric<?>> result, String[] names) { // 2. 遍历andler的名字,依次调用addHandlerMetrics添加 for (String name : names) { addHandlerMetrics(result, name, this.configurer.getHandlerMetrics(name)); } }
遍历andler的名字,依次调用addHandlerMetrics添加.代码如下:
private void addHandlerMetrics(List<Metric<?>> result, String name, MessageHandlerMetrics metrics) { String prefix = "integration.handler." + name; result.addAll(getStatistics(prefix + ".duration", metrics.getDuration()));// 持续时间 long activeCount = metrics.getActiveCountLong(); // handler 的激活执行数量 result.add(new Metric<Long>(prefix + ".activeCount", activeCount)); }
添加source的统计.代码如下:
private void addSourceMetrics(List<Metric<?>> result, String[] names) { for (String name : names) { addSourceMetrics(result, name, this.configurer.getSourceMetrics(name)); } }
遍历Sourcer的名字,依次调用addSourceMetrics添加.代码如下:
private void addSourceMetrics(List<Metric<?>> result, String name, MessageSourceMetrics sourceMetrics) { String prefix = "integration.source." + name; result.add(new Metric<Long>(prefix + ".messageCount", sourceMetrics.getMessageCountLong())); // 消息数量 }
- 添加Channel,Handler,Source的数量统计
count–>统计Channel,Handler,Source的次数.代码如下:
public long count() { int totalChannelCount = this.configurer.getChannelNames().length; int totalHandlerCount = this.configurer.getHandlerNames().length; int totalSourceCount = this.configurer.getSourceNames().length; return totalChannelCount + totalHandlerCount + totalSourceCount; }
自动装配:
在IntegrationMetricsConfiguration中声明,如下:
@Configuration @ConditionalOnClass(EnableIntegrationManagement.class) @ConditionalOnJava(JavaVersion.SEVEN) @UsesJava7 static class IntegrationMetricsConfiguration { @Bean(name = IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME) @ConditionalOnMissingBean(value = IntegrationManagementConfigurer.class, name = IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME, search = SearchStrategy.CURRENT) public IntegrationManagementConfigurer managementConfigurer() { IntegrationManagementConfigurer configurer = new IntegrationManagementConfigurer(); configurer.setDefaultCountsEnabled(true); configurer.setDefaultStatsEnabled(true); return configurer; } @Bean @ConditionalOnMissingBean(name = "springIntegrationPublicMetrics") public MetricReaderPublicMetrics springIntegrationPublicMetrics( IntegrationManagementConfigurer managementConfigurer) { return new MetricReaderPublicMetrics( new SpringIntegrationMetricReader(managementConfigurer)); } }
IntegrationMetricsConfiguration在满足如何条件时生效:
- @ConditionalOnClass(EnableIntegrationManagement.class) –> 在类路径下存在EnableIntegrationManagement.class时生效
- @ConditionalOnJava(JavaVersion.SEVEN)–> 在jdk1.7及jdk1.7以上的环境中生效
springIntegrationPublicMetrics:
在满足BeanFactory中不存在id为springIntegrationPublicMetrics,类型为MetricReaderPublicMetrics的baan时生效
managementConfigurer:
在满足在当前BeanFactoty中不存在id为integrationManagementConfigurer,类型为IntegrationManagementConfigurer的bean时生效.