令牌桶
以 warn up 预热限流控制器为例,其中就有使用令牌桶的算法。
warn up 模式是用来防止巨大的请求尖刺突入系统,对系统造成太大的压力导致系统崩溃。
2.0E-6 = 2 * 10^-6 = 0.000002
流程:
1.根据场景算出
- default(reject directly),
- warm up,
- rate limiter,
- warm up + rate limiter
public final class FlowRuleUtil {
/**
* 根据规则设定,使用不同的控制器
*
*/
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
// warm up
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
// rate limiter
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
// warm up + rate limiter
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
return new DefaultController(rule.getCount(), rule.getGrade());
}
}
以下默认的控制器:直接拒绝策略模式
package com.alibaba.csp.sentinel.slots.block.flow.controller;
/**
* Default throttling controller (immediately reject strategy).
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
private void sleep(long timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {
// Ignore.
}
}
}
package com.alibaba.csp.sentinel.node;
/**
* <p>The statistic node keep three kinds of real-time statistics metrics:</p>
* <ol>
* <li>metrics in second level ({@code rollingCounterInSecond})</li>
* <li>metrics in minute level ({@code rollingCounterInMinute})</li>
* <li>thread count</li>
* </ol>
*
* <p>
* Sentinel use sliding window to record and count the resource statistics in real-time.
* The sliding window infrastructure behind the {@link ArrayMetric} is {@code LeapArray}.
* </p>
*
* <p>
* case 1: When the first request comes in, Sentinel will create a new window bucket of
* a specified time-span to store running statics, such as total response time(rt),
* incoming request(QPS), block request(bq), etc. And the time-span is defined by sample count.
* </p>
* <pre>
* 0 100ms
* +-------+--→ Sliding Windows
* ^
* |
* request
* </pre>
* <p>
* Sentinel use the statics of the valid buckets to decide whether this request can be passed.
* For example, if a rule defines that only 100 requests can be passed,
* it will sum all qps in valid buckets, and compare it to the threshold defined in rule.
* </p>
*
* <p>case 2: continuous requests</p>
* <pre>
* 0 100ms 200ms 300ms
* +-------+-------+-------+-----→ Sliding Windows
* ^
* |
* request
* </pre>
*
* <p>case 3: requests keeps coming, and previous buckets become invalid</p>
* <pre>
* 0 100ms 200ms 800ms 900ms 1000ms 1300ms
* +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows
* ^
* |
* request
* </pre>
*
* <p>The sliding window should become:</p>
* <pre>
* 300ms 800ms 900ms 1000ms 1300ms
* + ...... +-------+ ...... +-------+-----→ Sliding Windows
* ^
* |
* request
* </pre>
*
* @author qinan.qn
* @author jialiang.linjl
*/
public class StatisticNode implements Node {
/**
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* The counter for thread count.
*/
private LongAdder curThreadNum = new LongAdder();
/**
* The last timestamp when metrics were fetched.
*/
private long lastFetchTime = -1;
@Override
public Map<Long, MetricNode> metrics() {
// The fetch operation is thread-safe under a single-thread scheduler pool.
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
for (MetricNode node : nodesOfEverySecond) {
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
metrics.put(node.getTimestamp(), node);
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
}
}
lastFetchTime = newLastFetchTime;
return metrics;
}
/**
* avgUsedTokens 中调用,用来计算已经通过的 QPS
*/
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
@Override
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}
int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}
return OccupyTimeoutProperty.getOccupyTimeout();
}
}
package com.alibaba.csp.sentinel.slots.statistic.metric;
/**
* The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
*
*/
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
/**
* For unit test.
*/
public ArrayMetric(LeapArray<MetricBucket> array) {
this.data = array;
}
@Override
public long pass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
private MetricNode fromBucket(WindowWrap<MetricBucket> wrap) {
MetricNode node = new MetricNode();
node.setBlockQps(wrap.value().block());
node.setExceptionQps(wrap.value().exception());
node.setPassQps(wrap.value().pass());
long successQps = wrap.value().success();
node.setSuccessQps(successQps);
if (successQps != 0) {
node.setRt(wrap.value().rt() / successQps);
} else {
node.setRt(wrap.value().rt());
}
node.setTimestamp(wrap.windowStart());
node.setOccupiedPassQps(wrap.value().occupiedPass());
return node;
}
}
public abstract class LeapArray<T> {
/**
* 时间窗口长度,单位 ms
*/
protected int windowLengthInMs;
/**
* 切割窗口数量
*/
protected int sampleCount;
/**
* 统计的时间间隔 intervalInMs = windowLengthInMs * sampleCount
*/
protected int intervalInMs;
/**
* 窗口数组 数组大小 = sampleCount
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* update lock 更新窗口时需要上锁
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* @param sampleCount 需要划分的窗口数
* @param intervalInMs 间隔的统计时间
*/
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 获取当前窗口
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
}
预热模式下的令牌桶计算模式
package com.alibaba.csp.sentinel.slots.block.flow.controller;
/**
* <p>
* The principle idea comes from Guava. However, the calculation of Guava is
* rate-based, which means that we need to translate rate to QPS.
* </p>
*
*/
public class WarmUpController implements TrafficShapingController {
protected double count;
private int coldFactor;
protected int warningToken = 0;
private int maxToken;
protected double slope;
protected AtomicLong storedTokens = new AtomicLong(0);
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
public class Test{
public static void main(String[] args) {
int warningToken;
int maxToken;
double slope;
int warmUpPeriodInSec = 2;
int count = 1000;
int coldFactor = 3;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
System.out.println("warningtoken : " + warningToken);
System.out.println("maxtoken : " + maxToken);
System.out.println("slope : " + slope);
}
}
喜欢文章请关注我
程序领域
点击关注+转发,私信发送【面试】或者【资料】可以收获更多资源