关于分布式限流,这几点你必须掌握!

2024-06-03 09:32

本文主要是介绍关于分布式限流,这几点你必须掌握!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

小刀爱编程 2019-04-25 13:40:27

前言

在一个高并发系统中对流量的把控是非常重要的,当巨大的流量直接请求到我们的服务器上没多久就可能造成接口不可用,不处理的话甚至会造成整个应用不可用。

比如最近就有个这样的需求,我作为客户端要向kafka生产数据,而kafka的消费者则再源源不断的消费数据,并将消费的数据全部请求到web服务器,虽说做了负载(有4台web服务器)但业务数据的量也是巨大的,每秒钟可能有上万条数据产生。如果生产者直接生产数据的话极有可能把web服务器拖垮。

关于分布式限流,这几点你必须掌握!

 

对此就必须要做限流处理,每秒钟生产一定限额的数据到kafka,这样就能极大程度的保证web的正常运转。

其实不管处理何种场景,本质都是降低流量保证应用的高可用。

常见算法

对于限流常见有两种算法:

  • 漏桶算法
  • 令牌桶算法

漏桶算法比较简单,就是将流量放入桶中,漏桶同时也按照一定的速率流出,如果流量过快的话就会溢出(漏桶并不会提高流出速率),溢出的流量则直接丢弃。

如下图所示:

关于分布式限流,这几点你必须掌握!

 

这种做法简单粗暴。

漏桶算法虽说简单,但却不能应对实际场景,比如突然暴增的流量。

这时就需要用到令牌桶算法:

令牌桶会以一个恒定的速率向固定容量大小桶中放入令牌,当有流量来时则取走一个或多个令牌。当桶中没有令牌则将当前请求丢弃或阻塞。

关于分布式限流,这几点你必须掌握!

 

相比之下令牌桶可以应对一定的突发流量。

RateLimiter实现

对于令牌桶的代码实现,可以直接使用Guava包中的RateLimiter。

@Override 
public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO() ; vo.setReqNo(userReqVO.getReqNo()); RateLimiter limiter = RateLimiter.create(2.0) ; //批量调用 for (int i = 0 ;i< 10 ; i++){ double acquire = limiter.acquire(); logger.debug("获取令牌成功!,消耗=" + acquire); BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo); logger.debug("远程返回:"+JSON.toJSONString(orderNo)); } UserRes userRes = new UserRes() ; userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReqVO.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes ; 
} 

详见此。

调用结果如下:

代码可以看出以每秒向桶中放入两个令牌,请求一次消耗一个令牌。所以每秒钟只能发送两个请求。按照图中的时间来看也确实如此(返回值是获取此令牌所消耗的时间,差不多也是每500ms一个)。

使用RateLimiter有几个值得注意的地方:

允许先消费,后付款,意思就是它可以来一个请求的时候一次性取走几个或者是剩下所有的令牌甚至多取,但是后面的请求就得为上一次请求买单,它需要等待桶中的令牌补齐之后才能继续获取令牌。

总结

针对于单个应用的限流 RateLimiter 够用了,如果是分布式环境可以借助 Redis 来完成。

来做演示。

在 Order 应用提供的接口中采取了限流。首先是配置了限流工具的 Bean:

@Configuration 
public class RedisLimitConfig { @Value("${redis.limit}") private int limit; @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public RedisLimit build() { RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection(); JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection(); RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); return redisLimit; } 
} 

接着在 Controller 使用组件:

@Autowired 
private RedisLimit redisLimit ; 
@Override 
@CheckReqNo 
public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); //限流 boolean limit = redisLimit.limit(); if (!limit){ res.setCode(StatusEnum.REQUEST_LIMIT.getCode()); res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage()); return res ; } res.setReqNo(orderNoReq.getReqNo()); if (null == orderNoReq.getAppId()){ throw new SBCException(StatusEnum.FAIL); } OrderNoResVO orderNoRes = new OrderNoResVO() ; orderNoRes.setOrderId(DateUtil.getLongTime()); res.setCode(StatusEnum.SUCCESS.getCode()); res.setMessage(StatusEnum.SUCCESS.getMessage()); res.setDataBody(orderNoRes); return res ; 
} 

为了方便使用,也提供了注解:

@Override 
@ControllerLimit 
public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); // 业务逻辑 return res ; 
} 

该注解拦截了 http 请求,会在请求达到阈值时直接返回。

普通方法也可使用:

@CommonLimit 
public void doSomething(){} 

会在调用达到阈值时抛出异常。

为了模拟并发,在 User 应用中开启了 10 个线程调用 Order(限流次数为5) 接口(也可使用专业的并发测试工具 JMeter 等)。

@Override 
public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO(); vo.setAppId(1L); vo.setReqNo(userReq.getReqNo()); for (int i = 0; i < 10; i++) { executorService.execute(new Worker(vo, orderServiceClient)); } UserRes userRes = new UserRes(); userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReq.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes; 
} 
private static class Worker implements Runnable { private OrderNoReqVO vo; private OrderServiceClient orderServiceClient; public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) { this.vo = vo; this.orderServiceClient = orderServiceClient; } @Override public void run() { BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo); logger.info("远程返回:" + JSON.toJSONString(orderNo)); } 
} 

为了验证分布式效果启动了两个 Order 应用。

关于分布式限流,这几点你必须掌握!

 

效果如下:

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

实现原理

实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。

其中 Redis 就非常适合这样的场景。

  • 每次请求时将当前时间(精确到秒)作为 Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
  • 当达到阈值时返回错误。
  • 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性。

Lua 脚本如下:

--lua 下标从 1 开始-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 获取当前流量大小local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0;else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end

Java 中的调用逻辑:

--lua 下标从 1 开始 
-- 限流 key 
local key = KEYS[1] 
-- 限流大小 
local limit = tonumber(ARGV[1]) 
-- 获取当前流量大小 
local curentLimit = tonumber(redis.call('get', key) or "0") 
if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0; 
else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1 
end 

所以只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。

当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。

Builder 构建器

在设计这个组件时想尽量的提供给使用者清晰、可读性、不易出错的 API。

比如第一步,如何构建一个限流对象。

最常用的方式自然就是构造函数,如果有多个域则可以采用重叠构造器的方式:

public A(){} 
public A(int a){} 
public A(int a,int b){} 

缺点也是显而易见的:如果参数过多会导致难以阅读,甚至如果参数类型一致的情况下客户端颠倒了顺序,但不会引起警告从而出现难以预测的结果。

第二种方案可以采用 JavaBean 模式,利用 setter 方法进行构建:

A a = new A(); 
a.setA(a); 
a.setB(b); 

这种方式清晰易读,但却容易让对象处于不一致的状态,使对象处于线程不安全的状态。

所以这里采用了第三种创建对象的方式,构建器:

public class RedisLimit { private JedisCommands jedis; private int limit = 200; private static final int FAIL_CODE = 0; /** * lua script */ private String script; private RedisLimit(Builder builder) { this.limit = builder.limit ; this.jedis = builder.jedis ; buildScript(); } /** * limit traffic * @return if true */ public boolean limit() { String key = String.valueOf(System.currentTimeMillis() / 1000); Object result = null; if (jedis instanceof Jedis) { result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else if (jedis instanceof JedisCluster) { result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else { //throw new RuntimeException("instance is error") ; return false; } if (FAIL_CODE != (Long) result) { return true; } else { return false; } } /** * read lua script */ private void buildScript() { script = ScriptUtil.getScript("limit.lua"); } /** * the builder * @param <T> */ public static class Builder<T extends JedisCommands>{ private T jedis = null ; private int limit = 200; public Builder(T jedis){ this.jedis = jedis ; } public Builder limit(int limit){ this.limit = limit ; return this; } public RedisLimit build(){ return new RedisLimit(this) ; } } 
} 

这样客户端在使用时:

RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); 

更加的简单直接,并且避免了将创建过程分成了多个子步骤。

这在有多个构造参数,但又不是必选字段时很有作用。

因此顺便将分布式锁的构建器方式也一并更新了:

https://github.com/crossoverJie/distributed-redis-tool#features

API

从上文可以看出,使用过程就是调用 limit 方法。

//限流 boolean limit = redisLimit.limit(); if (!limit){ //具体限流逻辑 } 

为了减少侵入性,也为了简化客户端提供了两种注解方式。

@ControllerLimit

该注解可以作用于 @RequestMapping 修饰的接口中,并会在限流后提供限流响应。

实现如下:

@Component 
public class WebIntercept extends WebMvcConfigurerAdapter { private static Logger logger = LoggerFactory.getLogger(WebIntercept.class); @Autowired private RedisLimit redisLimit; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new CustomInterceptor()) .addPathPatterns("/**"); } private class CustomInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } if (handler instanceof HandlerMethod) { HandlerMethod method = (HandlerMethod) handler; ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class); if (annotation == null) { //skip return true; } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); response.sendError(500, "request limit"); return false; } } return true; } } 
} 

其实就是实现了 SpringMVC 中的拦截器,并在拦截过程中判断是否有使用注解,从而调用限流逻辑。

前提是应用需要扫描到该类,让 Spring 进行管理。

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

@CommonLimit

当然也可以在普通方法中使用。实现原理则是 Spring AOP (SpringMVC 的拦截器本质也是 AOP)。

@Aspect 
@Component 
@EnableAspectJAutoProxy(proxyTargetClass = true) 
public class CommonAspect { private static Logger logger = LoggerFactory.getLogger(CommonAspect.class); @Autowired private RedisLimit redisLimit ; @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)") private void check(){} @Before("check()") public void before(JoinPoint joinPoint) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); throw new RuntimeException("request has bean limit") ; } } 
} 

很简单,也是在拦截过程中调用限流。

当然使用时也得扫描到该包:

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

总结

限流在一个高并发大流量的系统中是保护应用的一个利器,成熟的方案也很多,希望对刚了解这一块的朋友提供一些思路。

这篇关于关于分布式限流,这几点你必须掌握!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1026606

相关文章

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

Redis实现分布式锁全过程

《Redis实现分布式锁全过程》文章介绍Redis实现分布式锁的方法,包括使用SETNX和EXPIRE命令确保互斥性与防死锁,Redisson客户端提供的便捷接口,以及Redlock算法通过多节点共识... 目录Redis实现分布式锁1. 分布式锁的基本原理2. 使用 Redis 实现分布式锁2.1 获取锁

Redis分布式锁中Redission底层实现方式

《Redis分布式锁中Redission底层实现方式》Redission基于Redis原子操作和Lua脚本实现分布式锁,通过SETNX命令、看门狗续期、可重入机制及异常处理,确保锁的可靠性和一致性,是... 目录Redis分布式锁中Redission底层实现一、Redission分布式锁的基本使用二、Red

redis和redission分布式锁原理及区别说明

《redis和redission分布式锁原理及区别说明》文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,... 目录Redis和redission分布式锁原理及区别1、有的同伴想到了synchronized关键字

基于Redisson实现分布式系统下的接口限流

《基于Redisson实现分布式系统下的接口限流》在高并发场景下,接口限流是保障系统稳定性的重要手段,本文将介绍利用Redisson结合Redis实现分布式环境下的接口限流,具有一定的参考价值,感兴趣... 目录分布式限流的核心挑战基于 Redisson 的分布式限流设计思路实现步骤引入依赖定义限流注解实现

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源