本文主要是介绍redis+lua实现分布式限流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
redis+lua实现分布式限流
文章目录
- redis+lua实现分布式限流
- 为什么使用redis+lua实现分布式限流
- 使用ZSET也可以实现限流,为什么选择lua的方式
- 实现
- 依赖
- lua脚本
- yaml
- 代码实现
- Jmeter压测
为什么使用redis+lua实现分布式限流
- 原子性:通过Lua脚本执行限流逻辑,所有操作在一个原子上下文中完成,避免了多步操作导致的并发问题。
- 灵活性:Lua脚本可以编写复杂的逻辑,比如滑动窗口限流,易于扩展和定制化。
- 性能:由于所有逻辑在Redis服务器端执行,减少了网络往返,提高了执行效率。
使用ZSET也可以实现限流,为什么选择lua的方式
使用zset需要额度解决这些问题
- 并发控制:需要额外的逻辑来保证操作的原子性和准确性,可能需要配合Lua脚本或Lua脚本+WATCH/MULTI/EXEC模式来实现。
- 资源消耗:长期存储请求记录可能导致Redis占用更多的内存资源。
为什么redis+zset不能保证原子性和准确性
- 多步骤操作:滑动窗口限流通常需要执行多个步骤,比如检查当前窗口的请求次数、添加新的请求记录、可能还需要删除过期的请求记录等。这些操作如果分开执行,就有可能在多线程或多进程环境下出现不一致的情况。
- 非原子性复合操作:虽然单个Redis命令是原子的,但当你需要执行一系列操作来维持限流状态时(例如,先检查计数、再增加计数、最后可能还要删除旧记录),没有一个单一的Redis命令能完成这些复合操作。如果在这系列操作之间有其他客户端修改了数据,就会导致限流不准确。
- 竞争条件:在高并发环境下,多个客户端可能几乎同时执行限流检查和增加请求的操作,如果没有适当的同步机制,可能会导致请求计数错误。
实现
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.kang</groupId><artifactId>rate-limiter-project</artifactId><version>0.0.1-SNAPSHOT</version><name>rate-limiter-project</name><description>rate-limiter-project</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.0.1-jre</version> <!-- 请检查最新版本 --></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
lua脚本
-- KEYS[1] 是Redis中存储计数的key,,,
local key = KEYS[1]-- ARGV[1]是当前时间戳-[当前时间戳]
local now = tonumber(ARGV[1])-- ARGV[2]是最大请求次数-[最大请求次数]
local maxRequests = tonumber(ARGV[2])-- ARGV[3]是时间窗口长度-[时间窗口长度]
local windowSize = tonumber(ARGV[3])-- 获取当前时间窗口的起始时间
local windowStart = math.floor(now / windowSize) * windowSize-- 构建时间窗口内的key,用于区分不同窗口的计数
local windowKey = key .. ':' .. tostring(windowStart)-- 获取当前窗口的计数
local currentCount = tonumber(redis.call('get', windowKey) or '0')-- 如果当前时间不在窗口内,重置计数
if now > windowStart + windowSize thenredis.call('del', windowKey)currentCount = 0
end-- 检查是否超过限制
if currentCount + 1 <= maxRequests then-- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart))return 1 -- 成功
elsereturn 0 -- 失败
end
yaml
server:port: 10086spring:redis:host: 127.0.0.1port: 6379database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 5
代码实现
启动类
package com.kang.limter;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@Slf4j
@SpringBootApplication
public class RateLimiterProjectApplication {public static void main(String[] args) {SpringApplication.run(RateLimiterProjectApplication.class, args);log.info("RateLimiterProjectApplication start success");}}
CacheConfig
package com.kang.limter.cache;import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.kang.limter.utils.LuaScriptUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH;/*** @Author Emperor Kang* @ClassName CacheConfig* @Description 缓存配置* @Date 2024/6/13 10:07* @Version 1.0* @Motto 让营地比你来时更干净*/
@Slf4j
@Configuration
public class CacheConfig {/*** 缓存配置,加载lua脚本* @return*/@Bean(name = "rateLimiterLuaCache")public LoadingCache<String, String> rateLimiterLuaCache() {LoadingCache<String, String> cache = CacheBuilder.newBuilder()// 设置缓存的最大容量,最多100个键值对.maximumSize(100)// 设置缓存项过期策略:写入后2小时过期.expireAfterWrite(2, TimeUnit.HOURS)// 缓存统计信息记录.recordStats()// 构建缓存加载器,用于加载缓存项的值.build(new CacheLoader<String, String>() {@Overridepublic String load(String scriptPath) throws Exception {try {return LuaScriptUtils.loadLuaScript(scriptPath);} catch (Exception e) {log.error("加载lua脚本失败:{}", e.getMessage());return null;}}});// 预热缓存warmUpCache(cache);return cache;}/*** 预热缓存*/private void warmUpCache(LoadingCache<String, String> cache) {try {// 假设我们有一个已知的脚本列表需要预热List<String> knownScripts = Collections.singletonList(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);for (String script : knownScripts) {String luaScript = LuaScriptUtils.loadLuaScript(script);// 手动初始化缓存cache.put(script, luaScript);log.info("预加载Lua脚本成功: {}, length: {}", script, luaScript.length());}} catch (Exception e) {log.error("预加载Lua脚本失败: {}", e.getMessage(), e);}}
}
- 这里使用缓存预热加快lua脚本的加载速度,基于JVM内存操作,所以很快
SystemConstant
package com.kang.limter.constant;/*** @Author Emperor Kang* @ClassName SystemConstant* @Description 系统常量* @Date 2024/6/12 19:25* @Version 1.0* @Motto 让营地比你来时更干净*/
public class SystemConstant {/*** 限流配置缓存key前缀*/public static final String REDIS_RATE_LIMITER_KEY_PREFIX = "outreach:config:limiter:%s";/*** 限流lua脚本路径*/public static final String REDIS_RATE_LIMITER_LUA_SCRIPT_PATH = "classpath:lua/rate_limiter.lua";
}
RateLimiterController
package com.kang.limter.controller;import com.kang.limter.dto.RateLimiterRequestDto;
import com.kang.limter.utils.RateLimiterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import static java.lang.Thread.sleep;/*** @Author Emperor Kang* @ClassName RateLimiterController* @Description TODO* @Date 2024/6/12 19:33* @Version 1.0* @Motto 让营地比你来时更干净*/
@Slf4j
@RestController
@RequestMapping("/rate/limiter")
public class RateLimiterController {@Autowiredprivate RateLimiterUtil rateLimiterUtil;@PostMapping("/test")public String test(@RequestBody RateLimiterRequestDto rateLimiterRequestDto) {// 是否限流if (!rateLimiterUtil.tryAcquire(rateLimiterRequestDto.getInterfaceCode(), 5, 1000)) {log.info("触发限流策略,InterfaceCode:{}", rateLimiterRequestDto.getInterfaceCode());return "我被限流了InterfaceCode:" + rateLimiterRequestDto.getInterfaceCode();}log.info("请求参数:{}", rateLimiterRequestDto);try {log.info("开始加工逻辑");sleep(1000);} catch (InterruptedException e) {log.error("休眠异常");Thread.currentThread().interrupt();return "加工异常";}return "加工成功,成功返回";}
}
RateLimiterRequestDto
package com.kang.limter.dto;import lombok.Data;/*** @Author Emperor Kang* @ClassName RateLimiterRequestDto* @Description TODO* @Date 2024/6/12 19:39* @Version 1.0* @Motto 让营地比你来时更干净*/
@Data
public class RateLimiterRequestDto {/*** 接口编码*/private String interfaceCode;
}
ResourceLoaderException
package com.kang.limter.exception;/*** @Author Emperor Kang* @ClassName ResourceLoaderException* @Description 自定义资源加载异常* @Date 2024/6/12 18:10* @Version 1.0* @Motto 让营地比你来时更干净*/
public class ResourceLoaderException extends Exception{public ResourceLoaderException() {super();}public ResourceLoaderException(String message) {super(message);}public ResourceLoaderException(String message, Throwable cause) {super(message, cause);}public ResourceLoaderException(Throwable cause) {super(cause);}protected ResourceLoaderException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {super(message, cause, enableSuppression, writableStackTrace);}
}
LuaScriptUtils
package com.kang.limter.utils;import com.kang.limter.exception.ResourceLoaderException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.Assert;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;@Slf4j
public class LuaScriptUtils {/*** 从类路径下读取Lua脚本内容。* @param scriptPath 类路径下的Lua脚本文件路径* @return Lua脚本的文本内容*/public static String loadLuaScript(String scriptPath) throws ResourceLoaderException {Assert.notNull(scriptPath, "script path must not be null");try {// 读取lua脚本ResourceLoader resourceLoader = new DefaultResourceLoader();Resource resource = resourceLoader.getResource(scriptPath);try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) {StringBuilder scriptBuilder = new StringBuilder();String line;while ((line = reader.readLine()) != null) {scriptBuilder.append(line).append("\n");}String lua = scriptBuilder.toString();log.debug("读取的lua脚本为: {}", lua);return lua;}} catch (Exception e) {log.error("Failed to load Lua script from path: {}", scriptPath, e);throw new ResourceLoaderException("Failed to load Lua script from path: " + scriptPath, e);}}
}
RateLimiterUtil
package com.kang.limter.utils;import com.google.common.cache.LoadingCache;
import com.kang.limter.exception.ResourceLoaderException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import java.nio.charset.StandardCharsets;import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_KEY_PREFIX;
import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH;/*** @Author Emperor Kang* @ClassName RateLimiterUtil* @Description 限流工具类* @Date 2024/6/12 17:56* @Version 1.0* @Motto 让营地比你来时更干净*/
@Slf4j
@Component
public class RateLimiterUtil {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowired@Qualifier("rateLimiterLuaCache")private LoadingCache<String, String> rateLimiterLuaCache;/*** @param interfaceCode 接口标识* @param maxRequests 最大请求数* @param windowSizeMs 窗口大小* @return boolean* @Description 尝试获取令牌* @Author Emperor Kang* @Date 2024/6/12 17:57* @Version 1.0*/public boolean tryAcquire(String interfaceCode, int maxRequests, long windowSizeMs) {try {long currentTimeMillis = System.currentTimeMillis();String luaScript = rateLimiterLuaCache.get(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);log.info("缓存查询lua,length={}", luaScript.length());if(StringUtils.isBlank(luaScript)){log.info("从缓存中未获取到lua脚本,尝试手动读取");luaScript = LuaScriptUtils.loadLuaScript(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);}// 二次确认if(StringUtils.isBlank(luaScript)){log.info("lua脚本加载失败,暂时放弃获取许可,不再限流");return true;}// 限流核心逻辑String finalLuaScript = luaScript;Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {// 用于存储的keybyte[] key = String.format(REDIS_RATE_LIMITER_KEY_PREFIX, interfaceCode).getBytes(StandardCharsets.UTF_8);// 当前时间(毫秒)byte[] now = String.valueOf(currentTimeMillis).getBytes(StandardCharsets.UTF_8);// 最大请求数byte[] maxRequestsBytes = String.valueOf(maxRequests).getBytes(StandardCharsets.UTF_8);// 窗口大小byte[] windowSizeBytes = String.valueOf(windowSizeMs).getBytes(StandardCharsets.UTF_8);// 执行lua脚本return connection.eval(finalLuaScript.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, key, now, maxRequestsBytes, windowSizeBytes);});Assert.notNull(result, "执行lua脚本响应结果为null");// 获取结果return result == 1L;} catch (ResourceLoaderException e) {log.error("加载lua脚本失败", e);} catch (Exception e){log.error("执行限流逻辑异常", e);}return true;}
}
lua脚本
-- KEYS[1] 是Redis中存储计数的key,,,
local key = KEYS[1]-- ARGV[1]是当前时间戳-[当前时间戳]
local now = tonumber(ARGV[1])-- ARGV[2]是最大请求次数-[最大请求次数]
local maxRequests = tonumber(ARGV[2])-- ARGV[3]是时间窗口长度-[时间窗口长度]
local windowSize = tonumber(ARGV[3])-- 获取当前时间窗口的起始时间
local windowStart = math.floor(now / windowSize) * windowSize-- 构建时间窗口内的key,用于区分不同窗口的计数
local windowKey = key .. ':' .. tostring(windowStart)-- 获取当前窗口的计数
local currentCount = tonumber(redis.call('get', windowKey) or '0')-- 如果当前时间不在窗口内,重置计数
if now > windowStart + windowSize thenredis.call('del', windowKey)currentCount = 0
end-- 检查是否超过限制
if currentCount + 1 <= maxRequests then-- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart))return 1 -- 成功
elsereturn 0 -- 失败
end
Jmeter压测
- 200次请求/s,限流了195,而我们设置的最大令牌数就是5
这篇关于redis+lua实现分布式限流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!