关于分布式限流,这几点你必须掌握!

2024-06-03 09:32

本文主要是介绍关于分布式限流,这几点你必须掌握!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

小刀爱编程 2019-04-25 13:40:27

前言

在一个高并发系统中对流量的把控是非常重要的,当巨大的流量直接请求到我们的服务器上没多久就可能造成接口不可用,不处理的话甚至会造成整个应用不可用。

比如最近就有个这样的需求,我作为客户端要向kafka生产数据,而kafka的消费者则再源源不断的消费数据,并将消费的数据全部请求到web服务器,虽说做了负载(有4台web服务器)但业务数据的量也是巨大的,每秒钟可能有上万条数据产生。如果生产者直接生产数据的话极有可能把web服务器拖垮。

关于分布式限流,这几点你必须掌握!

 

对此就必须要做限流处理,每秒钟生产一定限额的数据到kafka,这样就能极大程度的保证web的正常运转。

其实不管处理何种场景,本质都是降低流量保证应用的高可用。

常见算法

对于限流常见有两种算法:

  • 漏桶算法
  • 令牌桶算法

漏桶算法比较简单,就是将流量放入桶中,漏桶同时也按照一定的速率流出,如果流量过快的话就会溢出(漏桶并不会提高流出速率),溢出的流量则直接丢弃。

如下图所示:

关于分布式限流,这几点你必须掌握!

 

这种做法简单粗暴。

漏桶算法虽说简单,但却不能应对实际场景,比如突然暴增的流量。

这时就需要用到令牌桶算法:

令牌桶会以一个恒定的速率向固定容量大小桶中放入令牌,当有流量来时则取走一个或多个令牌。当桶中没有令牌则将当前请求丢弃或阻塞。

关于分布式限流,这几点你必须掌握!

 

相比之下令牌桶可以应对一定的突发流量。

RateLimiter实现

对于令牌桶的代码实现,可以直接使用Guava包中的RateLimiter。

@Override 
public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO() ; vo.setReqNo(userReqVO.getReqNo()); RateLimiter limiter = RateLimiter.create(2.0) ; //批量调用 for (int i = 0 ;i< 10 ; i++){ double acquire = limiter.acquire(); logger.debug("获取令牌成功!,消耗=" + acquire); BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo); logger.debug("远程返回:"+JSON.toJSONString(orderNo)); } UserRes userRes = new UserRes() ; userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReqVO.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes ; 
} 

详见此。

调用结果如下:

代码可以看出以每秒向桶中放入两个令牌,请求一次消耗一个令牌。所以每秒钟只能发送两个请求。按照图中的时间来看也确实如此(返回值是获取此令牌所消耗的时间,差不多也是每500ms一个)。

使用RateLimiter有几个值得注意的地方:

允许先消费,后付款,意思就是它可以来一个请求的时候一次性取走几个或者是剩下所有的令牌甚至多取,但是后面的请求就得为上一次请求买单,它需要等待桶中的令牌补齐之后才能继续获取令牌。

总结

针对于单个应用的限流 RateLimiter 够用了,如果是分布式环境可以借助 Redis 来完成。

来做演示。

在 Order 应用提供的接口中采取了限流。首先是配置了限流工具的 Bean:

@Configuration 
public class RedisLimitConfig { @Value("${redis.limit}") private int limit; @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public RedisLimit build() { RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection(); JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection(); RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); return redisLimit; } 
} 

接着在 Controller 使用组件:

@Autowired 
private RedisLimit redisLimit ; 
@Override 
@CheckReqNo 
public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); //限流 boolean limit = redisLimit.limit(); if (!limit){ res.setCode(StatusEnum.REQUEST_LIMIT.getCode()); res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage()); return res ; } res.setReqNo(orderNoReq.getReqNo()); if (null == orderNoReq.getAppId()){ throw new SBCException(StatusEnum.FAIL); } OrderNoResVO orderNoRes = new OrderNoResVO() ; orderNoRes.setOrderId(DateUtil.getLongTime()); res.setCode(StatusEnum.SUCCESS.getCode()); res.setMessage(StatusEnum.SUCCESS.getMessage()); res.setDataBody(orderNoRes); return res ; 
} 

为了方便使用,也提供了注解:

@Override 
@ControllerLimit 
public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); // 业务逻辑 return res ; 
} 

该注解拦截了 http 请求,会在请求达到阈值时直接返回。

普通方法也可使用:

@CommonLimit 
public void doSomething(){} 

会在调用达到阈值时抛出异常。

为了模拟并发,在 User 应用中开启了 10 个线程调用 Order(限流次数为5) 接口(也可使用专业的并发测试工具 JMeter 等)。

@Override 
public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO(); vo.setAppId(1L); vo.setReqNo(userReq.getReqNo()); for (int i = 0; i < 10; i++) { executorService.execute(new Worker(vo, orderServiceClient)); } UserRes userRes = new UserRes(); userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReq.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes; 
} 
private static class Worker implements Runnable { private OrderNoReqVO vo; private OrderServiceClient orderServiceClient; public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) { this.vo = vo; this.orderServiceClient = orderServiceClient; } @Override public void run() { BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo); logger.info("远程返回:" + JSON.toJSONString(orderNo)); } 
} 

为了验证分布式效果启动了两个 Order 应用。

关于分布式限流,这几点你必须掌握!

 

效果如下:

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

实现原理

实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。

其中 Redis 就非常适合这样的场景。

  • 每次请求时将当前时间(精确到秒)作为 Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
  • 当达到阈值时返回错误。
  • 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性。

Lua 脚本如下:

--lua 下标从 1 开始-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 获取当前流量大小local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0;else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end

Java 中的调用逻辑:

--lua 下标从 1 开始 
-- 限流 key 
local key = KEYS[1] 
-- 限流大小 
local limit = tonumber(ARGV[1]) 
-- 获取当前流量大小 
local curentLimit = tonumber(redis.call('get', key) or "0") 
if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0; 
else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1 
end 

所以只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。

当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。

Builder 构建器

在设计这个组件时想尽量的提供给使用者清晰、可读性、不易出错的 API。

比如第一步,如何构建一个限流对象。

最常用的方式自然就是构造函数,如果有多个域则可以采用重叠构造器的方式:

public A(){} 
public A(int a){} 
public A(int a,int b){} 

缺点也是显而易见的:如果参数过多会导致难以阅读,甚至如果参数类型一致的情况下客户端颠倒了顺序,但不会引起警告从而出现难以预测的结果。

第二种方案可以采用 JavaBean 模式,利用 setter 方法进行构建:

A a = new A(); 
a.setA(a); 
a.setB(b); 

这种方式清晰易读,但却容易让对象处于不一致的状态,使对象处于线程不安全的状态。

所以这里采用了第三种创建对象的方式,构建器:

public class RedisLimit { private JedisCommands jedis; private int limit = 200; private static final int FAIL_CODE = 0; /** * lua script */ private String script; private RedisLimit(Builder builder) { this.limit = builder.limit ; this.jedis = builder.jedis ; buildScript(); } /** * limit traffic * @return if true */ public boolean limit() { String key = String.valueOf(System.currentTimeMillis() / 1000); Object result = null; if (jedis instanceof Jedis) { result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else if (jedis instanceof JedisCluster) { result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else { //throw new RuntimeException("instance is error") ; return false; } if (FAIL_CODE != (Long) result) { return true; } else { return false; } } /** * read lua script */ private void buildScript() { script = ScriptUtil.getScript("limit.lua"); } /** * the builder * @param <T> */ public static class Builder<T extends JedisCommands>{ private T jedis = null ; private int limit = 200; public Builder(T jedis){ this.jedis = jedis ; } public Builder limit(int limit){ this.limit = limit ; return this; } public RedisLimit build(){ return new RedisLimit(this) ; } } 
} 

这样客户端在使用时:

RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); 

更加的简单直接,并且避免了将创建过程分成了多个子步骤。

这在有多个构造参数,但又不是必选字段时很有作用。

因此顺便将分布式锁的构建器方式也一并更新了:

https://github.com/crossoverJie/distributed-redis-tool#features

API

从上文可以看出,使用过程就是调用 limit 方法。

//限流 boolean limit = redisLimit.limit(); if (!limit){ //具体限流逻辑 } 

为了减少侵入性,也为了简化客户端提供了两种注解方式。

@ControllerLimit

该注解可以作用于 @RequestMapping 修饰的接口中,并会在限流后提供限流响应。

实现如下:

@Component 
public class WebIntercept extends WebMvcConfigurerAdapter { private static Logger logger = LoggerFactory.getLogger(WebIntercept.class); @Autowired private RedisLimit redisLimit; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new CustomInterceptor()) .addPathPatterns("/**"); } private class CustomInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } if (handler instanceof HandlerMethod) { HandlerMethod method = (HandlerMethod) handler; ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class); if (annotation == null) { //skip return true; } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); response.sendError(500, "request limit"); return false; } } return true; } } 
} 

其实就是实现了 SpringMVC 中的拦截器,并在拦截过程中判断是否有使用注解,从而调用限流逻辑。

前提是应用需要扫描到该类,让 Spring 进行管理。

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

@CommonLimit

当然也可以在普通方法中使用。实现原理则是 Spring AOP (SpringMVC 的拦截器本质也是 AOP)。

@Aspect 
@Component 
@EnableAspectJAutoProxy(proxyTargetClass = true) 
public class CommonAspect { private static Logger logger = LoggerFactory.getLogger(CommonAspect.class); @Autowired private RedisLimit redisLimit ; @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)") private void check(){} @Before("check()") public void before(JoinPoint joinPoint) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); throw new RuntimeException("request has bean limit") ; } } 
} 

很简单,也是在拦截过程中调用限流。

当然使用时也得扫描到该包:

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

总结

限流在一个高并发大流量的系统中是保护应用的一个利器,成熟的方案也很多,希望对刚了解这一块的朋友提供一些思路。

这篇关于关于分布式限流,这几点你必须掌握!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1026606

相关文章

Eureka高可用注册中心registered-replicas没有分布式注册中心

自己在学习过程中发现,如果Eureka挂掉了,其他的Client就跑不起来了,那既然是商业项目,还是要处理好这个问题,所以决定用《Spring Cloud微服务实战》(PDF版在全栈技术交流群中自行获取)中说的“高可用注册中心”。 一开始我yml的配置是这样的 server:port: 8761eureka:instance:hostname: 127.0.0.1client:fetch-r

一道经典Python程序样例带你飞速掌握Python的字典和列表

Python中的列表(list)和字典(dict)是两种常用的数据结构,它们在数据组织和存储方面有很大的不同。 列表(List) 列表是Python中的一种有序集合,可以随时添加和删除其中的元素。列表中的元素可以是任何数据类型,包括数字、字符串、其他列表等。列表使用方括号[]表示,元素之间用逗号,分隔。 定义和使用 # 定义一个列表 fruits = ['apple', 'banana

[分布式网络通讯框架]----Zookeeper客户端基本操作----ls、get、create、set、delete

Zookeeper数据结构 zk客户端常用命令 进入客户端 在bin目录下输入./zkCli.sh 查看根目录下数据ls / 注意:要查看哪一个节点,必须把路径写全 查看节点数据信息 get /第一行代码数据,没有的话表示没有数据 创建节点create /sl 20 /sl为节点的路径,20为节点的数据 注意,不能跨越创建,也就是说,创建sl2的时候,必须确保sl

【Qt6.3 基础教程 16】 掌握Qt中的时间和日期:QTimer和QDateTime的高效应用

文章目录 前言QTimer:定时任务的强大工具QTimer的基本用法高级特性:单次定时器 QDateTime:处理日期和时间获取当前日期和时间日期和时间的格式化输出日期和时间计算 用例:创建一个倒计时应用结论 前言 在开发桌面应用程序时,处理时间和日期是一个常见且重要的任务。Qt框架提供了强大的工具来处理与时间相关的功能,其中QTimer和QDateTime是最核心的类。本

[分布式网络通讯框架]----ZooKeeper下载以及Linux环境下安装与单机模式部署(附带每一步截图)

首先进入apache官网 点击中间的see all Projects->Project List菜单项进入页面 找到zookeeper,进入 在Zookeeper主页的顶部点击菜单Project->Releases,进入Zookeeper发布版本信息页面,如下图: 找到需要下载的版本 进行下载既可,这里我已经下载过3.4.10,所以以下使用3.4.10进行演示其他的步骤。

分布式事务的解决方案(一)

前言应用场景 事务必须满足传统事务的特性,即原子性,一致性,分离性和持久性。但是分布式事务处理过程中, 某些场地比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证? 在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外, 还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证? 一 本地事务 以用户A

深入探索 Nuxt3 Composables:掌握目录架构与内置API的高效应用

title: 深入探索 Nuxt3 Composables:掌握目录架构与内置API的高效应用 date: 2024/6/23 updated: 2024/6/23 author: cmdragon excerpt: 摘要:“本文深入探讨了Nuxt3 Composables,重点介绍了其目录架构和内置API的高效应用。通过学习本文,读者将能够更好地理解和利用Nuxt3 Composabl

必须记住的CSS选择器

选择器(selector)是CSS中很重要的概念,所有HTML语言中的标记都是通过不同的CSS选择器进行控制的。用户只需要通过选择器对不同的HTML标签进行控制,并赋予各种样式声明,即可实现各种效果。  1. * * { margin: 0; padding: 0; }   星号选择器用于选取页面中的所有元素,可用于快速清除所有元素的 margin 与 padding,但最好只在

分布式锁实现方案-基于Redis实现的分布式锁

目录 一、基于Lua+看门狗实现 1.1 缓存实体 1.2 延迟队列存储实体 1.3 分布式锁RedisDistributedLockWithDog 1.4 看门狗线程续期 1.5 测试类 1.6 测试结果 1.7 总结 二、RedLock分布式锁 2.1 Redlock分布式锁简介 2.2 RedLock测试例子 2.3 RedLock 加锁核心源码分析 2.4

分布式,容错:10台电脑坏了2台

由10台电脑组成的分布式系统,随机、任意坏了2台,剩下的8台电脑仍然储存着全部信息,可以继续服务。这是怎么做到的? 设N台电脑,坏了H台,要保证上述性质,需要有冗余,总的存储量降低为1/(H+1)。例如: H=1,随机坏1台,总容量变为1/2; H=2,随机坏2台,总容量变为1/3; 特别地,H=0,总容量不变; H=N-1,总容量变为1/N,这时,每台电脑都储存着全部信息,保证任意坏了N-1台