RateLimiter实现令牌桶算法和漏桶算法

2024-06-11 23:28

本文主要是介绍RateLimiter实现令牌桶算法和漏桶算法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RateLimiter

第三方工具类:disruptor(高性能的无阻塞的无锁队列)、guava--RateLimit(高性能的信号量的限流器)----【基础的类库】

在 Guava 的 RateLimiter 中,并没有直接提供实现漏桶算法的方法,因为 RateLimiter 的设计就是基于令牌桶的。但是,如果我们想实现一个漏桶算法,我们需要自己编写代码来模拟水的流入和流出。

令牌桶算法(RateLimiter 实现)

  • 令牌以固定的速率产生并放入桶中。

  • 请求尝试从桶中取出令牌

  • 如果桶中有令牌,则请求可以继续;否则,请求可能被拒绝或等待。

在 Guava 的 RateLimiter 中,你使用 tryAcquire() 方法来尝试获取令牌。

漏桶算法

  • 水(请求)以任意速率流入桶中。

  • 桶有一个固定的容量。

  • 水(请求)以固定的速率从桶中流出。

  • 如果桶满了,新的水(请求)会被丢弃。

在漏桶算法的实现中,你需要跟踪桶的当前水位(即当前处理的请求数量),以及流入和流出的速率。你可能需要定时任务来模拟水的流出,或者使用某种数据结构(如队列)来跟踪等待处理的请求。

  • acquire():阻塞方法,尝试获取一个令牌。如果桶中没有令牌,它会阻塞当前线程直到有一个令牌可用。

  • tryAcquire()(无参数):非阻塞方法,尝试获取一个令牌。如果桶中没有令牌,它会立即返回 false

  • tryAcquire(long timeout, TimeUnit unit):带超时的非阻塞方法,尝试在指定的超时时间内获取一个令牌。如果在这段时间内获取到了令牌,它会返回 true;否则,它会返回 false

漏桶算法

限定一个固定速率,当超出了可以采取降级策略。

  • 不管来的速率,始终以匀速的速率处理

1.入门样例

 /*** 限流器示例类,用于演示如何使用Guava的RateLimiter进行速率限制。*/public class RateLimiterExample {/*** 静态的限流器实例,配置为每秒最多允许0.5个请求通过。* 这个配置意味着每两个请求之间至少需要一秒钟的时间间隔。*/private final static RateLimiter limiter = RateLimiter.create(0.5);​/*** 程序的入口点。* 创建一个固定大小的线程池,并提交10个任务来测试限流器。* 每个任务都会尝试获取限流器的许可,以便演示限流效果。** @param args 命令行参数*/public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(10);IntStream.range(0, 10).forEach(i ->service.submit(RateLimiterExample::testLimiter));}​/*** 测试限流器的函数。* 获取限流器的许可,并打印当前线程名以及获取许可所花费的时间。* 这个函数的目的是为了展示限流器如何限制并发请求的数量。*/private static void testLimiter() {// 获取限流器的许可,这可能需要等待,直到许可可用。System.out.println(currentThread() + " waiting " + limiter.acquire());}}

2.典型漏桶算法实现

 /*** Bucket类用于实现一个具有限流功能的队列。* 它使用并发链接deque作为容器,以限制队列中元素的数量,并通过RateLimiter控制取元素的速率。*/public class Bucket {/*** 使用并发链接deque作为容器,以支持并发操作。*/private final ConcurrentLinkedDeque<Integer> container = new ConcurrentLinkedDeque<>();​/*** 定义桶的容量限制。*/private final static int BUCKET_LIMIT = 1000;​/*** 使用RateLimiter来控制提交数据的速率。*/private final RateLimiter limiter = RateLimiter.create(10);​/*** 提交监视器,用于控制对容器进行提交操作的并发访问。*/private final Monitor offerMonitor = new Monitor();/*** 取消监视器,用于控制从容器中取出数据的并发访问。*/private final Monitor pollMonitor = new Monitor();​/*** 向桶中提交数据。* 提交数据到容器中。** 如果容器未满,则将数据添加到容器中;如果容器已满,则抛出异常。* 使用信号量机制来控制对容器的访问,以确保线程安全。** @param data 要提交的数据,必须为非空整数。* @throws IllegalArgumentException 如果容器已满,则抛出此异常。*/public void submit(Integer data){// 使用信号量检查容器是否已满,如果未满则获取访问权限if(offerMonitor.enterIf(offerMonitor.newGuard(()->container.size() < BUCKET_LIMIT))){try{// 在确保容器未满的情况下,添加数据到容器中container.offer(data);// 打印当前线程信息、提交的数据和容器的当前大小System.out.println(currentThread() + " submit data" + data + ",current size:" + container.size());}finally {// 无论添加数据成功与否,都释放访问权限offerMonitor.leave();}}else {// 如果容器已满,则抛出异常throw new IllegalArgumentException("The bucket is full");}}​​/*** 从桶中取出并消费一个元素。* 此方法用于从桶中取出一个元素,并使用提供的Consumer接口来消费这个元素。它首先尝试获取一个许可,确保桶不为空,* 如果成功获取许可,则执行元素的消费操作。这个方法的设计是为了在多线程环境下安全地访问和操作桶中的元素。** @param consumer 一个函数接口,用于定义如何消费桶中的元素。它接受一个整型参数,并没有返回值。*//*** 从桶中取出并消费一个元素。* 如果桶不为空,并且成功获得取元素的许可,则从桶中取出一个元素并使用提供的消费者进行消费。** @param consumer 消费元素的函数接口。*/public void takeThenConsume(Consumer<Integer> consumer){// 尝试获取许可以进入临界区,只有当桶不为空时才允许进入if(pollMonitor.enterIf(pollMonitor.newGuard(()->!container.isEmpty()))){try{// 打印当前线程正在等待获取许可的信息/*如果没有可用的许可,此方法可能会阻塞直到获得许可。这个方法的返回值通常表示成功获取的许可数量,在许多情况下对于一次性获取一个许可的情况会返回1,但具体取决于limiter的实现。*///todo 受RateLimiter.create(10);影响,控制速度System.out.println(currentThread() + " waiting " + limiter.acquire());// 从桶中poll(取出并删除)一个元素,并使用consumer接口消费这个元素consumer.accept(container.poll());}finally {// 无论操作成功与否,都释放许可pollMonitor.leave();}}}​}

测试类

 /*** BucketTest 类用于演示一个并发测试场景,其中多个线程向一个桶中添加数据,* 而其他线程从桶中获取数据并消费。*/public class BucketTest {/*** 主函数执行并发测试。* @param args 命令行参数*/public static void main(String[] args) {// 创建一个 Bucket 实例,用于线程间的数据传递。final Bucket bucket = new Bucket();// 创建一个 AtomicInteger,用于线程安全地自增,作为数据的唯一标识。final AtomicInteger DATA_CREATE = new AtomicInteger(0);​// 启动 5 个生产者线程,它们会不断地向桶中添加数据。IntStream.range(0, 5).forEach(i ->{new Thread(() ->{// 无限循环,生产者线程将持续添加数据。for (; ; ) {// 获取并增加数据序号,作为添加到桶中的数据。int data = DATA_CREATE.getAndIncrement();bucket.submit(data);try {// 休眠 200 毫秒,模拟数据生成的间隔。5个线程一秒提交五个,所以25TimeUnit.MILLISECONDS.sleep(200L);} catch (Exception e) {// 捕获并处理异常,这里只处理 IllegalArgumentException。if (e instanceof IllegalArgumentException) {System.out.println(e.getMessage());}}}}).start();});​// 25 : 10 ====> 5 : 2 是一个比例说明,表示生产者和RateLimit限定的速率(消费者?)的数量关系。​/*Thread[Thread-3,5,main] submit data1712,current size:1000Thread[Thread-7,5,main] waiting 0.09999Thread[Thread-7,5,main] W 711               711 / 1712 ~= 5 : 2*/​​// 启动 5 个消费者线程,它们会不断地从桶中获取数据并消费。IntStream.range(0, 5).forEach(i -> new Thread(() ->{// 无限循环,消费者线程将持续获取并消费数据。for (; ; ) {// 从桶中获取数据并消费,这里使用 lambda 表达式定义消费行为。bucket.takeThenConsume(x -> System.out.println(currentThread() + " W " + x));}}).start());}}

令牌桶算法

需要拿到令牌才允许进来

 /*** 令牌桶算法实现类。* 用于模拟销售系统中的限流策略,确保在限定的速率下出售商品,防止瞬间流量过高导致系统崩溃。*/public class TokenBucket {// 记录已售出的手机数量private AtomicInteger phoneNumbers = new AtomicInteger(0);​// 设置最大的销售限制private final static int LIMIT = 100;​// 使用RateLimiter实现限流,每10秒最多允许一个购买操作,  1s 10个把??private RateLimiter rateLimiter = RateLimiter.create(10);​// 每个实例的销售限制private final int saleLimit;​/*** 默认构造函数,使用预设的销售限制。*/public TokenBucket(){this(LIMIT);}​/*** 带有销售限制参数的构造函数。* * @param limit 销售限制的数量。*/public TokenBucket(int limit){this.saleLimit = limit;}​/*** 尝试购买手机的方法。* 如果令牌桶中有令牌,则尝试购买;如果已达到销售限制,则抛出异常。* * @return 返回购买成功的手机序号。* @throws IllegalStateException 如果已达到销售限制无法购买时抛出。* @throws RuntimeException 如果购买过程中发生异常时抛出。*/public int buy(){Stopwatch started = Stopwatch.createStarted();// 尝试获取令牌,如果10秒内无法获取则失败// 不想acquire会返回超时时间boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);if(success){// 检查是否已达到销售限制//todo 如果放在一开始,就会导致在tryAcquire阻塞多个线程,即使是原子类组合起来也会有线程安全问题。两个原子方法get、getAndIncrement组合起来了if(phoneNumbers.get() >= saleLimit){throw new IllegalStateException("Not any phone can be sale,please wait to next time!");}// 获取并增加已售出的手机数量int phoneNo = phoneNumbers.getAndIncrement();// 模拟处理订单的时间handleOrder();// 打印购买成功信息System.out.println(currentThread() + " user get the Mi phone " + phoneNo + ",ELT: " + started.stop());return phoneNo;}else {// 如果获取令牌失败,则停止计时并抛出异常started.stop();throw new RuntimeException("Sorry, occur exception when buy phone!");}}​/*** 模拟处理订单的时间,随机延迟1-10秒。* 这个方法用于模拟真实购买过程中的一些处理时间,使得购买过程更加真实。*/private void handleOrder(){try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}}​}

测试类

 public class TokenBucketExample {public static void main(String[] args) {final TokenBucket tokenBucket = new TokenBucket();for(int i = 0;i < 110;i++){new Thread(tokenBucket::buy).start();}}}

区别

漏桶:如果一下子来了很多请求,但是请求会被放在池子里面,然后以固定的速率去处理请求。

令牌桶:以固定的速率往桶内放入令牌,一下来很多请求,只要桶内的令牌足够多,请求就会立马被处理,这就是允许突发大量请求进来。

漏桶是请求进入桶内,但是处理请求的速率是固定的,令牌桶是只要拿到令牌请求立马会被处理。

漏桶算法与令牌桶算法的区别在于,漏桶算法能够强行限制数据的传输速率,令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

需要注意的是,在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制。

  • 从 API 调用上,两者可能看起来很相似(都是尝试获取某种“资源”),但它们的内部实现和逻辑是不同的。RateLimiter 直接提供了令牌桶算法的实现,而如果你需要实现漏桶算法,你需要自己编写额外的代码。

这篇关于RateLimiter实现令牌桶算法和漏桶算法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1052545

相关文章

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

康拓展开(hash算法中会用到)

康拓展开是一个全排列到一个自然数的双射(也就是某个全排列与某个自然数一一对应) 公式: X=a[n]*(n-1)!+a[n-1]*(n-2)!+...+a[i]*(i-1)!+...+a[1]*0! 其中,a[i]为整数,并且0<=a[i]<i,1<=i<=n。(a[i]在不同应用中的含义不同); 典型应用: 计算当前排列在所有由小到大全排列中的顺序,也就是说求当前排列是第

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo