本文主要是介绍通俗易懂的Java常见限流算法具体实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《通俗易懂的Java常见限流算法具体实现》:本文主要介绍Java常见限流算法具体实现的相关资料,包括漏桶算法、令牌桶算法、Nginx限流和Redis+Lua限流的实现原理和具体步骤,并比较了它们的...
一、漏桶算法
1.漏桶算法的思想和原理
1.固定容量的漏桶:系统维护一个固定容量的漏桶,用来存放请求。
2.请求入桶:当一个请求到达系统时,相当于将水倒入漏桶。如果漏桶已满,多余的请求会被丢弃或拒绝。
3.恒定速率的出桶:漏桶以恒定的速率处理请求,就像漏斗中的水稳定地漏出一样。
4.平滑流量:通过漏桶的出水速率,可以平滑流入系统的请求,避免突发流量。
5.限流判断:当一个请求到达时,会检查漏桶是否已满,如果漏桶已满,则触发限流机制,拒绝请求。
漏桶算法的实现步骤是,先声明一个队列用来保存请求,这个队列相当于漏斗,当队列容量满了之后就放弃新来的请求,然后重新声明一个线程定期(指定速率编程)从任务队列中获取一个或多个任务进行执行,这样就实现了漏桶算法。
优点:可以有效控制流量,避免突发请求的冲击,保持系统稳定性;
缺点:可能会影响请求响应时间,且不使用大并发量的请求系统;
2.具体实现
import Java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class LeakyBucket { private final long capacity; // 桶容量 private final long rate; // 漏水速率 private long water; // 当前水量 private long lastLeakTime; // 上一次漏水时间 private final AtomicLong requestCount; // 请求计数 public LeakyBucket(long capacity, long rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.lastLeakTime = System.currentTimeMillis(); this.requestCount = new AtomicLong(0); //以固定的速率漏水 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS); } //限流 public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); long elapsedTime = currentTime - lastLeakTime; water = Math.max(0, water - elapsedTime * rate); // 漏水 lastLeakTime = currentTime; if (water < capacity) { water++; requestCount.incrementAndGet(); return true; // 请求通过 } return false; // 漏桶已满,限流 } public long getRequestCount() { return requestCount.get(); } //以固定速率漏水 private synchronized void leakWater() { long currentTime = System.currentTimeMillis(); long elapsedTime = currentTime - lastLeakTime; water = Math.max(0, water - elapsedTime * rate); // 漏水 lastLeakTime = currentTime; } public static void main(String[] args) { // 创建一个容量为 10,速率为 2/S的漏桶 LeakyBucket leakyBucket = new LeakyBucket(10, 2); // 模拟请求 for (int i = 0; i < 20; i++) { boolean allowed = leakyBucket.allowRequest(); if (allowed) { System.out.println("Request " + (i + 1) + ": Allowed"); } else { System.out.println("Request " + (i + 1) + ": Limited"); } } // 输出总请求数 System.out.println("Total requests: " + leakyBucket.getRequestCount()); } }
二、令牌桶算法
1.令牌桶算法流程:
1.放入令牌到桶:按照固定的速率被放入令牌桶中,比如每秒放5个、10个、100个令牌到桶中。
2.获取令牌:所有的请求在处理之前都需要拿到一个可用的令牌才会被处理。
3.令牌桶满了拒绝:桶中最多能放1000个令牌,当桶满时,就不能继续放入了,新添加的令牌要么被丢弃,要么就直接拒绝。
优点:
1.避免了突发流量对系统的冲击。
2.可以根据需求调整令牌生成速率和令牌桶的容量,以适应不同的流星控制需求。
缺点:1.不适合瞬时突发流量,令牌桶算法可能无法处理突然涌入的大量请求,因为令牌桶的令牌生成速率是固定的。
2.如果请求需要等待令牌桶中的令牌,可能会导致一些请求的响应时间增加。
2.具体实现
2.1 编程实现
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TokenBucket {
private final long capacity; // 令牌桶容量
private final long rate; // 令牌生成速率
private AtomicLong tokens; // 当前令牌数量
private ScheduledExecutorService scheduler;
public TokenBucket(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::addToken, 0, 1, TimeUnit.SECONDS);
}
public boolean allowRequest() {
long currentTokens = tokens.getChina编程();
if (currentTokens > 0) {
tokens.decrementAndGet();
return true; // 有令牌,允许请求通过
}
return false; // 无令牌,限流
}
//添加令牌
private void addToken() {
long newTokens = Math.min(capacity, tokens.get() + rate);
tokens.set(newTokens);
}
public void shutdown() {
scheduler.shutdown();
}
public static void main(String[] args) {
TokenBucket tokenBucket = new TokenBucket(10, 2); // 创建容量为10,速率为2的令牌桶
// 模拟请求
for (int i = 0; i < 20; i++) {
boolean allowed = tokenBucket.allowRequest();
if (allowed) {
System.out.println("Request " + (i + 1) + ": Allowed");
} else {
System.out.println("Request " + (i + 1) + ": Limited");
}
}
tokenBucket.shutdown();
}
}
2.2 使用 Google 开源的 guava 包
(1)导入依赖
<!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.2-jre</version> </dependency>
(2)代码实现
import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @RChina编程etention(RetentionPolicy.RUNTIME) @Documented public @interface Limiter { int NOT_LIMITED = 0; String LIMIT_ERROR = "使用太频繁了,稍后再试..." ; /** * 限流key,唯一 * * @return */ String key() default ""; /** * 时间单位内允许的次数 * * @return */ double qps() default NOT_LIMITED; /** * 最大等待时间 * * @return */ int timeout() default NOT_LIMITED; /** * 最大等待时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.MILLISECONDS; }
import cn.hutool.core.util.StrUtil; import com.google.common.util.concurrent.RateLimiter; import com.hytera.annotation.Limiter; import lombok.extern.slf4j.Slf4j; import org.ASPectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.stereotype.Component; import utils.IpUtil; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @Author: zt 2024/1/9 17:48 * @CreateTime: 2024/1/9 17:48 * @描述:限流 **/ @Slf4j @Aspect @Component public class RateLimiterAspect { private static final ConcurrentMap<String, RateLimiter> RATE_LIMITER_CACHE = new ConcurrentHashMap<>(); @Around("@annotation(com.hytera.annotation.Limiter)") public Object pointcut(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Limiter rateLimiter = AnnotationUtils.findAnnotation(method, Limiter.class); if (rateLimiter != null && rateLimiter.qps() > Limiter.NOT_LIMITED) { double qps = rateLimiter.qps(); String ip = IpUtil.getIpAddress(); String key = StrUtil.isEmpty(rateLimiter.key())?method.getName()+"-"+IpUtil.getIpAddress():rateLimiter.key()+"-"+ ip; RateLimiter limiter = RATE_LIMITER_CACHE.get(key); if (limiter == null) { RATE_LIMITER_CACHE.put(key, RateLimiter.create(qps)); log.debug("【{}】的QPS设置为: {}", method.getName(), RATE_LIMITER_CACHE.get(key).getRate()); }else { //超时或者获取不到令牌,则报错 boolean b = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit()); if (b) { throw new RuntimeException(Limiter.LIMIT_ERROR);//自定义异常 } } } return point.proceed(); } }
三、Nginx限流
Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数。
一、控制速率
我们需要使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制,示例配置如下:
#限制每个 IP 访问的速度为 2r/s,因为 Nginx 的限流统计是基于毫秒的,我们设置的速度是 2r/s,转换一下就是 500ms 内单个 IP 只允许通过 1 个请求,从 501ms 开始才允许通过第 2 个请求。 limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit; } }
#使用 burst 关键字,控制一个 IP 单位总时间内的总访问次数 #burst=4,设置一个大小为4的缓冲区域,当大量请求到来,请求数量超过限流频率时,将其放入缓冲区域 limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit burst=4; } }
二、控制并发连接数
#limit_conn perip 10 表示限制单个 IP 同时最多能持有 10 个连接; #limit_conn perserver 100 表示 server 同时能处理并发连接的总数为 100 个。 limit_conn_zone $binary_remote_addr zone=perip:10m; limit_conn_zone $server_name zone=perserver:10m; server { ... limit_conn perip 10; limit_conn perserver 100; }
四、Redis+Lua限流
1.Lua介绍
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功。
2.Lua优势:
(1)减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求, 而脚本只需一次即可, 减少网络传输;
(2)原子操作: Redis 将整个脚本作为一个原子执行, 无需担心并发, 也就无需事务;(3)复用: 脚本会永久保存 Redis 中, 其他客户端可继续使用。
3.具体实现:
(1)编写Lua脚本(将其放在resources/scripts/redis目录下):
-- 下标从 1 开始 local key = KEYS[1] local now = tonumber(ARGV[1]) local ttl = tonumber(ARGV[2]) local expired = tonumber(ARGV[3]) -- 最大访问量 local max = tonumber(ARGV[4]) -- 清除过期的数据 -- 移除指定分数区间内的所有元素,expired 即已经过期的 score -- 根据当前时间毫秒数 - 超时毫秒数,得到过期时间 expired redis.call('zremrangebyscore', key, 0, expired) -- 获取 zset 中的当前元素个数 local current = tonumber(redis.call('zcard', key)) local next = current + 1 if next > max then -- 达到限流大小 返回 0 return 0; else -- 往 zset 中添加一个值、得分均为当前时间戳的元素,[value,score] redis.call("zadd", key, now, now) -- 每次访问均重新设置 zset 的过期时间,单位毫秒 redis.call("pexpire", key, ttl) return next end
(2)代码实现:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.scripting.support.ResourceScriptSource; @Configuration public class RedisConfig { @Bean @SuppressWarnings("unchecked") public RedisScript<Long> limitRedisScript() { DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/redis/limit.lua"))); redisScript.setResultType(Long.class); return redisScript; } }
import org.springframework.core.annotation.AliasFor; import org.springframework.core.annotation.AnnotationUtils; import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimiter { long DEFAULT_REQUEST = 10; /** * max 最大请求数 */ @AliasFor("max") long value() default DEFAULT_REQUEST; /** * 限流key */ String key() default ""; /** * 超android时时长,默认1分钟 */ long timeout() default 1; /** * 超时时间单位,默认 分钟 */ TimeUnit timeUnit() default TimeUnit.MINUTES; }
import cn.hutool.core.util.StrUtil; import com.xkcoding.ratelimit.redis.annotation.RateLimiter; import com.xkcoding.ratelimit.redis.util.IpUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.laphpng.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.time.Instant; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * <p> * 限流切面 * </p> * * @author yangkai.shen * @date Created in 2019-09-30 10:30 */ @Slf4j @Aspect @Component @RequiredArgsConstructor(onConstructor_ = @Autowired) public class RateLimiterAspect { private final static String SEPARATOR = ":"; private final static String REDIS_LIMIT_KEY_PREFIX = "limit:"; private final StringRedisTemplate stringRedisTemplate; private final RedisScript<Long> limitRedisScript; @Around("@annotation(com.xkcoding.ratelimit.redis.annotation.RateLimiter)") public Object pointcut(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); // 通过 AnnotationUtils.findAnnotation 获取 RateLimiter 注解 RateLimiter rateLimiter = AnnotationUtils.findAnnotation(method, RateLimiter.class); if (rateLimiter != null) { String key = rateLimiter.key(); // 默认用类名+方法名做限流的 key 前缀 if (StrUtil.isBlank(key)) { key = method.getDeclaringClass().getName() + StrUtil.DOT + method.getName(); } // 最终限流的 key 为 前缀 + IP地址 key = key + SEPARATOR + IpUtil.getIpAddr(); long max = rateLimiter.max(); long timeout = rateLimiter.timeout(); TimeUnit timeUnit = rateLimiter.timeUnit(); boolean limited = shouldLimited(key, max, timeout, timeUnit); if (limited) { throw new RuntimeException("手速太快了,慢点儿吧~"); } } return point.proceed(); } private boolean shouldLimited(String key, long max, long timeout, TimeUnit timeUnit) { // 最终的 key 格式为: // limit:自定义key:IP // limit:类名.方法名:IP key = REDIS_LIMIT_KEY_PREFIX + key; // 统一使用单位毫秒 long ttl = timeUnit.toMillis(timeout); // 当前时间毫秒数 long now = Instant.now().toEpochMilli(); long expired = now - ttl; Long executeTimes = stringRedisTemplate.execute(limitRedisScript, Collections.singletonList(key), now + "", ttl + "", expired + "", max + ""); if (executeTimes != null) { if (executeTimes == 0) { log.error("【{}】在单位时间 {} 毫秒内已达到访问上限,当前接口上限 {}", key, ttl, max); return true; } else { log.info("【{}】在单位时间 {} 毫秒内访问 {} 次", key, ttl, executeTimes); return false; } } return false; }
总结
到此这篇关于Java常见限流算法具体实现的文章就介绍到这了,更多相关Java常见限流算法内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于通俗易懂的Java常见限流算法具体实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!