该文章内容发布已经超过一年,请注意检查文章中内容是否过时。

2-指标收集器的指标采集流程

Dubbo 指标模块源码分析-指标收集器的指标采集流程

二、指标收集器的指标采集流程

在前文中,我们了解了指标收集器(Collector)最终收集的数据只有三个来源:

  • 实现自混合指标收集器(CombMetricsCollector) 的元数据指标收集器(MetadataMetricsCollector)和注册中心指标收集器(RegistryMetricsCollector),它们的样本均存储在内置的基本数据聚合器中。具体来说,是基本数据聚合器下的四个子数据聚合器中:

    composite-struct

  • DefaultMetricsCollector 默认指标收集器,它的样本不仅来自于指标事件,还来自其下采样器(Sampler) 中,用于Dubbo核心模块的采样。

  • HistogramMetricsCollector 直方图指标收集器,由于采样数据的特殊性,它的样本直接以 Map 存储在内部。

接下来,我们需要明确它们存储的指标是如何添加进去的。

1,服务治理模块的指标采集流程

通过之前的分析,我们知道服务治理模块的指标采集器均实现自混合指标收集器(CombMetricsCollector)。它对基本数据聚合器(BaseStatComposite) 的大部分方法做了封装。基本数据聚合器又封装了四个负责存储不同类型指标采样的子聚合器。

这四个子聚合器包括:

  • ApplicationStatComposite
  • ServiceStatComposite
  • MethodStatComposite
  • RtStatComposite

实际上,元数据、注册中心指标收集器更新、添加指标的操作都是通过混合指标收集器暴露的方法进行。而具体的,是通过 setNumincrementaddRt 这三个方法(及它们的重载)进行操作。

//CombMetricsCollector
...
   private final BaseStatComposite stats;
...
@Override
    public void setNum(MetricsKey metricsKey, String applicationName, String serviceKey, int num) {
        this.stats.setServiceKey(metricsKey, applicationName, serviceKey, num);
    }

    @Override
    public void increment(String applicationName, MetricsKey metricsKey) {
        this.stats.incrementApp(metricsKey, applicationName, SELF_INCREMENT_SIZE);
    }

    public void increment(String applicationName, String serviceKey, MetricsKey metricsKey, int size) {
        this.stats.incrementServiceKey(metricsKey, applicationName, serviceKey, size);
    }

    @Override
    public void addRt(String applicationName, String registryOpType, Long responseTime) {
        stats.calcApplicationRt(applicationName, registryOpType, responseTime);
    }

    public void addRt(String applicationName, String serviceKey, String registryOpType, Long responseTime) {
        stats.calcServiceKeyRt(applicationName, serviceKey, registryOpType, responseTime);
    }
...

由于几个方法实际上的调用链路类似,我们选择从其中的 setNum 方法开始分析。

其在数据聚合器层面的调用链路可以总结为:setNum 方法调用基本数据聚合器的 setServiceKey 方法,该方法又会调用服务数据聚合器(ServiceStatComposite)的同名 setServiceKey 方法(我们已经知道基本数据聚合器内封装了四个不同类型的子聚合器),这个方法实质上是对应用层面的特定指标(由指标Key决定)进行注册并赋初始值(参数中的 num)。

setNum 的用法均位于注册中心事件多播器(RegistryMetricsEventMulticaster)中声明的 MCat 接口中,在 APPLICATION_NOTIFY_FINISH 和 APPLICATION_DIRECTORY_POST 两个常量初始化时被调用。MCat 接口本身作为常量类使用,并在初始化时注册真正的指标常量

//RegistryMetricsEventMulticaster.MCat
MetricsCat APPLICATION_NOTIFY_FINISH = new MetricsCat(MetricsKey.NOTIFY_METRIC_NUM_LAST,
            (key, placeType, collector) -> AbstractMetricsListener.onFinish(key,
                event -> {
                    collector.addRt(event.appName(), placeType.getType(), event.getTimePair().calc());
                    Map<String, Integer> lastNumMap = Collections.unmodifiableMap(event.getAttachmentValue(ATTACHMENT_KEY_LAST_NUM_MAP));
                    lastNumMap.forEach(
                        (k, v) -> collector.setNum(key, event.appName(), k, v));

                }
            ));

        MetricsCat APPLICATION_DIRECTORY_POST = new MetricsCat(MetricsKey.DIRECTORY_METRIC_NUM_VALID, (key, placeType, collector) -> AbstractMetricsListener.onEvent(key,
            event ->
            {
                Map<MetricsKey, Map<String, Integer>> summaryMap = event.getAttachmentValue(ATTACHMENT_DIRECTORY_MAP);
                summaryMap.forEach((metricsKey, map) ->
                    map.forEach(
                        (k, v) -> collector.setNum(metricsKey, event.appName(), k, v)));
            }
        ));
//...

此处声明的指标常量都是 MetricsCat 类型的。其中部分常量在创建时还传入了该指标的收集逻辑,如Key 为 NOTIFY_METRIC_NUM_LAST 的常量。以下为 MetricsCat 的定义:

public class MetricsCat {

    private MetricsPlaceType placeType;
    private final Function<CombMetricsCollector<TimeCounterEvent>, AbstractMetricsListener> eventFunc;

    public MetricsCat(MetricsKey metricsKey, BiFunction<MetricsKey, CombMetricsCollector<TimeCounterEvent>, AbstractMetricsListener> biFunc) {
        this.eventFunc = collector -> biFunc.apply(metricsKey, collector);
    }

    public MetricsCat(MetricsKey metricsKey, TpFunction<MetricsKey, MetricsPlaceType, CombMetricsCollector<TimeCounterEvent>, AbstractMetricsListener> tpFunc) {
        this.eventFunc = collector -> tpFunc.apply(metricsKey, placeType, collector);
    }

    public MetricsCat setPlaceType(MetricsPlaceType placeType) {
        this.placeType = placeType;
        return this;
    }

    public Function<CombMetricsCollector<TimeCounterEvent>, AbstractMetricsListener> getEventFunc() {
        return eventFunc;
    }

    //一个接受三个入参,一个返回值的函数接口。通过构造函数我们可以知道这三个入参分别是MetricsKey, MetricsPlaceType, CombMetricsCollector<TimeCounterEvent>,返回值为AbstractMetricsListener。
    @FunctionalInterface
    public interface TpFunction<T, U, K, R> {
        R apply(T t, U u, K k);
    }
}

MetricsCat 类除了构造器,只提供了两个public方法,都是获取其内部属性的。

其实质上 eventFunc 字段的载体,提供了为特定指标生产监听器的逻辑,因此 MetricsCat 可以看做为特定指标生产指标监听器的工厂,用户在创建时传入这个监听器的处理逻辑。

通过泛型,我们可以知道它构造时使用的两个参数分别为 MetricsKey(指标Key)和一个接受 MetricsKey, MetricsPlaceType, CombMetricsCollector<TimeCounterEvent> 三个参数,返回一个 AbstractMetricsListener 的函数。之所以要多封装一层函数,是因为 placeType 字段在 MetricsKey 实例构造之后才会提供,借此实现延迟初始化。

回到之前两个在 MCat 中定义了监听器生产方法的两个常量的初始化流程:它们在创建 MetricsCat 时传入的TpFunction中定义的操作为:返回通过 AbstractMetricsListener.onFinish获取的事件完成监听器。当指定MetricsKey 的指标统计事件完成时,这个监听器中的 onEventFinish 方法就会被调用。 而 MetricsCat 构造时传入的 MetricsKey 会被作为 AbstractMetricsListener 的构造参数,用于指定监听的指标。

//RegistryMetricsEventMulticaster.MCat
new MetricsCat(MetricsKey.NOTIFY_METRIC_NUM_LAST,
            (key, placeType, collector) -> AbstractMetricsListener.onFinish(key,
                event -> {
                    collector.addRt(event.appName(), placeType.getType(), event.getTimePair().calc());
                    Map<String, Integer> lastNumMap = Collections.unmodifiableMap(event.getAttachmentValue(ATTACHMENT_KEY_LAST_NUM_MAP));
                    lastNumMap.forEach(
                        (k, v) -> collector.setNum(key, event.appName(), k, v));

                }
            ));
  //AbstractMetricsListener
   public static AbstractMetricsListener onFinish(MetricsKey metricsKey, Consumer<TimeCounterEvent> finishFunc) {

        return new AbstractMetricsListener(metricsKey) {
            @Override
            public void onEventFinish(TimeCounterEvent event) {
                //此处是finishFunc就是之前 event ->{...} 中定义的lambda函数
                finishFunc.accept(event);
            }
        };
    }

三个形参 (key, placeType, collector) 中的 collector 为 CombMetricsCollector<TimeCounterEvent>,意味着它的三个实现(ConfigCenterMetricsCollector 、MetadataMetricsCollector、RegistryMetricsCollector)都可以作为参数。

至此,我们可以总结,对于这两个参数, MetricsCat 创建时嵌套的两层 lambda 函数最终是为了注册特定指标的监听器,并定义事件结束时的处理逻辑(内层的lambda)。在处理事件时,会调用混合指标收集器(CombMetricsCollector) 的 addRT 方法添加响应时间计时,还会调用 setNum 来添加指标计数。

由于此处的 MetricsKey 在 MetricsCat创建时就被传入,我们可以确定这两个字段存储了以下两个指标的统计逻辑:

  • NOTIFY_METRIC_NUM_LAST:Last Notify Nums , 最后一个事件完成时的计数 。监听器中使用的是 setNum,事件结束时直接更新指定key指标的计数为传入的值,同时使用 addRt 来统计事件持续时长

  • DIRECTORY_METRIC_NUM_VALID:Valid Directory Urls,服务目录中注册成功的url数量。监听器中同样使用的是 setNum,事件结束后直接更新为服务目录中的最新计数

之后,三个相关的 MetricsCat(指标类型) 实例会被绑定到一个 CategoryOverall(指标综合) 实例中,绑定的逻辑按一个事件进行的三个过程:事件发生、事件结束、事件失败,分别对应 CategoryOverall 的第2、3、4个参数,其中事件发生时的逻辑不能为 null。而第一个参数为 MetricsPlaceType,该参数封装了指标类型标识(如 register 服务注册、subscribe 服务订阅)和该指标的收集级别(应用还是服务)。

还记得 MetricsCatTpFunction 的三个入参吗?其中第二个 placeType 就是这个参数。 CategoryOverall 在构造时会将它设置到其中的三个 MetricsCat 中。

// CategorySet:常量接口,同样位于RegistryMetricsEventMulticaster中
interface CategorySet {
    //...
   
        CategoryOverall APPLICATION_NOTIFY = new CategoryOverall(OP_TYPE_NOTIFY, MCat.APPLICATION_NOTIFY_POST, MCat.APPLICATION_NOTIFY_FINISH, null);
    
        CategoryOverall SERVICE_DIRECTORY = new CategoryOverall(OP_TYPE_DIRECTORY, MCat.APPLICATION_DIRECTORY_POST, null, null);
    
        CategoryOverall SERVICE_REGISTER = new CategoryOverall(OP_TYPE_REGISTER_SERVICE, MCat.SERVICE_REGISTER_POST, MCat.SERVICE_REGISTER_FINISH, MCat.SERVICE_REGISTER_ERROR);

   //...
        List<CategoryOverall> ALL = Arrays.asList(APPLICATION_REGISTER, APPLICATION_SUBSCRIBE, APPLICATION_NOTIFY, SERVICE_DIRECTORY, SERVICE_REGISTER, SERVICE_SUBSCRIBE);
    }

CategorySet 中的常量都会被封装到List中,在 RegistryMetricsEventMulticaster 创建时统一调用:

public class RegistryMetricsCollector extends CombMetricsCollector<TimeCounterEvent> {
  public RegistryMetricsEventMulticaster(RegistryMetricsCollector collector) {

        CategorySet.ALL.forEach(categorySet ->
        {
            //通过 MetricsCat 实例中的定义的监听器创建逻辑,逐个注册监听器
            super.addListener(categorySet.getPost().getEventFunc().apply(collector));
            if (categorySet.getFinish() != null) {
                super.addListener(categorySet.getFinish().getEventFunc().apply(collector));
            }
            if (categorySet.getError() != null) {
                super.addListener(categorySet.getError().getEventFunc().apply(collector));
            }
        });
    }
//...

由此,我们也明确了 RegistryMetricsEventMulticaster (指标注册事件多播器)的作用:统一定义、管理事件,并在初始化时注册其中定义各种事件的监听器

它继承了 SimpleMetricsEventMulticaster,其中的 publishEvent 方法在被调用时就会尝试调用所有监听器,判断其是否对当前事件类型感兴趣,选择是否进行调用。同时,这些监听器会对特定指标数据进行计算,更新到对应的收集器中。

//SimpleMetricsEventMulticaster
public void publishEvent(MetricsEvent event) {
        if (event instanceof EmptyEvent) {
            return;
        }
        if (validateIfApplicationConfigExist(event)) return;
        for (MetricsListener listener : listeners) {
            if (listener.isSupport(event)) {
                listener.onEvent(event);
            }
        }
    }

我们通过分析混合指标收集器(CombMetricsCollector) 中的 setNum 方法的用法,了解到了 Composite 中的数据来源之一是注册指标事件多播器(RegistryMetricsEventMulticaster)中为服务注册相关指标创建的指标监听器。实际上,increment、addRt方法都是由指标监听器的各个实现调用的。

应用程序指标监听器(MetricsApplicationListener)中提供了 AbstractMetricsListener 的几个匿名实现,提供应用层面事件发生、完成、抛出异常三种情况下对给定指标的计数或RT的计算,大多数用做处理应用层面指标事件的 MetricsListener 都是它提供的三个监听器实现:

public class MetricsApplicationListener extends AbstractMetricsListener {

    public MetricsApplicationListener(MetricsKey metricsKey) {
        super(metricsKey);
    }
    //此处的Event均为TimeCounterEvent,在它被创建时就会自动开始计时
    public static AbstractMetricsListener onPostEventBuild(MetricsKey metricsKey, CombMetricsCollector<TimeCounterEvent> collector) {
        return AbstractMetricsListener.onEvent(metricsKey,
            event -> collector.increment(event.appName(), metricsKey)
        );
    }

    public static AbstractMetricsListener onFinishEventBuild(MetricsKey metricsKey, MetricsPlaceType placeType, CombMetricsCollector<TimeCounterEvent> collector) {
        return AbstractMetricsListener.onFinish(metricsKey,
            event -> {
                collector.increment(event.appName(), metricsKey);
                collector.addRt(event.appName(), placeType.getType(), event.getTimePair().calc());
            }
        );
    }

    public static AbstractMetricsListener onErrorEventBuild(MetricsKey metricsKey, MetricsPlaceType placeType, CombMetricsCollector<TimeCounterEvent> collector) {
        return AbstractMetricsListener.onError(metricsKey,
            event -> {
                collector.increment(event.appName(), metricsKey);
                collector.addRt(event.appName(), placeType.getType(), event.getTimePair().calc());
            }
        );
    }
}

还有 MetricsServiceListener(服务指标监听器),它和 MetricsApplicationListener 十分类似,提供的是服务层面的指标监听器的通用实现,不再重复分析。

可以用一句话简单的总结这三个 Collector 注册指标监听器的流程 : Collector 内部的 Mulicaster/Dispatcher 在被 Collector 创建时直接向自己注册已声明的指标监听器。

至此,我们可以总结出 MetricsEvent 的部分消息转发路径 :

event-dispatch-simple

2,Dubbo 核心模块的指标采集流程

DefaultMetricsCollector(默认指标采集器) 作为指标采集器的默认实现,其主要通过采样器(Sampler)收集dubbo应用核心RPC功能的相关指标。 采样器包括以下几种:

  • 线程池线程状态(最大线程数、最小线程数、活跃线程数等),对应 ThreadPoolMetricsSampler,线程池指标采样器
  • 线程池中线程耗尽事件的计数,对应 ThreadRejectMetricsCountSampler, 线程耗尽次数采样器
  • 应用指标收集情况(收集次数),对应 DefaultMetricsCollector 中实现的 SimpleMetricsCountSampler 匿名子类

这些采样器内部会存储其负责采样类型指标的样本。由于默认指标采集器同样继承自 CombMetricsCollector,它也同时具有与前文中分析的三大中心指标收集器相似的指标转发流程。

除了线程池指标采样器,其它两个采样器均实现自简单指标计数采样器(SimpleMetricsCountSampler)。它实现了通用的指标存取操作。

sampler-struct

简单指标计数采样器内部的指标样本容器:

public abstract class SimpleMetricsCountSampler<S, K, M extends Metric>
    implements MetricsCountSampler<S, K, M> {
...
 private final Map<K, ConcurrentMap<M, AtomicLong>> metricCounter = new ConcurrentHashMap<>();
...
}

其中:泛型 M 为指标类型,如方法指标 MethodMetric;泛型 K 为指标名称类型,如 String;泛型 S 为请求源类型,如 String 或 Invocation。请求源用于定位触发采样的请求来源,指标名称则用于对指标进行分组,便于按名称来分组检索指标数据。

以及对特定指标的增减操作:

//SimpleMetricsCountSampler
    public void inc(S source, K metricName) {
        doExecute(source, metricName, counter -> {
            counter.incrementAndGet();
            return false;
        });
    }
    public void dec(S source, K metricName) {
        doExecute(source, metricName, counter -> {
            counter.decrementAndGet();
            return false;
        });
    }
    public void incOnEvent(S source, K metricName) {
        doExecute(source, metricName, counter -> {
            counter.incrementAndGet();
            return true;
        });
    }
    public void decOnEvent(S source, K metricName) {
        doExecute(source, metricName, counter -> {
            counter.decrementAndGet();
            return true;
        });
    }

对于四个增加、减少计数的方法,它们最终都会调用 doExecute 方法来完成计数操作,其中 counter 函数定义了对计数器的操作(增加、减少)。

//SimpleMetricsCountSampler
private void doExecute(S source, K metricsName, Function<AtomicLong, Boolean> counter) {
        MetricsCountSampleConfigurer<S, K, M> sampleConfigure = new MetricsCountSampleConfigurer<>();
        sampleConfigure.setSource(source);
        sampleConfigure.setMetricsName(metricsName);

        //利用子类重写的countConfigure为 sampleConfigure 设置事件发布函数
        this.countConfigure(sampleConfigure);

        //通过指标名获取对应的指标计数器
        Map<M, AtomicLong> metricAtomic = metricCounter.get(metricsName);

        if (metricAtomic == null) {
            metricAtomic = metricCounter.computeIfAbsent(metricsName, k -> new ConcurrentHashMap<>());
        }

        Assert.notNull(sampleConfigure.getMetric(), "metrics is null");

        AtomicLong atomicCounter = metricAtomic.get(sampleConfigure.getMetric());

        if (atomicCounter == null) {
            atomicCounter = metricAtomic.computeIfAbsent(sampleConfigure.getMetric(), k -> new AtomicLong());
        }
        // counter函数定义了对atomicCounter的增减操作,如 inc方法定义的counter是对atomicCounter+1,dec方法定义的是对atomicCounter-1
        Boolean isEvent = counter.apply(atomicCounter);
        //如果本次计数操作应该触发事件...
        if (isEvent) {
            //获取子类设置的事件发布函数,发布事件
            sampleConfigure.getFireEventHandler().accept(sampleConfigure);
        }
    }

doExecute 做了两件事:

1,判断当前指标是否存在,如果不存在就放到容器中。

2,调用提供的计数函数对指标进行修改,对应 counter 字段。

以下为各采样器在 countConfigure 方法中提供的创建指标实例的逻辑:

  • DefaultMetricsCollector 中 SimpleMetricsCountSampler 的匿名实现 (applicationSampler)提供的countConfigure 方法:
        @Override
        protected void countConfigure(
            MetricsCountSampleConfigurer<String, MetricsEvent.Type, ApplicationMetric> sampleConfigure) {
            //提供根据 configure 创建指标实例的函数
            sampleConfigure.configureMetrics(configure -> new ApplicationMetric(sampleConfigure.getSource()));
        }
  • ThreadRejectMetricsCountSampler 中提供的 countConfigure 方法:
 @Override
    protected void countConfigure(MetricsCountSampleConfigurer<String, String, ThreadPoolRejectMetric> sampleConfigure) {
         //提供根据 configure 创建指标实例的函数
        sampleConfigure.configureMetrics(configure -> new ThreadPoolRejectMetric(collector.getApplicationName(),configure.getSource()));
    }

默认指标收集器继承自 CombMetricsCollector,内部包含一个 DefaultSubDispatcher,因此它自身也可以作为指标事件的转发器,接受其它指标监听器的注册。

在之前,我们发现了 AggregateMetricsCollector(聚合指标收集器)会将自己注册为 DefaultMetricsCollector 的监听器:

   public AggregateMetricsCollector(ApplicationModel applicationModel) {
   ...
            registerListener();
   ...
        }
    }
   
   private void registerListener() {
applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class).addListener(this);
   }

还有 HistogramMetricsCollector (直方图指标收集器)也会将自己注册为它的监听器。

  public HistogramMetricsCollector(ApplicationModel applicationModel) {
     ...
            registerListener();
     ...
  }

    private void registerListener() {        applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class).getEventMulticaster().addListener(this);
    } 

因此,聚合指标收集器和直方图指标收集器的指标事件来源于默认指标收集器转发的指标事件。通过默认指标转发器的 isSupport 方法,还可以发现这些指标事件的类型是 RequestEvent (RPC请求事件)或 RequestBeforeEvent(请求前失败事件:实际请求发送之前在 ClusterFilter 中产生的异常)。

    @Override
    public boolean isSupport(MetricsEvent event) {
        return event instanceof RequestEvent || event instanceof RequestBeforeEvent;
    }

default-metrics-collector-struct

至此,我们也明确了 Dubbo 应用内部核心模块的相关指标是如何收集的:默认指标收集器除了接受上层指标转发器的指标事件之外,还会通过各种采样器对埋点采样,通过 SubDispatcher 统一转发指标事件,通知注册为它的监听器的其它 Collector 完成采样。

3, 直方图相关指标的采集流程

直方图指标收集器(HistogramMetricsCollector)也是一个较为特殊的收集器,它主要负责RPC调用响应时间直方图指标这一种指标的收集。

由于直方图指标收集器只需要采集单一类型的指标,它直接使用Map来存储采样数据,而非更复杂的数据聚合器(Composite)。

//HistogramMetricsCollector
private final ConcurrentHashMap<MethodMetric, Timer> rt = new ConcurrentHashMap<>();

其中,key为方法指标,Timer则是该方法对应的RT计时器。该计时器由 micrometer 提供,在跟踪短时间内的大量事件时具有良好的性能。

前文中已经提到,直方图指标收集器在初始化时会将自己注册为默认指标收集器(DefaultMetricsCollector)中的监听器,与聚合指标收集器相同(AggregateMetricsCollector)。

    private void registerListener() {        applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class).getEventMulticaster().addListener(this);
    }

这意味着它接收的指标事件实际也来自于默认指标收集器中的采样器。之前的分析中,我们知道默认指标收集器目前实际只转发来自 MetricsDispatcher 的请求相关事件,因此直方图指标收集器也只会收集请求响应时间相关的指标采样。

//HistogramMetricsCollector
private void onRTEvent(RequestEvent event) {
        if (metricRegister != null) {
            MethodMetric metric = new MethodMetric(applicationModel.getApplicationName(), event.getAttachmentValue(MetricsConstants.INVOCATION));
            long responseTime = event.getTimePair().calc();

            HistogramMetricSample sample = new HistogramMetricSample(MetricsKey.METRIC_RT_HISTOGRAM.getNameByType(metric.getSide()),
                MetricsKey.METRIC_RT_HISTOGRAM.getDescription(), metric.getTags(), RT);

            Timer timer = ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> metricRegister.register(sample));
            timer.record(responseTime, TimeUnit.MILLISECONDS);
        }
    }

当接收到事件时,直方图指标收集器会先计算当前调用花费的时间,然后为计时器(Time)添加一条响应时间记录。