本文主要是介绍Guava-RateLimiter秒杀限流技术详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
使用场景
系统使用下游资源时,需要考虑下游对资源受限、处理能力,在下游资源无法或者短时间内无法提升处理性能的情况下,可以使用限流器或者类似保护机制,避免下游服务崩溃造成整体服务的不可用。
常用算法
常见限流算法有两种:漏桶算法和令牌桶算法。
漏桶算法
具体问题
网站的访问ip中,找出进行频繁连接的ip,并对这些ip的访问频率进行限制。
解决方案
Leak Bucket / Token Bucket
学习资料
http://en.wikipedia.org/wiki/Leaky_bucket
概述
将上述的寻找频繁访问ip的问题提升到一个更高的抽象层次,就是网站的流量控制。Leaky Bucket就是一种可以辅助实现流量控制的算法。
在我看来,Leaky Bucket是一个抽象层次略高的算法。它的作用,是通过一种模型(即桶),建立了一种合理地判断流量是否异常的算法。
至于在判断出异常流量后,要触发怎样的操作——抛弃?放入等待队列暂缓发送?——仍然要交给算法的实现者根据具体需求作出选择。这并不是Leaky Bucket的管辖范畴。
根据wiki上的介绍,Leaky Bucket实际上有两种不同的含义。
1)as a meter(作为计量工具)
2)as a queue(作为调度队列)
其中,第一种含义和Token Bucket是等价的,只是表述的角度不同。更有趣的是,第二种含义其实是第一种的特例。这些对比和区别在后面再谈,先整体看一下Leaky Bucket。
Leaky Bucket整体思想
Leaky Bucket的核心抽象模型就如字面意思:一个会漏水的桶。
如图,桶本身具有一个恒定的速率往下漏水,而上方时快时慢地会有水进入桶中。当桶还未满时,上方的水可以加入。一旦水满,上方的水就无法加入了。桶满正是算法中的一个的关键触发条件(即流量异常判断成立的条件)。而此条件下如何处理上方欲留下的水,则有了下面两种常见的方式。
Traffic Shaping和Traffic Policing
在桶满水之后,常见的两种处理方式为:
1)暂时拦截住上方水的向下流动,等待桶中的一部分水漏走后,再放行上方水。
2)溢出的上方水直接抛弃。
将水看作网络通信中数据包的抽象,则
方式1起到的效果称为Traffic Shaping,
方式2起到的效果称为Traffic Policing。
由此可见,Traffic Shaping的核心理念是“等待”,Traffic Policing的核心理念是“丢弃”。它们是两种常见的流速控制方法。
(http://en.wikipedia.org/wiki/Traffic_shaping,http://en.wikipedia.org/wiki/Traffic_policing)
算法所需的参数
现在,再回顾一下上面的图,可以看出算法只需要两个参数:
1)桶漏水的速率
2)桶的大小
核心:
利用桶模型判断何时的流量达到异常了
外延:
1)流量异常时的处理方法:traffic policing v.s. traffic shaping
2)处理的数据包是否定长:定长 v.s. 变长
3)桶的大小是否等同于每个tick放行的水量:as a queue v.s. as a meter
总结
回头再看,其实Leaky Bucket是一个很简单的想法,在处理流量控制上也能有不错的效果。wiki上的资料非常繁复,看了我一个下午。其实更多的是在大家运用这个词时的情景多种多样,而没有很好地叙述出算法的核心和外延。
我这里做学习笔记,其实主要也是为了理清自己在学习Leaky Bucket时的混乱,试图真正搞清楚哪些是核心,哪些是外延。
注意事项
在学习的过程中,我发现网上所有的中文资料在谈及Leaky Bucket(漏桶)和Token Bucket(令牌桶)算法时,都是把漏桶看作wiki解释中的第二种。所以,以上文章里的“漏桶”和本文的“Leaky Bucket”并不等价。
我个人倒是并不反对用漏桶来指代wiki的第二种解释,因为这样就可以明确区分出“漏桶”和“令牌桶”。但是,在这种解释下,我们需要牢记,“漏桶”就只是“令牌桶”的一个特例而已了。
令牌桶算法
令牌桶算法基于这样的场景的模拟:
有一个装有token且token数量固定的桶,token添加的速率时固定的,当有请求来(或者数据包到达),会检查下桶中是否包含足够多的token(一个请求可能需要多个token)。对于数据包而言,数据包的长度等同于需要获取的token数量。即从桶中消费token,若token数量足够,则消费掉,不够则根据不同的策略处理(阻塞当前或提前消费等)。
Guava Ratelimiter实现
Guava实现更接近于令牌桶算法:将一秒钟切割为令牌数的时间片段,每个时间片段等同于一个token。
关键变量:
nextFreeTicketMicros
:表示下一次允许补充许可的时间(时刻)。这个变量的解释比较拗口,看下面流程会比较清晰maxPermits
:最大许可数storedPermits
:存储的许可数,数量不能超过最大许可数
实现
这里有一个关键方法(重)同步方法,在初始化以及获取操作时都会用到:
void resync(long nowMicros) {// if nextFreeTicket is in the past, resync to nowif (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();storedPermits = min(maxPermits, storedPermits + newPermits);nextFreeTicketMicros = nowMicros;}
}
如果当前时间(不是时刻,而是自创建起所流经的时间,下同)超过了上一次所设定的nextFreeTicketMicros
时间,则会重新进行同步:
- 通过计算上一次设定
nextFreeTicketMicros
到当前时刻的时间差获取新增的可用许可数; - 计算可用的许可数:如果新增的许可数+原有的许可数小于最大许可数,则存储的许可数增加新增的数量,否则同步为最大许可数;
- 同步下一次允许补充许可时间为当前时间
初始化
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);rateLimiter.setRate(permitsPerSecond);return rateLimiter;
}
这里使用一个StopWatch
来计时,主要是获取自限速器创建所流经的时间。
初始化关键变量(其实就是通过resync方法来实现主要逻辑的
):
nextFreeTicketMicros为当前时间;maxPermits为传入的每秒允许的许可数;storedPermits则为0
初始化
获取许可(acquire)
获取一定数量的许可,如果获取不到,则阻塞相应时间,然后获取相应许可。并返回当前操作所等待的时间。
- 尝试
resync
操作 - 返回值所需等待时间设置为min(
nextFreeTicketMicros
-nowMicros,0) - 实际消耗的许可数:min(请求许可数,存储许可数中的小值);
- 需要刷新获取的许可数(
freshPermits
):请求许可数-实际消耗许可数 - 等待时间(
waitMicros
):需要刷新获取的许可数(freshPermits
)*每个许可数所需时间 - 下一次允许补充许可时间(
nextFreeTicketMicros
)同步为:nextFreeTicketMicros+=waitMicros - 更新剩余存储的许可数:存储许可数-本次实际消耗许可数
根据resync
方法条件:if (nowMicros > nextFreeTicketMicros)
不难发现,如果申请获取的许可数多于剩余可分配的许可数,更新后的nextFreeTicketMicros
时间会超过nowMicros
,但是当前请求所需等待时间为0。即对于超量许可申请(大于当前可提供的许可数),等待操作是在下一次请求时才会发生。通俗点说就是:前人挖坑后人跳。
当nextFreeTicketMicros
早于当前时间,且许可数足够的情况:
nextFreeTicketMicros早于nowMicros且许可足够
当nextFreeTicketMicros
早于当前,但是许可数不够的情况:
nextFreeTicketMicros早于nowMicros但许可不足
当nextFreeTicketMicros
晚于当前时间,主要是阻塞时间计算,许可数分发以及时间计算等同上两场景。
尝试获取许可(tryAcquire)
如果nextFreeTicketMicros
-timeout
<=nowMicros
,说明经过超时时间内也不会有一个许可可以分配(按上描述,只要有许可,就可用分配,无论申请的数量有多少),则tryAcquire操作直接返回false。否则按照acquire操作流程获取许可信息。
预热(warmingup)
首先申请一个容量为100(每秒)的限流器,然后多线程并发获取许可,并发数量为20,且每个线程只获取一次。
附上测试代码:
public void testCurrent(){RateLimiter rateLimiter = RateLimiter.create(100);ExecutorService executorService = Executors.newFixedThreadPool(100);Runnable runnable = ()->{if(!rateLimiter.tryAcquire(1,100,TimeUnit.MILLISECONDS)){System.out.println("F"+Thread.currentThread().getName());}else {System.out.println("A"+Thread.currentThread().getName());}};for (int i = 0; i < 20; i++) {executorService.execute(runnable);}try {executorService.awaitTermination(1,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}
}
按上算法描述应当不会出现F
开头的输出,但是实际却发现20次输出基本有小半数的尝试获取失败:
1489453467102 pool-1-thread-1
1489453467102 pool-1-thread-2
1489453467104 pool-1-thread-3
1489453467104 pool-1-thread-4
1489453467105 pool-1-thread-5
1489453467105 pool-1-thread-6
1489453467105 pool-1-thread-7
1489453467107 pool-1-thread-8
1489453467107 pool-1-thread-9
F 1489453467108 pool-1-thread-15
F 1489453467108 pool-1-thread-16
F 1489453467109 pool-1-thread-17
F 1489453467109 pool-1-thread-18
F 1489453467109 pool-1-thread-19
F 1489453467109 pool-1-thread-20
1489453467219 pool-1-thread-10
1489453467239 pool-1-thread-11
1489453467259 pool-1-thread-12
1489453467274 pool-1-thread-13
1489453467297 pool-1-thread-14
问题来自于初始化时,storedPermits
存储的许可数为0,而第一个线程进行获取时,离初始时时间非常近,导致第一个线程获取许可后,存储的可用许可数并非为声明的最大许可数,从而导致后续线程尝试获取几次后会耗尽存储的许可数,继而导致tryAcquire
操作失败。
Guava-RateLimiter详解2
常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟10枚令牌,实际业务在每次响应请求之前都从桶中获取令牌,只有取到令牌的请求才会被成功响应,获取的方式有两种:阻塞等待令牌或者取不到立即返回失败.
本次实战,我们用的是guava的RateLimiter,场景是spring mvc在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败。
实例一
1、添加guava jar包
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>18.0</version></dependency>
2、AccessLimitService.java 限流服务封装到一个类中AccessLimitService,提供tryAcquire()方法,用来尝试获取令牌,返回true表示获取到
@Service
public class AccessLimitService {//每秒只发出5个令牌RateLimiter rateLimiter = RateLimiter.create(5.0);/*** 尝试获取令牌* @return*/public boolean tryAcquire(){return rateLimiter.tryAcquire();}
}
3、Controller层每次收到请求的时候都尝试去获取令牌,获取成功和失败打印不同的信息
@Controller
public class HelloController {private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate AccessLimitService accessLimitService;@RequestMapping("/access")@ResponseBodypublic String access(){//尝试获取令牌if(accessLimitService.tryAcquire()){//模拟业务执行500毫秒try {Thread.sleep(500);}catch (InterruptedException e){e.printStackTrace();}return "aceess success [" + sdf.format(new Date()) + "]";}else{return "aceess limit [" + sdf.format(new Date()) + "]";}}
}
4、测试:十个线程并发访问接口
public class AccessClient {ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);/*** get请求* @param realUrl* @return*/public static String sendGet(URL realUrl) {String result = "";BufferedReader in = null;try {// 打开和URL之间的连接URLConnection connection = realUrl.openConnection();// 设置通用的请求属性connection.setRequestProperty("accept", "*/*");connection.setRequestProperty("connection", "Keep-Alive");connection.setRequestProperty("user-agent","Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");// 建立实际的连接connection.connect();// 定义 BufferedReader输入流来读取URL的响应in = new BufferedReader(new InputStreamReader(connection.getInputStream()));String line;while ((line = in.readLine()) != null) {result += line;}} catch (Exception e) {System.out.println("发送GET请求出现异常!" + e);e.printStackTrace();}// 使用finally块来关闭输入流finally {try {if (in != null) {in.close();}} catch (Exception e2) {e2.printStackTrace();}}return result;}public void access() throws Exception{final URL url = new URL("http://localhost:8080/guavalimitdemo/access");for(int i=0;i<10;i++) {fixedThreadPool.submit(new Runnable() {public void run() {System.out.println(sendGet(url));}});}fixedThreadPool.shutdown();fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);}public static void main(String[] args) throws Exception{AccessClient accessClient = new AccessClient();accessClient.access();}
}
部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的时候发出的是6枚,改用其他值验证,也是实际的比配置的大1。
以上就是快速实现限流的实战过程,此处仅是单进程服务的限流,而实际的分布式服务中会考虑更多因素,会复杂很多。
RateLimiter方法摘要
修饰符和类型 | 方法和描述 |
---|---|
double | acquire() 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | **acquire(int permits)**从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | create(double permitsPerSecond)根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | **create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)**根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | **getRate()**返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | **setRate(double permitsPerSecond)**更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString()返回对象的字符表现形式 |
boolean | **tryAcquire()**从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | **tryAcquire(int permits)**从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | **tryAcquire(int permits, long timeout, TimeUnit unit)**从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | **tryAcquire(long timeout, TimeUnit unit)**从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |
- 举例来说明如何使用RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:
//速率是每秒两个许可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {for (Runnable task : tasks) {rateLimiter.acquire(); // 也许需要等待executor.execute(task);}
}
官方文档:http://ifeve.com/guava-ratelimiter
实例二
package com.didispace.demo;import com.google.common.util.concurrent.RateLimiter;import java.util.Random;
import java.util.concurrent.CountDownLatch;/*** Created by daizhao.* User: tony* Date: 2018-10-30* Time: 10:01* info: RateLimiter限流技术(令牌桶算法)*/
public class TestRateLimiter {//每秒只发出10个令牌RateLimiter rateLimiter = RateLimiter.create(10);public void sendRequest() {if (rateLimiter.tryAcquire()) {//获取令牌成功System.out.println(Thread.currentThread().getName() + "请求成功");} else {System.out.println(Thread.currentThread().getName() + "请求失败");}}public static void main(String[] args) {TestRateLimiter testRateLimiter = new TestRateLimiter();CountDownLatch countDownLatch = new CountDownLatch(1);//发令枪Random random = new Random(10);for (int i = 0; i < 20; i++) {new Thread(() -> {try {countDownLatch.await();//所有线程阻塞在此Thread.sleep(random.nextInt(1000));testRateLimiter.sendRequest();} catch (InterruptedException e) {e.printStackTrace();}}, i + "tt").start();}countDownLatch.countDown();//所有线程同时开启}}
这篇关于Guava-RateLimiter秒杀限流技术详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!