本文主要是介绍【Java核心能力】RateLimiter 限流底层原理解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术的推送!
在我后台回复 「资料」 可领取
编程高频电子书
!
在我后台回复「面试」可领取硬核面试笔记
!文章导读地址:点击查看文章导读!
感谢你的关注!
RateLimiter 限流底层原理解析
为什么要学习 RateLimiter 原理呢,其实是因为我的项目中使用到了 RateLimiter 进行限流,之前有一次在唯品会面试的时候,面试官就问到了 RateLimiter 的底层限流算法以及原理,当时回答的并不是很好,所以这里整理一下!
这篇文章并没有讲 RateLimiter 的源码,底层源码看起来比较复杂,主要来理解它的限流算法原理、存在的缺点、以及在真实项目中如何使用它!
限流原理
RateLimiter 是谷歌的 Guava 包提供的限流器,采用 令牌桶算法 ,即均匀向桶中添加令牌,每次消费的时候也必须持有令牌,如果没有的话,就需要等待
RateLimiter 限流器有两种:
- 基本的限流器 SmoothBursty :每秒产生的令牌数量固定
- 带预热效果的 SmoothWarmingUp :会有一个预热的时间,也就是限流器产生令牌的速度会慢慢提升至最大,而不是一开始就是最快速度产生令牌
常用 API
这里说一下两种限流器常用的 API 的使用
public static void main(String[] args) throws InterruptedException {// 平滑限流器,每秒生成令牌数量 2RateLimiter smoothBurstyRateLimiter = RateLimiter.create(2);// 预热限流器,每秒生成令牌数量 2,预热时间 10sRateLimiter smoothWarmingUpRateLimiter = RateLimiter.create(2, 10, TimeUnit.SECONDS);// 尝试获取令牌,获取不到就立即返回falsesmoothBurstyRateLimiter.tryAcquire();// 获取令牌,获取不到就一直阻塞等待smoothBurstyRateLimiter.acquire();// 允许 RateLimiter 在运行时动态修改每秒产生的令牌数量smoothBurstyRateLimiter.setRate(10);
}
SmoothBursty 限流器
SmoothBursty 限流器就是每秒以固定速度产生令牌,比如说一秒产生 5 枚令牌,那么就会以固定速度向令牌桶中放置令牌
RateLimiter 中的透支未来令牌设计:
RateLimiter 中是可以对未来的令牌进行透支的,也就是虽然令牌桶中的令牌不够,那么就先欠着,当下一次请求过来的时候,再去还账
这里简单来说的话,RateLimiter 限流是针对于下一次请求进行限流的,而不是针对当前请求限流
这里我举个例子:
public static void main(String[] args) throws InterruptedException {RateLimiter rateLimiter = RateLimiter.create(2);while (true) {double acquire = rateLimiter.acquire(4);System.out.println("获取了令牌" + acquire);}
}
/*** 输出:* 获取了令牌0.0* 获取了令牌1.999297* 获取了令牌1.998551*/
这里通过 RateLimiter.create(2)
来创建限流器的话,就会去创建默认的限流器,即 SmoothBursty
设置令牌桶中每秒生产 2 个令牌,但是在 while 循环中,我们每次去获取 4 个令牌,按理来说要获取的令牌数已经超过了我们令牌桶中的令牌数量,应该是获取不到的
但是在 RateLimiter 中,第一次请求是可以直接获取 4 个令牌,但是下一次请求来的时候,就需要等待两秒时间了,这就是 RateLimiter 中对未来令牌的透支 ,这种透支会带来一定的问题,后边我们会说到这个问题,以及如何去解决
带预热效果的 SmoothWarmingUp 限流器
SmoothWarmingUp 这个限流器需要一个指定的时间来进行预热,而是在这个预热时间之内产生令牌的速度逐渐增加到指定的速度,而不是一开始就直接达到指定的令牌产生速度
这种限流器 适用于需要资源预热的场景 ,比如假设有些系统的平均 QPS 为 500,但是系统的一些资源可能都是懒加载的,并且一些数据还没有来得及加载到缓存中,如果在应用重启之后,突然有大量请求过来,需要进行这些懒加载资源的初始化,那么此时的 QPS 肯定无法达到 500,因此需要这种有预热效果的限流器
预热在高并发场景中的重要性
这里主要说一下为何需要通过预热来加强应用的可用性?
预热 在高并发场景中是很重要的,如果没有预热操作,可能 后端服务、数据库 等在重启之后,又会再次发生故障,因为重启之后,应用的资源、缓存都还没有初始化,大量初始化以及连接建立的操作会给服务带来巨大的压力,导致发生故障
比如后端服务可能会有一些线程池、或者一些对象是懒加载的状态,而数据库会通过缓存来优化性能,因此在服务或者数据库重启之后,这些对象、缓存都还没有加载,导致还无法承受很大数量的请求
因此不管是在限流中、还是在负载均衡中,都要考虑应用重启之后的预热,给应用一个初始化的时间,当应用逐渐达到稳定的状态之后,再去接收大量的请求
RateLimiter 是否线程安全以及设计模式的应用
RateLimiter 是线程安全的,在最终 RateLimiter 获取令牌的时候,其实是通过 synchronized 来加锁获取的
那么这里的 this.mutex()
就是返回了一个单例对象,来保证该应用的线程安全,这里我们来看一下在 RateLimiter 中是 如何生成单例对象 的
可以看到这里通过 双重检测 的方式来保证生成的 mutex 对象是单例的,并且该单例对象使用 volatile 修饰,来保证多线程之间的可见性,这样每个线程来上锁都是针对同一个对象进行加锁的,才可以保证线程安全
RateLimiter 的缺陷
RateLimiter 是存在缺陷的,如果系统的并发量逐步升高,通过 acquire()
方法是一定会去获取令牌的,而由于 RateLimiter 中 透支未来令牌 的设计,这就会导致后边的请求等待时间会逐步升高,下边代码模拟了并发量逐步升高的场景,从输出结果看可以发现后边的请求等待的时间越来越长,这显然对后来的请求很不友好
public static void main(String[] args) throws InterruptedException {RateLimiter rateLimiter = RateLimiter.create(2);for (int i = 1; i < 20; i ++) {double acquire = rateLimiter.acquire(i);System.out.println("获取了" + i + "个令牌,等待时间为" + acquire);}/*** 输出:* 获取了1个令牌,等待时间为0.0* 获取了2个令牌,等待时间为0.499337* 获取了3个令牌,等待时间为0.998667* 获取了4个令牌,等待时间为1.499843* 获取了5个令牌,等待时间为1.996169* 获取了6个令牌,等待时间为2.499906* 获取了7个令牌,等待时间为2.993976* 获取了8个令牌,等待时间为3.499379* 获取了9个令牌,等待时间为3.999501* 获取了10个令牌,等待时间为4.490265*/
}
- 怎么来解决这个问题呢?
这个问题的原因就是 acquire() 方法一定会获取令牌,那么我们在获取令牌之前可以先使用 tryAcquired 检测:
1、如果可行再去 acquire()
2、如果令牌不足,适当拒绝请求
因此解决策略就是我们去 定义一个拒绝策略 ,当发现等待的时间远远超出了可以接受的范围,就将该请求给拒绝掉,这样就不会导致一致透支未来的令牌,导致后边的请求越来越慢
- acquire 包装代码解析
如下代码(来源于 xjjdog 作者的 Github),我们将 acquire 方法给包装一下,先通过 tryAcquire() 尝试获取令牌,如果获取不到返回 false,我们再将请求数量给记录到原子类中,再通过 acquire() 开始阻塞等待获取令牌,当发现等待的请求数量超过指定的最大请求数量之后,就将之后的请求给拒绝掉!
public class FollowController {private final RateLimiter rateLimiter;private int maxPermits;private Object mutex = new Object();//等待获取permits的请求个数,原则上可以通过maxPermits推算private int maxWaitingRequests;private AtomicInteger waitingRequests = new AtomicInteger(0);public FollowController(int maxPermits,int maxWaitingRequests) {this.maxPermits = maxPermits;this.maxWaitingRequests = maxWaitingRequests;rateLimiter = RateLimiter.create(maxPermits);}public FollowController(int permits,long warmUpPeriodAsSecond,int maxWaitingRequests) {this.maxPermits = maxPermits;this.maxWaitingRequests = maxWaitingRequests;rateLimiter = RateLimiter.create(permits,warmUpPeriodAsSecond, TimeUnit.SECONDS);}public boolean acquire() {return acquire(1);}public boolean acquire(int permits) {boolean success = rateLimiter.tryAcquire(permits);if (success) {rateLimiter.acquire(permits);//可能有出入return true;}if (waitingRequests.get() > maxWaitingRequests) {return false;}waitingRequests.getAndAdd(permits);rateLimiter.acquire(permits);waitingRequests.getAndAdd(0 - permits);return true;}}
常用限流算法:漏桶算法和令牌桶算法
常用的限流算法是 漏桶算法 和 令牌桶算法
RateLimiter 是基于令牌桶算法的思想实现,这里说一下这两种限流算法以及它们的区别
漏桶算法
漏桶算法的原理就是 将请求加入漏桶中,漏桶以固定速率出水,如果请求在漏桶中溢出就拒绝请求
那么这个漏桶就可以使用一定长度的队列来实现,长度就是这个漏桶所能容纳请求的数量,再通过另一个线程从队列的另一端去不断取出任务执行就可以了
- 漏桶算法存在的问题
漏桶算法存在的问题就是只能以固定速率处理到来的请求,无法处理突发请求 ,也就是一瞬间如果有超过漏桶大小的请求数量过来的话,超出的那部分请求就会被无情的抛弃
那么漏桶算法的这个问题在令牌桶算法中得到了解决 ,如果请求一开始数量较少,令牌桶中会积累令牌数量,当有突发流量到来的时候,会去使用已经积累的令牌数量来去处理这些请求,并且 RateLimiter 的实现中还可以对未来令牌数量透支,这样 RateLimiter 实现的令牌桶算法就可以很好的应对突发流量了,不过这样带来的缺点就是如果一直并发量比较高,导致对未来的令牌数量一直透支,会导致后边请求的阻塞等待时间逐渐变长,不过解决方法我们上边也说过了,适当的加一些请求拒绝策略就可以缓解这种现象
在高并发的场景中,突发流量还是比较常见的,因此在 RateLimiter 基于令牌桶算法实现中为了应对突发流量,做出了这些优化
漏桶算法和令牌桶算法还有一点区别就是:
漏桶算法是需要将请求给存储在队列中,而在令牌桶算法中,并没有真正去产生令牌,而是根据时间差来计算这段时间应该产生的令牌数, 所以令牌桶算法的性能相对于漏桶算法来说是比较高的!
令牌桶算法
令牌桶算法的原理就是 系统使用恒定速率往桶中放入令牌,如果请求需要被处理,就从桶中获取令牌,如果没有令牌的话,请求被拒绝
RateLimiter 就是基于令牌桶算法实现的,在他里边并没有真正的去创建令牌实体,而是根据时间差来计算这一段时间产生的令牌数,这样做的好处就是 性能比较高
如果真正要去创建令牌实体的话,肯定需要再启动一个任务,以固定速率向令牌桶中生成令牌,那么启动一个新的任务是会带来一定的系统开销的,可能会加重系统的负担,那么通过时间差来计算令牌数的话,通过简单的计算就可以拿到产生的令牌数量,开销大大减少
项目中如何使用限流
项目中使用限流的话,可以使用 AOP + RateLimiter 的方式来实现限流,也可以使用 AOP + RateLimiter + SpringBoot starter + 自定义注解 的方式来实现限流
这里两种方式都说一下,代码实现的话并不难,主要就是用到了 AOP
使用 starter 的方式来实现限流的话,在其他项目中如果需要使用到限流功能,直接引入这个 starter,在需要限流的方法上加入定义的限流注解就可以了,使用起来比较方便
使用 AOP 的方式实现限流的好处就是可以 减少对代码的入侵 ,将限流操作和业务操作隔离开来,互不影响,这也就是 代理模式 的好处!
AOP 方式实现限流
AOP 方式实现限流的话,就是定义一个切面,对需要限流的方法进行增强就可以了,这里将切面的代码给贴出来(当然依赖需要引入 Guava 的依赖包):
@Component
@Aspect
public class ServiceLogAspect {/*** 对 controller 限流*/@Pointcut("execution(* com.java.back.controller.*.*(..))")public void rateLimitPointCut() {}private static final RateLimiter rateLimiter = RateLimiter.create(10);@SneakyThrows // 使用之后不需要抛出异常,lombok会自动在编译时加上try/catch@Around("rateLimitPointCut()")public Object rateLimit(ProceedingJoinPoint joinPoint) {double rate = rateLimiter.getRate();System.out.println(rate);if (rateLimiter.tryAcquire()) {return joinPoint.proceed();} else {// 如果超出限流次数,拦截方法的执行,注意这里返回的对象要和 Controller 方法的返回对象类型相同,否则会报错return "访问太过频繁";}}
}
starter 方式实现限流
其实使用 starter 来设计的话也是比较方便的,相当于将限流组件设计成为了第三方插件,这样可以不用在每一个项目中都写一份,这种方式的复用性比较强
这里先说一下实现的大致流程,就是定义一个 Spring 项目,创建一个 spring.factories
文件,指定自动配置类,这个自动配置类就是将我们的切面给扫描到 Spring 的 Bean 容器中去,这样切面才可以生效
并且还需要自定义一个注解,当我们的项目需要使用限流功能时,引入这个限流的组件,将里边自定义的注解给加到需要限流的方法上去就可以了
那么实现一个 SpringBoot starter 的话,只需要:spring.factories 、 自动配置类 、切面 、自定义注解 这三块内容就可以完成
- 首先说一下 自定义注解 的实现,自定义注解中定义了限流的次数
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DoRateLimiter {double permitsPerSecond() default 0D;String errorResult() default "";
}
- 接下来是 切面 的实现,切面主要是对自定义的注解进行增强
@Component
@Aspect
public class DoRateLimiterPoint {// 该切面匹配了所有带有 @DoRateLimiter 注解的方法@Pointcut("@annotation(com.zqy.ratelimiter.annotation.DoRateLimiter)")public void aopPoint() {}// aopPoint() && @annotation(doRateLimiter) 这样处理,可以通过方法入参就直接拿到注解,比较方便@Around("aopPoint() && @annotation(doRateLimiter)")public Object doRouter(ProceedingJoinPoint jp, DoRateLimiter doRateLimiter) throws Throwable {System.out.println("进入了切面");IRateLimiterOpService rateLimiterOpService = new RateLimiterOpServiceImpl();return rateLimiterOpService.access(jp, getMethod(jp), doRateLimiter, jp.getArgs());}private Method getMethod(JoinPoint jp) throws NoSuchMethodException {Signature sig = jp.getSignature();MethodSignature methodSignature = (MethodSignature) sig;return jp.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getParameterTypes());}
}
- 这里将限流操作其实封装在了 rateLimiterOpService 类中
public class RateLimiterOpServiceImpl implements IRateLimiterOpService{@Overridepublic Object access(ProceedingJoinPoint jp, Method method, DoRateLimiter doRateLimiter, Object[] args) throws Throwable {// 如果注解没有限流,则执行方法if (0 == doRateLimiter.permitsPerSecond()) return jp.proceed();String clzzName = jp.getTarget().getClass().getName();String methodName = method.getName();String key = clzzName + ":" + methodName;// 这里用 Map 缓存一下限流器,每个方法创建一个限流器缓存if (null == Constants.rateLimiterMap.get(key)) {// 如果该方法没有限流器的话,就创建一个Constants.rateLimiterMap.put(key, RateLimiter.create(doRateLimiter.permitsPerSecond()));}RateLimiter rateLimiter = Constants.rateLimiterMap.get(key);// 如果没有达到限流器上限if (rateLimiter.tryAcquire()) {return jp.proceed();}// 将错误信息返回return JSONObject.parseObject(doRateLimiter.errorResult());}
}
- 最后就是限流器的使用了
@RestController
public class HelloController {@DoRateLimiter(permitsPerSecond = 1, errorResult = "{\"code\": \"1001\",\"info\": \"调用方法超过最大次数,限流返回!\"}")@GetMapping("/hello")public Object hello() {return "hello";}
}
还有 spring.factories
文件不要忘记,该文件指定的自动配置类,这样在我们的 SpringBoot 项目引入限流的 starter 组件之后,才可以扫描到这个自动配置类,在这个自动配置类中创建我们的切面 Bean,这样切面的 Bean 就在 Spring 容器中可以生效了
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.zqy.ratelimiter.config.RateLimiterAutoConfig
完整的代码在代码仓库中查看:https://gitee.com/qylaile/rate-limiter-tool-starter
这篇关于【Java核心能力】RateLimiter 限流底层原理解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!