源码分析 Sentinel 实时数据采集实现原理

article/2025/8/21 21:49:40

本篇将重点关注 Sentienl 实时数据收集,即 Sentienl 具体是如何收集调用信息,以此来判断是否需要触发限流或熔断。

本节目录

    • 1、源码分析 StatisticSlot
        • 1.1 StatisticSlot entry 详解
        • 1.2 StatisticSlot exit 详解
    • 2、Sentienl Node 体系
    • 2、StatisticNode 详解
        • 2.1 核心类图
        • 2.2 addPassRequest
        • 2.3 totalRequest
        • 2.4 successQps
        • 2.5 metrics
    • 3、DefaultNode 详解
        • 3.1 类图
        • 3.2 increaseBlockQps

Sentienl 实时数据收集的入口类为 StatisticSlot。

我们先简单来看一下 StatisticSlot 该类的注释,来看一下该类的整体定位。

StatisticSlot,专用于实时统计的 slot。在进入一个资源时,在执行 Sentienl 的处理链条中会进入到该 slot 中,需要完成如下计算任务:

  • 集群维度计算资源的总统计信息,用于集群限流,后续文章将详细探讨。
  • 来自不同调用方/来源的群集节点的统计信息。
  • 特定调用上下文环境的统计信息。
  • 统计所有入口的统计信息。

接下来用源码分析的手段来详细分析 StatisticSlot 的实现原理。

1、源码分析 StatisticSlot

1.1 StatisticSlot entry 详解

StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { try {// Do some checking.fireEntry(context, resourceWrapper, node, count, prioritized, args);  // @1// Request passed, add thread count and pass count.node.increaseThreadNum();                                                             // @2node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {                           // @3// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {                // @4// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {   // @5handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {                                                                                                                                // @6node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {     // @7                                                                                                              // Blocked, set block exception to current entry.context.getCurEntry().setError(e);// Add block count.node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {   // @8// Unexpected error, set error to current entry.context.getCurEntry().setError(e);// This should not happen.node.increaseExceptionQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseExceptionQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.increaseExceptionQps(count);}throw e;}
}

代码@1:首先调用 fireEntry,先调用 Sentinel Slot Chain 中其他的处理器,执行完其他处理器的逻辑,例如 FlowSlot、DegradeSlot,因为 StatisticSlot 的职责是收集统计信息。

代码@2:如果后续处理器成功执行,则将正在执行线程数统计指标加一,并将通过的请求数量指标增加对应的值。下文会对 Sentinel Node 体系进行详细的介绍,在 Sentinel 中使用 Node 来表示调用链中的某一个节点,每个节点关联一个资源,资源的实时统计信息就存储在 Node 中,故该部分也是调用 DefaultNode 的相关方法来改变线程数等,将在下文会向详细介绍。

代码@3:如果上下文环境中保存了调用的源头(调用方)的节点信息不为空,则更新该节点的统计数据:线程数与通过数量。

代码@4:如果资源的进入类型为 EntryType.IN,表示入站流量,更新入站全局统计数据(集群范围 ClusterNode)。

代码@5:执行注册的进入Handler,可以通过 StatisticSlotCallbackRegistry 的 addEntryCallback 注册相关监听器。

代码@6:如果捕获到 PriorityWaitException ,则认为是等待过一定时间,但最终还是算通过,只需增加线程的个数,但无需增加节点通过的数量,具体原因我们在详细分析限流部分时会重点讨论,也会再次阐述 PriorityWaitException 的含义。

代码@7:如果捕获到 BlockException,则主要增加阻塞的数量。

代码@8:如果是系统异常,则增加异常数量。

我想上面的代码应该不难理解,但涉及到统计指标数据的变化,都是调用 DefaultNode node 相关的方法,从这里也可以看出,Node 将是实时统计数据的直接持有者,那毋容置疑接下来将重点来学习 Node,为了知识体系的完备性,我们先来看一下 StatisticSlot 的 exit 方法。

1.2 StatisticSlot exit 详解

StatisticSlot#exit

public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {DefaultNode node = (DefaultNode)context.getCurNode();if (context.getCurEntry().getError() == null) {         // @1// Calculate response time (max RT is TIME_DROP_VALVE).long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();if (rt > Constants.TIME_DROP_VALVE) {rt = Constants.TIME_DROP_VALVE;}// Record response time and success count.node.addRtAndSuccess(rt, count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);}node.decreaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().decreaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.addRtAndSuccess(rt, count);Constants.ENTRY_NODE.decreaseThreadNum();}} else {// Error may happen.}// Handle exit event with registered exit callback handlers.Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();for (ProcessorSlotExitCallback handler : exitCallbacks) {      // @2handler.onExit(context, resourceWrapper, count, args);}fireExit(context, resourceWrapper, count);     // @3
}

代码@1:成功执行,则重点关注响应时间,其实现亮点如下:
计算本次响应时间,将本次响应时间收集到 Node 中。
将当前活跃线程数减一。

代码@2:执行退出时的 callback。可以通过 StatisticSlotCallbackRegistry 的 addExitCallback 方法添加退出回调函数。

代码@3:传播 exit 事件。

接下来我们将重点介绍 DefaultNode,即 Sentinel 的 Node 体系,持有资源的实时调用信息。

2、Sentienl Node 体系

2.1 Node 类体系图
在这里插入图片描述
我们先简单介绍一下上述核心类的作用与核心接口或核心属性的含义。

  • OccupySupport
    支持抢占未来的时间窗口,有点类似借用“未来”的令牌。其核心方法如下:
    • long tryOccupyNext(long currentTime, int acquireCount, double threshold)
      尝试抢占未来的令牌,返回值为调用该方法的线程应该 sleep 的时间。
      1、long currentTime
      当前时间。
      2、int acquireCount
      本次需要申请的令牌个数。
      3、double threshold
      设置的阔值。
  • long waiting()
    获取当前已申请的未来的令牌的个数。
  • void addWaitingRequest(long futureTime, int acquireCount)
    申请未来时间窗口中的令牌。
  • void addOccupiedPass(int acquireCount)
    增加申请未来令牌通过的个数。
  • double occupiedPassQps()
    当前抢占未来令牌的QPS。
  • Node
    持有实时统计信息的节点。定义了收集统计信息与获取统计信息的接口,上面方法根据方法名称即可得知其含义,故这里就不一一罗列了。
  • StatisticNode
    实现统计信息的默认实现类。
  • DefaultNode
    用于在特定上下文环境中保存某一个资源的实时统计信息。
  • ClusterNode
    实现基于集群限流模式的节点,将在集群限流模式部分详细介绍。
  • EntranceNode
    用来表示调用链入口的节点信息。

本文将详细介绍 DefaultNode 与 StatisticNode,重点阐述调用树与实时统计信息。DefaultNode 是 StatisticNode 的子类,我们先从 StatisticNode 开始 Node 体系的探究。

2、StatisticNode 详解

2.1 核心类图

在这里插入图片描述
我们对其核心属性进行一一解读:

  • Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL)
    每秒的实时统计信息,使用 ArrayMetric 实现,即基于滑动窗口实现,正是上篇文章详细介绍的,默认1s 采样 2次。即一个统计周期中包含两个滑动窗口。
  • Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false)
    每分钟实时统计信息,同样使用 ArrayMetric 实现,即基于滑动窗口实现。每1分钟,抽样60次,即包含60个滑动窗口,每一个窗口的时间间隔为 1s 。
  • LongAdder curThreadNum = new LongAdder()
    当前线程计数器。
  • long lastFetchTime = -1
    上一次获取资源的有效统计数据的时间,即调用 Node 的 metrics() 方法的时间。

关于 ArrayMetric 滑动窗口设计与实现原理,请参考笔者的另一篇博文:Alibaba Seninel 滑动窗口实现原理(文末附原理图)

接下来我们挑选几个具有代表性的方法进行探究。

2.2 addPassRequest

public void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);
}

增加通过请求数量。即将实时调用信息向滑动窗口中进行统计。addPassRequest 即报告成功的通过数量。就是分别调用 秒级、分钟即对应的滑动窗口中添加数量,然后限流规则、熔断规则将基于滑动窗口中的值进行计算。

2.3 totalRequest

public long totalRequest() {return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
}

获取当前时间戳的总请求数,获取分钟级时间窗口中的统计信息。

2.4 successQps

public double successQps() {return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
}

成功TPS,用秒级统计滑动窗口中统计的个数 除以 窗口的间隔得出其 tps,即抽样个数越大,其统计越精确。

温馨提示:上面的方法在学习了上文的滑动窗口设计原理后将显得非常简单,大家在学习的过程中,可以总结出一个规律,什么时候时候使用秒级滑动窗口,什么时候使用分钟级滑动窗口。

2.5 metrics

由于 Sentienl 基于滑动窗口来实时收集统计信息,并存储在内存中,并随着时间的推移,旧的滑动窗口将失效,故需要提供一个方法,及时将所有的统计信息进行汇总输出,供监控客户端定时拉取,转储都其他客户端,例如数据库,方便监控数据的可视化,这也通常是中间件用于监控指标的监控与采集的通用设计方法。

public Map<Long, MetricNode> metrics() {long currentTime = TimeUtil.currentTimeMillis();currentTime = currentTime - currentTime % 1000;   // @1Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();   // @2long newLastFetchTime = lastFetchTime;// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).for (MetricNode node : nodesOfEverySecond) { if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {    // @3metrics.put(node.getTimestamp(), node);newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());}}lastFetchTime = newLastFetchTime;return metrics;
}

代码@1:获取当前时间对应的滑动窗口的开始时间,可以对比上文计算滑动窗口的算法。

代码@2:获取一分钟内的所有滑动窗口中的统计数据,使用 MetricNode 表示。

代码@3:遍历所有节点,刷选出不是当前滑动窗口外的所有数据。这里的重点是方法:isNodeInTime。

private boolean isNodeInTime(MetricNode node, long currentTime) {return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
}

这里只刷选出不是当前窗口的数据,即 metrics 方法返回的是“过去”的统计数据。

接下来我们再来看看 DefaultNode 相关的几个特性方法。

3、DefaultNode 详解

3.1 类图

在这里插入图片描述
DefaultNode 是 StatisticNode 的子类,其额外增加的属性如下:

  • private ResourceWrapper id
    资源id,即 DefaultNode 才真正与资源挂钩,可以将 DefaultNode 看出是调用链中的一个节点,并且与资源关联。
  • private volatile Set< Node > childList
    子节点结合。以此来维持其调用链。
  • private ClusterNode clusterNode
    集群节点,同样为 StatisticNode 的子类,表示与资源集群相关的环境。

接下来我们将来看一下 DefaultNode 的核心方法。

3.2 increaseBlockQps

public void increaseBlockQps(int count) {super.increaseBlockQps(count);this.clusterNode.increaseBlockQps(count);
}

DefaultNode 的此类方法,通常是先调用 StatisticNode 的方法,然后再调用 clusterNode 的相关方法,最终就是使用在对应的滑动窗口中增加或减少计量值。

其他方法也比较简单,就不再细看了,我们可以通过 DefaultNode 的 printDefaultNode 方法来打印该节点的调用链。

本文就介绍到这里了,本文详细介绍了 Sentinel 实时数据收集的统一入口 StatisticSlot,并且介绍了 Seninel Node 体系,即调用链中的每一个节点,每一个节点对一个资源的实时统计信息。下一篇将开始重点限流是如何实现的,即 FlowSlot 的实现技巧。


欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:
1、源码分析RocketMQ专栏(40篇+)
2、源码分析Sentinel专栏(12篇+)
3、源码分析Dubbo专栏(28篇+)
4、源码分析Mybatis专栏
5、源码分析Netty专栏(18篇+)
6、源码分析JUC专栏
7、源码分析Elasticjob专栏
8、Elasticsearch专栏(20篇+)
9、源码分析MyCat专栏


http://chatgpt.dhexx.cn/article/43JJ6YQn.shtml

相关文章

ISYSTEM调试实践10-实时数据采集工具daqIDEA

本文介绍一种实时数据采集的工具daqIDEA&#xff0c;该软件整合在了winIDEA内&#xff0c;可以直接通过winIDEA启动。 daqIDEA类似于jlink的jscop&#xff0c;stlink也有类似功能。原理就是利用仿真探头&#xff0c;将程序运行的变量实时采集出来&#xff0c;并通过曲线的方式显…

基于组态王6.55的实时数据采集助手V1.0

采集助手V1.0使用说明采集助手V1.0是基于北京亚控科技的组态王6.55开发的&#xff0c;用于对带485通讯功能的流量计、电能表等智能仪表的实时数据进行采集的程序&#xff0c;可将采集到的数据记录为xls电子表格形式&#xff0c;用于后期分析。 一、主要功能&#xff1a; 1、采…

如何使实时数据采集处理系统保持数据的高速传输

如何使实时数据采集处理系统保持数据的高速传输 1引言 当前&#xff0c;越来越多的设计应用领域要求具有高精度的A&#xff0f;D转换和实时处理功能。在实时数据采集处理系统设计中&#xff0c;一般需要考虑数据采集以及对采集数据的处理。而对于大数据量的实时数据采集处理系…

「山东城商行联盟数据库准实时数据采集系统」入选2021中国大数据应用样板案例

12月17日&#xff0c;由中国信息协会大数据分会主办的“2021中国大数据技术应用大会”在北京圆满落幕。来自中国信息协会、中国工程院、国家信息中心、中国软件评测中心、中国金融认证中心等的权威专家、知名企业代表出席本次大会&#xff0c;对大数据的需求、应用和产业发展进…

实时数据采集无压力,网络抓取基础架构全程保障

作为一位爬虫工程师&#xff0c;网络抓取是我在日常工作中的重要任务之一。在当今信息爆炸的时代&#xff0c;实时数据的价值愈发显著。对于企业和个人来说&#xff0c;准确及时地获取最新的数据信息&#xff0c;能够帮助做出更明智的决策&#xff0c;抓住商机&#xff0c;甚至…

BET365的websocket实时数据采集分析

BET365网站websocket实时数据采集分析 ** 前语&#xff1a; ** 本文仅用于交流学习&#xff0c;请勿用于非法用途&#xff0c;后果自负&#xff01; bet365是全球顶尖的涵盖足球篮球等各项赛事的赛事信息提供网站以及博彩网站。为啥要去爬这个网站呢&#xff1f;因为它赛事…

数据采集工具 -- Flume

1、Flume的定义 Flume由Cloudera公司开发&#xff0c;是一个分布式、高可靠、高可用的海量日志采集、聚合、传输的系统。Flume支持在日志系统中定制各类数据发送方&#xff0c;用于采集数据&#xff1b;Flume提供对数据进行简单处理&#xff0c;并写到各种数据接收方的能力。简…

PLC实时数据采集如何实现?

数据采集传输对于后续企业进行分析和决策是十分重要的&#xff0c;而实时数据采集更能提升整体生产的认识度&#xff0c;从而采取到更加及时高效的措施。因此PLC实时数据采集成为企业的基础应用&#xff0c;那么如何实现PLC的实时数据采集呢&#xff1f; 1、协议解析 协议解析…

实时数仓-数据采集层_1

实时数仓-数据采集层_1 一、实时数仓介绍1、普通实时计算与实时数仓比较2、实时电商数仓项目分层 二、实时需求概述1、离线计算与实时计算的比较2、实时需求种类2.1、日常统计报表或分析图中需要包含当日部分2.2、实时数据大屏监控2.3、数据预警或提示2.4、实时推荐系统 三、统…

低成本,高效率,更成熟的实时数据采集方案来了

现在&#xff0c;科技的发展正在飞速影响着各行各业的生产模式。其中物联网作为新技术载体&#xff0c;正在帮助各行业极大地释放数字化、智能化的空间。 物联网将信息网络连接和服务的对象从人扩展到物&#xff0c;让物流、金融、城市管理、能源勘探、工业制造等等领域复杂的业…

项目一实时数仓数据采集

目录 1. 项目介绍 1.1项目背景 1.2项目需求 1.3目标 1.4 二次开发 2. 项目部署 2.1业务数据采集 2.2 导入脚本编写和测试 2.3内容数据采集 2.3.1说明&#xff1a; 2.3.3 配置管理中心 2.3.4 注册域名 2.4 日志数据采集 2.4.1 nginx服务器的搭建 2.4.2 启动nginx&am…

【Oracle】准实时大规模数据提取

文中使用的Oracle版本为10g。 这篇文章是之前本人在前公司内部做可行性分析报告中的其中一个板块的内容&#xff0c;具体讲述的是为了做大规模数据提取和数据清洗做了一个试验demo。先说结论&#xff0c;一般来说像这种操作不应优先考虑关系型数据库去解决。本文中提到的采用J…

实时数据流采集工具Flume

实时数据流采集工具Flume 实时数据流采集工具Flume1.1 Flume的介绍1.2 Flume的特点1.3 Flume的功能架构1.4 Flume的功能原理1.5 Flume的安装部署1.6 Flume两种常见基础架构1.6.1 多路复用流Multiplexing The Flow1.6.2 Consolidation 1.7 Flume中常用的三大基础组件1.7.1 sourc…

必须了解的实时数据架构

随着互联网的发展进入下半场&#xff0c;数据的时效性对企业的精细化运营越来越重要&#xff0c; 商场如战场&#xff0c;在每天产生的海量数据中&#xff0c;如何能实时有效的挖掘出有价值的信息&#xff0c; 对企业的决策运营策略调整有很大帮助。此外&#xff0c;随着 5G 技…

puppeteer-0-大背景:服务端:生成图片 合成海报 截屏

背景 最近接手一个任务&#xff0c;大致要求就是可以动态合成图片。 没听懂&#xff1f;那我再解释下&#xff1a; 大致就是如上功能。 这个时候&#xff0c;会的&#xff0c;或者稍微会的&#xff0c;或者真的会的&#xff0c;就开会七嘴八舌了&#xff1a;吧唧吧唧.... 前…

小程序多张图片合成海报分享功能

项目有个需求&#xff1a;代言人模块分享出去时。需要合成几张图片作为一张海报分享出去 需要用到canvas组件&#xff0c;组件的部分合成图需要下载下来&#xff0c;所以后端要配置好下载合法域名如下图&#xff0c;配置ok后开发者工具会看到合法域名 代码逻辑 1、点击分享弹框…

php生成推广二维码海报、合成图片demo

php生成推广二维码海报、合成图片 1、海报背景图。背景图一般存服务器,程序本地读取; 2、推广二维码。可以是二维码图片链接,也可以是字符串图像流。如果自己生成二维码,详见phpqrcode官网,地址:https://sourceforge.net/projects/phpqrcode。 3、开启PHP的GD扩展 inde…

微信小程序海报生成图片合成工具类

背景 我目前参与小程序的产品研发&#xff0c;为了方便产品的转发和推广&#xff0c;会对课提供生成海报转发或者分享的功能&#xff0c;前期海报合成这个功能是由项目组的老同事负责开发&#xff0c;后来小程序海报这块功能需要单独做一个功能用于专门根据不同的课程类型来生…

平面设计新手如何用PS制作出一张合成海报

本文由:“学设计上兔课网”原创,图片素材来自网络,仅供学习分享 平面设计新手如何用PS制作出一张合成海报?本期教程给大家制作一张关于VR战争合成海报,从合成海报来讲我们要做到以下几点: 创意部分:如何通过创意表现产品功能或其他想要表现的点, 有了创意找到合适的…

小程序画布合成二维码海报图,并保存到相册

小程序画布合成二维码海报图&#xff0c;并保存到相册 实现效果如下图&#xff1a; 步骤分析 1&#xff0c;先获取需要合成的海报和二维码。 2&#xff0c;获取图片的本地路径&#xff0c;如果图片不是网络连接则不需要此操作。 3&#xff0c;通过手机型号&#xff0c;根据…