关于分布式限流,这几点你必须掌握!

2024-06-03 09:32

本文主要是介绍关于分布式限流,这几点你必须掌握!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

小刀爱编程 2019-04-25 13:40:27

前言

在一个高并发系统中对流量的把控是非常重要的,当巨大的流量直接请求到我们的服务器上没多久就可能造成接口不可用,不处理的话甚至会造成整个应用不可用。

比如最近就有个这样的需求,我作为客户端要向kafka生产数据,而kafka的消费者则再源源不断的消费数据,并将消费的数据全部请求到web服务器,虽说做了负载(有4台web服务器)但业务数据的量也是巨大的,每秒钟可能有上万条数据产生。如果生产者直接生产数据的话极有可能把web服务器拖垮。

关于分布式限流,这几点你必须掌握!

 

对此就必须要做限流处理,每秒钟生产一定限额的数据到kafka,这样就能极大程度的保证web的正常运转。

其实不管处理何种场景,本质都是降低流量保证应用的高可用。

常见算法

对于限流常见有两种算法:

  • 漏桶算法
  • 令牌桶算法

漏桶算法比较简单,就是将流量放入桶中,漏桶同时也按照一定的速率流出,如果流量过快的话就会溢出(漏桶并不会提高流出速率),溢出的流量则直接丢弃。

如下图所示:

关于分布式限流,这几点你必须掌握!

 

这种做法简单粗暴。

漏桶算法虽说简单,但却不能应对实际场景,比如突然暴增的流量。

这时就需要用到令牌桶算法:

令牌桶会以一个恒定的速率向固定容量大小桶中放入令牌,当有流量来时则取走一个或多个令牌。当桶中没有令牌则将当前请求丢弃或阻塞。

关于分布式限流,这几点你必须掌握!

 

相比之下令牌桶可以应对一定的突发流量。

RateLimiter实现

对于令牌桶的代码实现,可以直接使用Guava包中的RateLimiter。

@Override 
public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO() ; vo.setReqNo(userReqVO.getReqNo()); RateLimiter limiter = RateLimiter.create(2.0) ; //批量调用 for (int i = 0 ;i< 10 ; i++){ double acquire = limiter.acquire(); logger.debug("获取令牌成功!,消耗=" + acquire); BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo); logger.debug("远程返回:"+JSON.toJSONString(orderNo)); } UserRes userRes = new UserRes() ; userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReqVO.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes ; 
} 

详见此。

调用结果如下:

代码可以看出以每秒向桶中放入两个令牌,请求一次消耗一个令牌。所以每秒钟只能发送两个请求。按照图中的时间来看也确实如此(返回值是获取此令牌所消耗的时间,差不多也是每500ms一个)。

使用RateLimiter有几个值得注意的地方:

允许先消费,后付款,意思就是它可以来一个请求的时候一次性取走几个或者是剩下所有的令牌甚至多取,但是后面的请求就得为上一次请求买单,它需要等待桶中的令牌补齐之后才能继续获取令牌。

总结

针对于单个应用的限流 RateLimiter 够用了,如果是分布式环境可以借助 Redis 来完成。

来做演示。

在 Order 应用提供的接口中采取了限流。首先是配置了限流工具的 Bean:

@Configuration 
public class RedisLimitConfig { @Value("${redis.limit}") private int limit; @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public RedisLimit build() { RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection(); JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection(); RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); return redisLimit; } 
} 

接着在 Controller 使用组件:

@Autowired 
private RedisLimit redisLimit ; 
@Override 
@CheckReqNo 
public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); //限流 boolean limit = redisLimit.limit(); if (!limit){ res.setCode(StatusEnum.REQUEST_LIMIT.getCode()); res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage()); return res ; } res.setReqNo(orderNoReq.getReqNo()); if (null == orderNoReq.getAppId()){ throw new SBCException(StatusEnum.FAIL); } OrderNoResVO orderNoRes = new OrderNoResVO() ; orderNoRes.setOrderId(DateUtil.getLongTime()); res.setCode(StatusEnum.SUCCESS.getCode()); res.setMessage(StatusEnum.SUCCESS.getMessage()); res.setDataBody(orderNoRes); return res ; 
} 

为了方便使用,也提供了注解:

@Override 
@ControllerLimit 
public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); // 业务逻辑 return res ; 
} 

该注解拦截了 http 请求,会在请求达到阈值时直接返回。

普通方法也可使用:

@CommonLimit 
public void doSomething(){} 

会在调用达到阈值时抛出异常。

为了模拟并发,在 User 应用中开启了 10 个线程调用 Order(限流次数为5) 接口(也可使用专业的并发测试工具 JMeter 等)。

@Override 
public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) { //调用远程服务 OrderNoReqVO vo = new OrderNoReqVO(); vo.setAppId(1L); vo.setReqNo(userReq.getReqNo()); for (int i = 0; i < 10; i++) { executorService.execute(new Worker(vo, orderServiceClient)); } UserRes userRes = new UserRes(); userRes.setUserId(123); userRes.setUserName("张三"); userRes.setReqNo(userReq.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes; 
} 
private static class Worker implements Runnable { private OrderNoReqVO vo; private OrderServiceClient orderServiceClient; public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) { this.vo = vo; this.orderServiceClient = orderServiceClient; } @Override public void run() { BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo); logger.info("远程返回:" + JSON.toJSONString(orderNo)); } 
} 

为了验证分布式效果启动了两个 Order 应用。

关于分布式限流,这几点你必须掌握!

 

效果如下:

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

关于分布式限流,这几点你必须掌握!

 

实现原理

实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。

其中 Redis 就非常适合这样的场景。

  • 每次请求时将当前时间(精确到秒)作为 Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
  • 当达到阈值时返回错误。
  • 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性。

Lua 脚本如下:

--lua 下标从 1 开始-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 获取当前流量大小local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0;else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end

Java 中的调用逻辑:

--lua 下标从 1 开始 
-- 限流 key 
local key = KEYS[1] 
-- 限流大小 
local limit = tonumber(ARGV[1]) 
-- 获取当前流量大小 
local curentLimit = tonumber(redis.call('get', key) or "0") 
if curentLimit + 1 > limit then -- 达到限流大小 返回 return 0; 
else -- 没有达到阈值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1 
end 

所以只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。

当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。

Builder 构建器

在设计这个组件时想尽量的提供给使用者清晰、可读性、不易出错的 API。

比如第一步,如何构建一个限流对象。

最常用的方式自然就是构造函数,如果有多个域则可以采用重叠构造器的方式:

public A(){} 
public A(int a){} 
public A(int a,int b){} 

缺点也是显而易见的:如果参数过多会导致难以阅读,甚至如果参数类型一致的情况下客户端颠倒了顺序,但不会引起警告从而出现难以预测的结果。

第二种方案可以采用 JavaBean 模式,利用 setter 方法进行构建:

A a = new A(); 
a.setA(a); 
a.setB(b); 

这种方式清晰易读,但却容易让对象处于不一致的状态,使对象处于线程不安全的状态。

所以这里采用了第三种创建对象的方式,构建器:

public class RedisLimit { private JedisCommands jedis; private int limit = 200; private static final int FAIL_CODE = 0; /** * lua script */ private String script; private RedisLimit(Builder builder) { this.limit = builder.limit ; this.jedis = builder.jedis ; buildScript(); } /** * limit traffic * @return if true */ public boolean limit() { String key = String.valueOf(System.currentTimeMillis() / 1000); Object result = null; if (jedis instanceof Jedis) { result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else if (jedis instanceof JedisCluster) { result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else { //throw new RuntimeException("instance is error") ; return false; } if (FAIL_CODE != (Long) result) { return true; } else { return false; } } /** * read lua script */ private void buildScript() { script = ScriptUtil.getScript("limit.lua"); } /** * the builder * @param <T> */ public static class Builder<T extends JedisCommands>{ private T jedis = null ; private int limit = 200; public Builder(T jedis){ this.jedis = jedis ; } public Builder limit(int limit){ this.limit = limit ; return this; } public RedisLimit build(){ return new RedisLimit(this) ; } } 
} 

这样客户端在使用时:

RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); 

更加的简单直接,并且避免了将创建过程分成了多个子步骤。

这在有多个构造参数,但又不是必选字段时很有作用。

因此顺便将分布式锁的构建器方式也一并更新了:

https://github.com/crossoverJie/distributed-redis-tool#features

API

从上文可以看出,使用过程就是调用 limit 方法。

//限流 boolean limit = redisLimit.limit(); if (!limit){ //具体限流逻辑 } 

为了减少侵入性,也为了简化客户端提供了两种注解方式。

@ControllerLimit

该注解可以作用于 @RequestMapping 修饰的接口中,并会在限流后提供限流响应。

实现如下:

@Component 
public class WebIntercept extends WebMvcConfigurerAdapter { private static Logger logger = LoggerFactory.getLogger(WebIntercept.class); @Autowired private RedisLimit redisLimit; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new CustomInterceptor()) .addPathPatterns("/**"); } private class CustomInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } if (handler instanceof HandlerMethod) { HandlerMethod method = (HandlerMethod) handler; ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class); if (annotation == null) { //skip return true; } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); response.sendError(500, "request limit"); return false; } } return true; } } 
} 

其实就是实现了 SpringMVC 中的拦截器,并在拦截过程中判断是否有使用注解,从而调用限流逻辑。

前提是应用需要扫描到该类,让 Spring 进行管理。

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

@CommonLimit

当然也可以在普通方法中使用。实现原理则是 Spring AOP (SpringMVC 的拦截器本质也是 AOP)。

@Aspect 
@Component 
@EnableAspectJAutoProxy(proxyTargetClass = true) 
public class CommonAspect { private static Logger logger = LoggerFactory.getLogger(CommonAspect.class); @Autowired private RedisLimit redisLimit ; @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)") private void check(){} @Before("check()") public void before(JoinPoint joinPoint) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); throw new RuntimeException("request has bean limit") ; } } 
} 

很简单,也是在拦截过程中调用限流。

当然使用时也得扫描到该包:

@ComponentScan(value = "com.crossoverjie.distributed.intercept") 

总结

限流在一个高并发大流量的系统中是保护应用的一个利器,成熟的方案也很多,希望对刚了解这一块的朋友提供一些思路。

这篇关于关于分布式限流,这几点你必须掌握!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1026606

相关文章

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

《Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)》本文主要介绍了Golang分布式锁实现,采用Redis+Lua脚本确保原子性,持可重入和自动续期,用于防止超卖及重复下单,具有一定... 目录1 概念应用场景分布式锁必备特性2 思路分析宕机与过期防止误删keyLua保证原子性可重入锁自动

JDK9到JDK21中值得掌握的29个实用特性分享

《JDK9到JDK21中值得掌握的29个实用特性分享》Java的演进节奏从JDK9开始显著加快,每半年一个新版本的发布节奏为Java带来了大量的新特性,本文整理了29个JDK9到JDK21中值得掌握的... 目录JDK 9 模块化与API增强1. 集合工厂方法:一行代码创建不可变集合2. 私有接口方法:接口

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re

Gradle下如何搭建SpringCloud分布式环境

《Gradle下如何搭建SpringCloud分布式环境》:本文主要介绍Gradle下如何搭建SpringCloud分布式环境问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录Gradle下搭建SpringCloud分布式环境1.idea配置好gradle2.创建一个空的gr

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka