spring boot 源码解析50-Exporter详解

前言

本文我们来分析一下Exporter的相关实现,类图如下:

《spring boot 源码解析50-Exporter详解》

解析

Exporter

Exporter–>metric 暴露的通用接口.当你横向扩展度量的范围时,你可能会需要在本地缓冲测量数据然后周期性的暴露出去(比如:聚合集群的数据),因此,这是这些操作的标记结构.触发export操作的可能是周期性或者是事件驱动–>这些都处于Exporter的职责范围之外.比如,你可以创建Exporter的实例,并且使用@Scheduled注解.

其只声明了1个方法,如下:

void export();

AbstractMetricExporter

AbstractMetricExporter–> Exporter的抽象基类,拥有很多共有的特征,主要是导出测量数据被加上前缀和时间戳过滤(因此只有新值被暴露出去)

模板方法模式

  1. 该类实现了Exporter, Closeable, Flushable接口.

  2. 字段,构造器如下:

    private static final Log logger = LogFactory.getLog(AbstractMetricExporter.class);
    
    private final String prefix;
    
    // 数据导出的最早时间
    private Date earliestTimestamp = new Date();
    
    // 是否忽略时间戳(这将导致导出所有的测量数据),默认为false
    private boolean ignoreTimestamps = false;
    
    // 是否只导自上次导出后的新增数据,默认为true
    private boolean sendLatest = true;
    
    // 正在处理中的标记
    private volatile AtomicBoolean processing = new AtomicBoolean(false);
    
    // 上次导出数据的时间戳
    private Date latestTimestamp = new Date(0L);
    
    public AbstractMetricExporter(String prefix) {
        this.prefix = (!StringUtils.hasText(prefix) ? ""
                : (prefix.endsWith(".") ? prefix : prefix + "."));
    }
  3. 方法实现如下:

    1. export,方法如下:

      public void export() {
          // 1. 将processing置为true
          if (this.processing.compareAndSet(false, true)) {
              // 2. 获得当前时间
              long latestTimestamp = System.currentTimeMillis();
              try {
                  // 3. 导出数据
                  exportGroups();
              }
              catch (Exception ex) {
                  logger.warn("Could not write to MetricWriter: " + ex.getClass() + ": "
                          + ex.getMessage());
              }
              finally {
                  // 4. 调用flushQuietly方法,刷新数据,并将processing置为false
                  this.latestTimestamp = new Date(latestTimestamp);
                  flushQuietly();
                  this.processing.set(false);
              }
          }
      }
      1. 将processing置为true
      2. 获得当前时间
      3. 导出数据,代码如下:

        private void exportGroups() {
            // 1. 遍历分组:
            for (String group : groups()) {
                Collection<Metric<?>> values = new ArrayList<Metric<?>>();
                // 2. 获得属于该分组的metrics,依次处理
                for (Metric<?> metric : next(group)) {
                    Date timestamp = metric.getTimestamp();
                    // 2.1. 如果可以导出的化,则加入到values中
                    if (canExportTimestamp(timestamp)) {
                        values.add(getPrefixedMetric(metric));
                    }
                }
                // 3. 如果values非空,则写出
                if (!values.isEmpty()) {
                    write(group, values);
                }
            }
        }
        1. 遍历分组–>将metrics 进行分组(比如加上前缀).如果要导出的数据要进行分组通过String 作为标识符,子类应该覆写此方法.否则默认会迭代所有的metrics.代码如下:

          protected Iterable<String> groups() {
              return Collections.singleton("");
          }
        2. 获得属于该分组的metrics,依次处理,next是1个抽象方法,由子类实现
        3. 如果可以导出的化,则加入到values中,判断是否可以导出的代码如下:

          private boolean canExportTimestamp(Date timestamp) {
              if (this.ignoreTimestamps) {
                  return true;
              }
              if (this.earliestTimestamp.after(timestamp)) {
                  return false;
              }
              if (this.sendLatest && this.latestTimestamp.after(timestamp)) {
                  return false;
              }
              return true;
          }

          getPrefixedMetric –> 生成Metric,其name是加上前缀的.代码如下:

          private Metric<?> getPrefixedMetric(Metric<?> metric) {
              String name = this.prefix + metric.getName();
              return new Metric<Number>(name, metric.getValue(), metric.getTimestamp());
          }
        4. 如果values非空,则写出,write是1个抽象方法,由子类实现
      4. 调用flushQuietly方法,刷新数据,并将processing置为false.flushQuietly方法如下:

        private void flushQuietly() {
            try {
                flush();
            }
            catch (Exception ex) {
                logger.warn("Could not flush MetricWriter: " + ex.getClass() + ": "
                        + ex.getMessage());
            }
        }

        调用flush方法–>通过向底层数据流写入任何被缓冲的数据.默认空实现,只有MetricCopyExporter进行了复写. 代码如下:

        public void flush() {
        }
    2. close–>Closeable中声明的方法.实现如下:

      public void close() throws IOException {
          export();
          flushQuietly();
      }
    3. flush –> Flushable接口中的方法,之前有叙述.

MetricCopyExporter

MetricCopyExporter–> 继承自AbstractMetricExporter.通过从MetricReader copy数据到MetricWriter.实际上output writer 可以是GaugeWriter,在这种情况下,所有的metrics 都是简单的输出他们的当前值.如果output writer 是CounterWriter,则当metrics 的名字是以counter.开头的话,则:不会将他们写出而是会增长它的计数器的值.这涉及到exporter 存储之前的计数器的值以计算delta.为了更好的结果,不要在多线程环境下使用exporter(通常只能用周期性的顺序,即使只有1个后台程序,也是这样)

该类没有自动装配

  1. 字段,构造器如下:

    private static final Log logger = LogFactory.getLog(MetricCopyExporter.class);
    
    private final MetricReader reader;
    
    private final GaugeWriter writer;
    
    private final CounterWriter counter;
    
    // key--> counter 名字,value--> counter 值 ,用来缓存之前Counter的值
    private ConcurrentMap<String, Long> counts = new ConcurrentHashMap<String, Long>();
    
    // 过滤metrics使用(include)
    private String[] includes = new String[0];
    
    // 过滤metrics 使用(exclude)
    private String[] excludes = new String[0];
    
    public MetricCopyExporter(MetricReader reader, GaugeWriter writer) {
        this(reader, writer, "");
    }
    
    public MetricCopyExporter(MetricReader reader, GaugeWriter writer, String prefix) {
        super(prefix);
        this.reader = reader;
        this.writer = writer;
        if (writer instanceof CounterWriter) {
            this.counter = (CounterWriter) writer;
        }
        else {
            this.counter = null;
        }
    }
  2. 方法实现如下:

    1. next,代码如下:

      protected Iterable<Metric<?>> next(String group) {
          // 1. 如果includes,excludes 等于空,则直接查询所有的
          if (ObjectUtils.isEmpty(this.includes) && ObjectUtils.isEmpty(this.excludes)) {
              return this.reader.findAll();
          }
          // 2. 否则返回PatternMatchingIterable
          return new PatternMatchingIterable(MetricCopyExporter.this.reader);
      }
      1. 如果includes,excludes 等于空,则直接查询所有的
      2. 否则返回PatternMatchingIterable.PatternMatchingIterable实现了Iterable接口,代码如下:

        private class PatternMatchingIterable implements Iterable<Metric<?>> {
        
            private final MetricReader reader;
        
            PatternMatchingIterable(MetricReader reader) {
                this.reader = reader;
            }
        
            @Override
            public Iterator<Metric<?>> iterator() {
                return new PatternMatchingIterator(this.reader.findAll().iterator());
            }
        
        }

        其iterator最终返回的是PatternMatchingIterator,其实现了Iterator接口.

        1. 字段,构造器如下:

          private Metric<?> buffer = null;
          private Iterator<Metric<?>> iterator;
          PatternMatchingIterator(Iterator<Metric<?>> iterator) {
              this.iterator = iterator;
          }
        2. 方法实现如下:

          1. hasNext,代码如下:

            public boolean hasNext() {
                if (this.buffer != null) {
                    return true;
                }
                this.buffer = findNext();
                return this.buffer != null;
            }
            1. 如果buffer存在,则返回true
            2. 否则,调用findNext 查找符合要求的Metric,如果找到的话,则返回true,否则,返回false.findNext代码如下:

               private Metric<?> findNext() {
                      while (this.iterator.hasNext()) {
                          Metric<?> metric = this.iterator.next();
                          if (isMatch(metric)) {
                              return metric;
                          }
                      }
                      return null;
                  }

              遍历MetricReader中的Metric,如果其符合要求,则返回Metric,如果最终都没有找到的话,则返回null.isMatch方法实现如下:

              private boolean isMatch(Metric<?> metric) {
                  String[] includes = MetricCopyExporter.this.includes;
                  String[] excludes = MetricCopyExporter.this.excludes;
                  // 1. 获得metric的名字
                  String name = metric.getName();
                  // 2. 如果includes等于null或者name 匹配 includes
                  if (ObjectUtils.isEmpty(includes)
                          || PatternMatchUtils.simpleMatch(includes, name)) {
                      // 3. 如果excludes 匹配 name,则返回false,否则,返回true
                      return !PatternMatchUtils.simpleMatch(excludes, name);
                  }
                  // 4. 否则,返回false
                  return false;
              }
              1. 获得metric的名字
              2. 如果includes等于null或者name 匹配 includes

                1. 如果excludes 匹配 name,则返回false,否则,返回true
              3. 否则,返回false

          2. next 实现如下:

            public Metric<?> next() {
                Metric<?> metric = this.buffer;
                this.buffer = null;
                return metric;
            }
    2. write,代码如下:

      protected void write(String group, Collection<Metric<?>> values) {
          for (Metric<?> value : values) {
              if (value.getName().startsWith("counter.") && this.counter != null) {
                  this.counter.increment(calculateDelta(value));
              }
              else {
                  this.writer.set(value);
              }
          }
      }
      1. 遍历传入的values
      2. 如果是counter.开头的并且counter存在,则进行increment操作,否则调用GaugeWriter#set. calculateDelta 方法如下:

        private Delta<?> calculateDelta(Metric<?> value) {
            // 1. 获得当前的值
            long delta = value.getValue().longValue();
            // 2. 进行counts的替换操作
            Long old = this.counts.replace(value.getName(), delta);
            // 3. 如果之前存在对应的值,则进行计算获得增幅,否则,加入到counts中
            if (old != null) {
                delta = delta - old;
            }
            else {
                this.counts.putIfAbsent(value.getName(), delta);
            }
            // 4. 返回Delta
            return new Delta<Long>(value.getName(), delta, value.getTimestamp());
        }
        1. 获得当前的值
        2. 进行counts的替换操作
        3. 如果之前存在对应的值,则进行计算获得增幅,否则,加入到counts中
        4. 返回Delta
    3. flush,代码如下:

      public void flush() {
          flush(this.writer);
      }

      调用

      private void flush(GaugeWriter writer) { // 1. 如果是CompositeMetricWriter的实例,则遍历其持有的MetricWriter,依次调用该方法进行处理 if (writer instanceof CompositeMetricWriter) { for (MetricWriter child : (CompositeMetricWriter) writer) { flush(child); }
          }
          try { // 2. 如果存在java.io.Flushable if (ClassUtils.isPresent("java.io.Flushable", null)) { if (writer instanceof Flushable) { ((Flushable) writer).flush(); return; }
              }
              // 3. 如果writer存在flush方法
              Method method = ReflectionUtils.findMethod(writer.getClass(), "flush");
              if (method != null) { ReflectionUtils.invokeMethod(method, writer); } } catch (Exception ex) { logger.warn("Could not flush MetricWriter: " + ex.getClass() + ": " + ex.getMessage()); } }
      1. 如果是CompositeMetricWriter的实例,则遍历其持有的MetricWriter,依次调用该方法进行处理
      2. 如果存在java.io.Flushable,并且GaugeWriter实现了Flushable接口,则调用flush方法,然后retrun
      3. 如果writer存在flush方法,则通过反射的方式进行调用

PrefixMetricGroupExporter

PrefixMetricGroupExporter–> 1个对于从PrefixMetricReader获得metrics进行分组的实现.导出所有拥有指定前缀的metrics(或者是所有的metrics,如果前缀不存在的话)

注意,该类没有进行自动装配

  1. 字段,构造器如下:

    private final PrefixMetricReader reader;
    
    private final PrefixMetricWriter writer;
    
    // key--> counter 名字,value--> counter 值,用来缓存之前Counter的值
    private ConcurrentMap<String, Long> counts = new ConcurrentHashMap<String, Long>();
    
    private Set<String> groups = new HashSet<String>();
    
    
    public PrefixMetricGroupExporter(PrefixMetricReader reader,
            PrefixMetricWriter writer) {
        this(reader, writer, "");
    }
    
    
    public PrefixMetricGroupExporter(PrefixMetricReader reader, PrefixMetricWriter writer,
            String prefix) {
        super(prefix);
        this.reader = reader;
        this.writer = writer;
    }
  2. 方法实现如下:

    1. groups–> 如果持有的PrefixMetricReader是MultiMetricRepository的实例并且持有的groups为空,则最有返回的是MultiMetricRepository中的groups.否则,返回groups.代码如下:

      protected Iterable<String> groups() {
          if ((this.reader instanceof MultiMetricRepository) && this.groups.isEmpty()) {
              return ((MultiMetricRepository) this.reader).groups();
          }
          return this.groups;
      }
    2. next,代码如下:

      protected Iterable<Metric<?>> next(String group) {
          return this.reader.findAll(group);
      }
    3. write,代码如下:

      protected void write(String group, Collection<Metric<?>> values) {
          // 1. 如果组名是counter. 开头的,则调用PrefixMetricWriter#increment 进行处理
          if (group.contains("counter.")) {
              for (Metric<?> value : values) {
                  this.writer.increment(group, calculateDelta(value));
              }
          }
          else {
              // 2. 否则,调用PrefixMetricWriter#set
              this.writer.set(group, values);
          }
      }
      1. 如果组名是counter. 开头的,则调用PrefixMetricWriter#increment 进行处理,calculateDelta的实现和MetricCopyExporter中类似,如下:

        private Delta<?> calculateDelta(Metric<?> value) {
            long delta = value.getValue().longValue();
            Long old = this.counts.replace(value.getName(), delta);
            if (old != null) {
                delta = delta - old;
            }
            else {
                this.counts.putIfAbsent(value.getName(), delta);
            }
            return new Delta<Long>(value.getName(), delta, value.getTimestamp());
        }
      2. 否则,调用PrefixMetricWriter#set

RichGaugeExporter

RichGaugeExporter–> 继承自AbstractMetricExporter.导出或者转换RichGauge 的数据到以metric为基础的后端.每1个指标都存储在1系列的具有共同前缀的指标中(gauge 的名字),并且后缀表明了数据.比如:1个名字叫foo的gauge,被存储在foo.min, foo.max. foo.val, foo.count, foo.avg, foo.alpha.如果MetricWriter是通过MultiMetricRepository 实现的,则他们的值会被存储到1个组中.并因此可以从这个仓库中通过查询1个语句获得结果

注意,该类没有自动装配

  1. 字段,构造器如下:

    private static final String MIN = ".min";
    
    private static final String MAX = ".max";
    
    private static final String COUNT = ".count";
    
    private static final String VALUE = ".val";
    
    private static final String AVG = ".avg";
    
    private static final String ALPHA = ".alpha";
    
    private final RichGaugeReader reader;
    
    private final PrefixMetricWriter writer;
    
    public RichGaugeExporter(RichGaugeReader reader, PrefixMetricWriter writer) {
        this(reader, writer, "");
    }
    
    public RichGaugeExporter(RichGaugeReader reader, PrefixMetricWriter writer,
            String prefix) {
        super(prefix);
        this.reader = reader;
        this.writer = writer;
    }
  2. 方法实现如下:

    1. next,代码如下:

      protected Iterable<Metric<?>> next(String group) {
          // 1. 从RichGaugeReader中查找group对应的RichGauge
          RichGauge rich = this.reader.findOne(group);
          Collection<Metric<?>> metrics = new ArrayList<Metric<?>>();
          // 2. 依次添加RichGauge的指标到结果集中
          metrics.add(new Metric<Number>(group + MIN, rich.getMin()));
          metrics.add(new Metric<Number>(group + MAX, rich.getMax()));
          metrics.add(new Metric<Number>(group + COUNT, rich.getCount()));
          metrics.add(new Metric<Number>(group + VALUE, rich.getValue()));
          metrics.add(new Metric<Number>(group + AVG, rich.getAverage()));
          metrics.add(new Metric<Number>(group + ALPHA, rich.getAlpha()));
          return metrics;
      }
      1. 从RichGaugeReader中查找group对应的RichGauge
      2. 依次添加RichGauge的指标到结果集中
    2. groups–>从RichGaugeReader中获得所有的RichGauge,然后依次添加其名字到结果集中.代码如下:

      protected Iterable<String> groups() {
          Collection<String> names = new HashSet<String>();
          for (RichGauge rich : this.reader.findAll()) {
              names.add(rich.getName());
          }
          return names;
      }
    3. write,代码如下:

      protected void write(String group, Collection<Metric<?>> values) {
          this.writer.set(group, values);
      }
    原文作者:Spring Boot
    原文地址: https://blog.csdn.net/qq_26000415/article/details/79217461
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞