Skip to content

Commit c946f6a

Browse files
committed
feat: 分布式限流算法示例
1 parent 70b3983 commit c946f6a

File tree

5 files changed

+269
-0
lines changed

5 files changed

+269
-0
lines changed

codes/java-distributed/java-rate-limit/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
</properties>
1818

1919
<dependencies>
20+
<dependency>
21+
<groupId>redis.clients</groupId>
22+
<artifactId>jedis</artifactId>
23+
<version>5.1.0</version>
24+
</dependency>
2025
<dependency>
2126
<groupId>cn.hutool</groupId>
2227
<artifactId>hutool-all</artifactId>
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package io.github.dunwu.distributed.ratelimit;
2+
3+
import cn.hutool.core.collection.CollectionUtil;
4+
import cn.hutool.core.io.FileUtil;
5+
import cn.hutool.core.io.resource.ResourceUtil;
6+
import cn.hutool.core.util.RandomUtil;
7+
import cn.hutool.core.util.StrUtil;
8+
import redis.clients.jedis.Jedis;
9+
import redis.clients.jedis.exceptions.JedisConnectionException;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.concurrent.TimeUnit;
15+
16+
/**
17+
* 基于 Redis + Lua 实现的固定时间窗口限流算法
18+
*
19+
* @author <a href="mailto:[email protected]">Zhang Peng</a>
20+
* @date 2024-01-23
21+
*/
22+
public class RedisFixedWindowRateLimiter implements RateLimiter {
23+
24+
private static final String REDIS_HOST = "localhost";
25+
26+
private static final int REDIS_PORT = 6379;
27+
28+
private static final Jedis JEDIS;
29+
30+
public static final String SCRIPT;
31+
32+
static {
33+
// Jedis 有多种构造方法,这里选用最简单的一种情况
34+
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);
35+
36+
// 触发 ping 命令
37+
try {
38+
JEDIS.ping();
39+
System.out.println("jedis 连接成功");
40+
} catch (JedisConnectionException e) {
41+
e.printStackTrace();
42+
}
43+
44+
SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
45+
StandardCharsets.UTF_8);
46+
}
47+
48+
private final long maxPermits;
49+
private final long periodSeconds;
50+
private final String key;
51+
52+
public RedisFixedWindowRateLimiter(long qps, String key) {
53+
this(qps * 60, 60, TimeUnit.SECONDS, key);
54+
}
55+
56+
public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
57+
this.maxPermits = maxPermits;
58+
this.periodSeconds = timeUnit.toSeconds(period);
59+
this.key = key;
60+
}
61+
62+
@Override
63+
public boolean tryAcquire(int permits) {
64+
List<String> keys = Collections.singletonList(key);
65+
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
66+
String.valueOf(maxPermits));
67+
Object eval = JEDIS.eval(SCRIPT, keys, args);
68+
long value = (long) eval;
69+
return value != -1;
70+
}
71+
72+
public static void main(String[] args) throws InterruptedException {
73+
74+
int qps = 20;
75+
RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");
76+
77+
// 模拟在一分钟内,不断收到请求,限流是否有效
78+
int seconds = 60;
79+
long okNum = 0L;
80+
long total = 0L;
81+
long beginTime = System.currentTimeMillis();
82+
int num = RandomUtil.randomInt(qps, 100);
83+
for (int second = 0; second < seconds; second++) {
84+
for (int i = 0; i < num; i++) {
85+
total++;
86+
if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
87+
okNum++;
88+
System.out.println("请求成功");
89+
} else {
90+
System.out.println("请求限流");
91+
}
92+
}
93+
TimeUnit.SECONDS.sleep(1);
94+
}
95+
long endTime = System.currentTimeMillis();
96+
long time = (endTime - beginTime) / 1000;
97+
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
98+
}
99+
100+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package io.github.dunwu.distributed.ratelimit;
2+
3+
import cn.hutool.core.collection.CollectionUtil;
4+
import cn.hutool.core.io.FileUtil;
5+
import cn.hutool.core.io.resource.ResourceUtil;
6+
import cn.hutool.core.util.RandomUtil;
7+
import cn.hutool.core.util.StrUtil;
8+
import redis.clients.jedis.Jedis;
9+
import redis.clients.jedis.exceptions.JedisConnectionException;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.List;
13+
import java.util.concurrent.TimeUnit;
14+
15+
/**
16+
* 基于 Redis + Lua 实现的令牌桶限流算法
17+
*
18+
* @author <a href="mailto:[email protected]">Zhang Peng</a>
19+
* @date 2024-01-23
20+
*/
21+
public class RedisTokenBucketRateLimiter implements RateLimiter {
22+
23+
private static final String REDIS_HOST = "localhost";
24+
25+
private static final int REDIS_PORT = 6379;
26+
27+
private static final Jedis JEDIS;
28+
29+
public static final String SCRIPT;
30+
31+
static {
32+
// Jedis 有多种构造方法,这里选用最简单的一种情况
33+
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);
34+
35+
// 触发 ping 命令
36+
try {
37+
JEDIS.ping();
38+
System.out.println("jedis 连接成功");
39+
} catch (JedisConnectionException e) {
40+
e.printStackTrace();
41+
}
42+
43+
SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
44+
StandardCharsets.UTF_8);
45+
}
46+
47+
private final long qps;
48+
private final long capacity;
49+
private final String tokenKey;
50+
private final String timeKey;
51+
52+
public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
53+
this.qps = qps;
54+
this.capacity = capacity;
55+
this.tokenKey = tokenKey;
56+
this.timeKey = timeKey;
57+
}
58+
59+
@Override
60+
public boolean tryAcquire(int permits) {
61+
long now = System.currentTimeMillis();
62+
List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
63+
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
64+
String.valueOf(capacity), String.valueOf(now));
65+
Object eval = JEDIS.eval(SCRIPT, keys, args);
66+
long value = (long) eval;
67+
return value != -1;
68+
}
69+
70+
public static void main(String[] args) throws InterruptedException {
71+
72+
int qps = 20;
73+
int bucket = 100;
74+
RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
75+
new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");
76+
77+
// 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
78+
redisTokenBucketRateLimiter.tryAcquire(bucket);
79+
TimeUnit.SECONDS.sleep(1);
80+
81+
// 模拟在一分钟内,不断收到请求,限流是否有效
82+
int seconds = 60;
83+
long okNum = 0L;
84+
long total = 0L;
85+
long beginTime = System.currentTimeMillis();
86+
for (int second = 0; second < seconds; second++) {
87+
int num = RandomUtil.randomInt(qps, 100);
88+
for (int i = 0; i < num; i++) {
89+
total++;
90+
if (redisTokenBucketRateLimiter.tryAcquire(1)) {
91+
okNum++;
92+
System.out.println("请求成功");
93+
} else {
94+
System.out.println("请求限流");
95+
}
96+
}
97+
TimeUnit.SECONDS.sleep(1);
98+
}
99+
long endTime = System.currentTimeMillis();
100+
long time = (endTime - beginTime) / 1000;
101+
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
102+
}
103+
104+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- 缓存 Key
2+
local key = KEYS[1]
3+
-- 访问请求数
4+
local permits = tonumber(ARGV[1])
5+
-- 过期时间
6+
local seconds = tonumber(ARGV[2])
7+
-- 限流阈值
8+
local limit = tonumber(ARGV[3])
9+
10+
-- 获取统计值
11+
local count = tonumber(redis.call('GET', key) or "0")
12+
13+
if count + permits > limit then
14+
-- 请求拒绝
15+
return -1
16+
else
17+
-- 请求通过
18+
redis.call('INCRBY', key, permits)
19+
redis.call('EXPIRE', key, seconds)
20+
return count + permits
21+
end
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
local tokenKey = KEYS[1]
2+
local timeKey = KEYS[2]
3+
4+
-- 申请令牌数
5+
local permits = tonumber(ARGV[1])
6+
-- QPS
7+
local qps = tonumber(ARGV[2])
8+
-- 桶的容量
9+
local capacity = tonumber(ARGV[3])
10+
-- 当前时间(单位:毫秒)
11+
local nowMillis = tonumber(ARGV[4])
12+
-- 填满令牌桶所需要的时间
13+
local fillTime = capacity / qps
14+
local ttl = math.min(capacity, math.floor(fillTime * 2))
15+
16+
local currentTokenNum = tonumber(redis.call("GET", tokenKey))
17+
if currentTokenNum == nil then
18+
currentTokenNum = capacity
19+
end
20+
21+
local endTimeMillis = tonumber(redis.call("GET", timeKey))
22+
if endTimeMillis == nil then
23+
endTimeMillis = 0
24+
end
25+
26+
local gap = nowMillis - endTimeMillis
27+
local newTokenNum = math.max(0, gap * qps / 1000)
28+
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)
29+
30+
if currentTokenNum < permits then
31+
-- 请求拒绝
32+
return -1
33+
else
34+
-- 请求通过
35+
local finalTokenNum = currentTokenNum - permits
36+
redis.call("SETEX", tokenKey, ttl, finalTokenNum)
37+
redis.call("SETEX", timeKey, ttl, nowMillis)
38+
return finalTokenNum
39+
end

0 commit comments

Comments
 (0)