Sentinel限流源码解读

sentinel限流规则初始化

  1. FlowRule对象用来存配置信息
  2. FlowRuleManager用来加载配置信息
  3. 后台定时任务每秒获取一次最新配置 通过 MetricTimerListener 实现
    SentinelConfig初始化配置及获取最新配置
    MetricTimerListener 定时的数据采集,然后写到log文件里去
    遍历集群节点 汇总统计的数据
    
    StatisticNode 使用滑动窗口实时统计数据

限流规则对象 FlowRule

FlowRule
com.alibaba.csp.sentinel.slots.block.flow.FlowRule

resource 资源名
limitApp 限流来源,默认为default不区分来源
grade 限流类型: QPS限流、线程个数限流
count 限流阈值
strategy 限流策略 1.直接限流(direct) 2.关联限流(relate) 3.链路限流(chain)
refResource
controlBehavior 流控效果 0.直接拒绝(reject directly) 1.热启动(warm up) 2. 匀速排队(rate limiter) 3. 热启动 + 匀速排队 warm up + rate limiter
warmUpPeriodSec 流控效果为预热启动时的预热时长 默认10s
maxQueueingTimeMs 流控效果为排队等待时的等待时长 Max queueing time in rate limiter behavior 默认500
clusterMode
clusterConfig
controller

流控策略

LimitApp的作用域只在配置的流控策略为RuleConstant.STRATEGY_DIRECT(直接关联)时起作用。
其有三种配置,分别为default,origin_name,other
default 如果配置为default,表示统计不区分来源,当前资源的任何来源流量都会被统计(其实就是选择 Node 为 clusterNode 维度)
origin_name 如果配置为指定名称的 origin_name,则只会对当前配置的来源流量做统计
other 如果配置为other 则会对其他全部来源生效但不包括第二条配置的来源

当策略配置为 RuleConstant.STRATEGY_RELATE 或 RuleConstant.STRATEGY_CHAIN 时
STRATEGY_RELATE 关联其他的指定资源,如资源A想以资源B的流量状况来决定是否需要限流,这时资源A规则配置可以使用 STRATEGY_RELATE 策略
STRATEGY_CHAIN 对指定入口的流量限流,因为流量可以有多个不同的入口(EntranceNode)

流控策略校验

同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default

校验 FlowRuleChecker#selectNodeByRequesterAndStrategy

String limitApp = rule.getLimitApp(); // 不设置默认为default,设置了就是自己指定的

// 调用方限流-直接限流-针对特定的调用者  origin变量 不为`default` 或 `other`,并且配置的limitApp和origin变量 相等
if (limitApp.equals(origin) && filterOrigin(origin)) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        // Matches limit origin, return origin statistic node.
        return context.getOriginNode();
    }

    return selectReferenceNode(rule, context, node);
}
// 调用方限流-直接限流-默认
if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        // Return the cluster node.
        return node.getClusterNode();
    }

    return selectReferenceNode(rule, context, node);
}
// 调用方限流-直接限流-其它
if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
    && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
    if (strategy == RuleConstant.STRATEGY_DIRECT) {
        return context.getOriginNode();
    }

    return selectReferenceNode(rule, context, node);
}
// 
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
    String refResource = rule.getRefResource();
    int strategy = rule.getStrategy();

    if (StringUtil.isEmpty(refResource)) {
        return null;
    }

    if (strategy == RuleConstant.STRATEGY_RELATE) {
        return ClusterBuilderSlot.getClusterNode(refResource);
    }

    if (strategy == RuleConstant.STRATEGY_CHAIN) {
        if (!refResource.equals(context.getName())) {
            return null;
        }
        return node;
    }
    // No node.
    return null;
}

流控效果 流量控制实现

// 加载流控规则
FlowRuleManager.loadRules(rules);

com.alibaba.csp.sentinel.slots.block.flowFlowRuleUtil::buildFlowRuleMap

    /**
     * Build the flow rule map from raw list of flow rules, grouping by provided group function.
     *
     * @param list          raw list of flow rules
     * @param groupFunction grouping function of the map (by key)
     * @param filter        rule filter
     * @param shouldSort    whether the rules should be sorted
     * @param <K>           type of key
     * @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules
     */
    public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
                                                              Predicate<FlowRule> filter, boolean shouldSort) {
        // 存储所有的流控规则
        Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
        if (list == null || list.isEmpty()) {
            return newRuleMap;
        }
        Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();

        // 循环加载流控规则 
        for (FlowRule rule : list) {
            if (!isValidRule(rule)) {
                RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
                continue;
            }
            if (filter != null && !filter.test(rule)) {
                continue;
            }
            if (StringUtil.isBlank(rule.getLimitApp())) {
                rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
            }

            // 获取流控规则的阈值
            TrafficShapingController rater = generateRater(rule);
            rule.setRater(rater);

            K key = groupFunction.apply(rule);
            if (key == null) {
                continue;
            }
            Set<FlowRule> flowRules = tmpMap.get(key);

            if (flowRules == null) {
                // Use hash set here to remove duplicate rules.
                flowRules = new HashSet<>();
                tmpMap.put(key, flowRules);
            }

            flowRules.add(rule);
        }
        Comparator<FlowRule> comparator = new FlowRuleComparator();
        for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
            List<FlowRule> rules = new ArrayList<>(entries.getValue());
            if (shouldSort) {
                // Sort the rules.
                Collections.sort(rules, comparator);
            }
            newRuleMap.put(entries.getKey(), rules);
        }

        return newRuleMap;
    }
```  


```java
    // 0. 直接拒绝(reject directly), 
    // 1. 热启动(warm up), 
    // 2. 匀速排队 rate limiter, 
    // 3. 热启动 + 匀速排队 warm up + rate limiter
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        // 是否是QPS限流
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    // 1  热启动 warm up   缓慢放量
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    // 3 热启动 + 匀速排队 warm up + rate limiter
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    // 2 匀速排队 rate limiter
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                    // 直接拒绝
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // 直接拒绝 
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

其他

限流规则存储在 Map<String, List> rules

SphU
com.alibaba.csp.sentinel

ProcessorSlotChain

ClusterBuilderSlot 调用方限流
NodeSelectorSlot 链路限流
关联流量限流

限流算法

滑动窗口算法
将时间窗口进行划分,每过一个时间单位,时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter

计数器算法
计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

令牌桶算法
所有的请求在处理之前都需要拿到一个可用的令牌才会被处理
1)、所有的请求在处理之前都需要拿到一个可用的令牌才会被处理;
2)、根据限流大小,设置按照一定的速率往桶里添加令牌;
3)、桶设置最大的放置令牌限制,当桶满时、新添加的令牌就被丢弃或者拒绝;
4)、请求达到后首先要获取令牌桶中的令牌,拿着令牌才可以进行其他的业务逻辑,处理完业务逻辑之后,将令牌直接删除;
5)、令牌桶有最低限额,当桶中的令牌达到最低限额的时候,请求处理完之后将不会删除令牌,以此保证足够的限流;

漏桶算法
漏桶算法,可以粗略的认为就是注水漏水过程,往桶中以一定速率流出水,以任意速率流入水,当水超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。

References

  1. Sentinel源码分析—FlowRuleManager加载规则做了什么? https://www.cnblogs.com/luozhiyun/p/11439993.html
  2. Sentinel源码分析—Sentinel是如何进行流量统计的? https://www.cnblogs.com/luozhiyun/p/11451557.html
  3. Sentinel源码分析— QPS流量控制是如何实现的? https://www.cnblogs.com/luozhiyun/p/11489128.html

Sentinel源码解析四(流控策略和流控效果) https://www.cnblogs.com/taromilk/p/11877242.html

sentinel 滑动窗口 限流 https://www.cnblogs.com/wuzhenzhao/p/11453649.html