本文主要是介绍redis+lua实现分布式限流的示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可...
为什么使用redis+lua实现分布式限流
- 原子性:通过Lua脚本执行限流逻辑,所有操作在一个原子上下文中完成,避免了多步操作导致的并发问题。
- 灵活性:Lua脚本可以编写复杂的逻辑,比如滑动窗口限流,易于扩展和定制化。
- 性能:由于所有逻辑在Redis服务器端执行,减少了网络往返,提高了执行效率。
使用ZSET也可以实现限流,为什么选择lua的方式
使用zset需要额度解决这些问题
- 并发控制:需要额外的逻辑来保证操作的原子性和准确性,可能需要配合Lua脚本或Lua脚本+WATCH/MULTI/EXEC模式来实现。
- 资源消耗:长期存储请求记录可能导致Redis占用更多的内存资源。
为什么redis+zset不能保证原子性和准确性
- 多步骤操作:滑动窗口限流通常需要执行多个步骤,比如检查当前窗口的请求次数、添加新的请求记录、可能还需要删除过期的请求记录等。这些操作如果分开执行,就有可能在多线程或多进程环境下出现不一致的情况。
- 非原子性复合操China编程作:虽然单个Redis命令是原子的,但当你需要执行一系列操作来维持限流状态时(例如,先检查计数、再增加计数、最后可能还要删除旧记录),没有一个单一的Redis命令能完成这些复合操作。如果在这系列操作之间有其他客户端修改了数据,就会导致限流不准确。
- 竞争条件:在高并发环境下,多个客户端可能几乎同时执行限流检查和增加请求的操作,如果没有适当的同步机制,可能会导致请求计数错误。
实现
依赖
<?XML version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.kang</groupId> <artifactId>rate-limiter-project</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rate-limiter-project</name> <description>rate-limiter-project</description> <properties> <Java.version>8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.0.1-jre</version> <!-- 请检查最新版本 --> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
lua脚本
-- KEYS[1] 是Redis中存储计数的key,,, local key = KEYS[1] -- ARGV[1]是当前时间戳-[当前时间戳] local now = tonumber(ARGV[1]) -- ARGV[2]是最大请求次数-[最大请求次数] local maxRequests = tonumber(ARGV[2]) -- ARGV[3]是时间窗口长度-[时间窗口长度] local Windowsize = tonumber(ARGV[3]) -- 获取当前时间窗口的起始时间 local windowStart = math.floor(now / windowSize) * windowSize -- 构建时间窗口内的key,用于区分不同窗口的计数 local windowKey = key .. ':' .. tostring(windowStart) -- 获取当前窗口的计数 local currentCount = tonumber(redis.call('get', windowKey) or '0') -- 如果当前时间不在窗口内,重置计数 if now > windowStart + windowSize then redis.call('del', windowKey) currentCount = 0 end -- 检查是否超过限制 if currentCount + 1 <= maxRequests then -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求 redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart)) return 1 -- 成功 else return 0 -- 失败 end
yaml
server: port: 10086 spring: redis: host: 127.0.0.1 port: 6379 database: 0 lettuce: pool: max-active: 20 max-idle: 10 min-idle: 5
代码实现
启动类
package com.kang.limter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @Slf4j @SpringBootApplication public class RateLimiterProjectApplication { public static void main(String[] args) { SpringApplication.run(RateLimiterProjectApplication.class, args); log.info("RateLimiterProjectApplication start success"); } }
CacheConfig
package com.kang.limter.cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.kang.limter.utils.LuaScriptUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH; /** * @Author Emperor Kang * @ClassName CacheConfig * @Description 缓存配置 * @Date 2024/6/13 10:07 * @Version 1.0 * @Motto 让营地比你来时更干净 */ @Slf4j @Configuration public class CacheConfig { /** * 缓存配置,加载lua脚本 * @return */ @Bean(name = "rateLimiterLuaCache") public LoadingCache<String, String> rateLimiterLuaCache() { LoadingCache<String, String> cache = CacheBuilder.newBuilder() // 设置缓存的最大容量,最多100个键值对 .maximumSize(100) // 设置缓存项过期策略:写入后2小时过期 .expireAfterWrite(2, TimeUnit.HOURS) // 缓存统计信息记录 .recordStats() // 构建缓存加载器,用于加载缓存项的值 .build(new CacheLoader<String, String>() { @Override public String load(String scriptPath) throws Exception { try { return LuaScriptUtils.loadLuaScript(scriptPath); } catch (Exception e) { log.error("加载lua脚本失败:{}", e.getMessage()); return null; } } }); // 预热缓存 warmUpCache(cache); return cache; } /** * 预热缓存 */ private void warmUpCache(LoadingCache<String, String> cache) { try { // 假设我们有一个已知的脚本列表需要预热 List<String> knownScripts = Collections.singletonList(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH); for (String script : knownScripts) { String luaScript = LuaScriptUtils.loadLuaScript(script); // 手动初始化缓存 cache.put(script, luaScript); log.info("预加载Lua脚本成功: {}, length: {}", script, luaScript.length()); } } catch (Exception e) { log.error("预加载Lua脚本失败: {}", e.getMessage(), e); } } }
- 这里使用缓存预热加快lua脚本的加载速度,基于JVM内存操作,所以很快
SystemConstant
package com.kang.limter.constant; /** * @Author Emperor Kang * @ClassName SystemConstant * @Description 系统常量 * @Date 2024/6/12 19:25 * @Version 1.0 * @Motto 让营地比你来时更干净 */ public class SystemConstant { /** * 限流配置缓存key前缀 */ public static final String REDIS_RATE_LIMITER_KEY_PREFIX = "outreach:config:limiter:%s"; /** * 限流lua脚本路径 */ public static final String REDIS_RATE_LIMITER_LUA_SCRIPT_PATH = "classpath:lua/rate_limiter.lua"; }
RateLimiterController
package com.kang.limter.controller; import com.kang.limter.dto.RateLimiterRequestDto; import com.kang.limter.utils.RateLimiterUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import static java.lang.Thread.sleep; /** * @Author Emperor Kang * @ClassName RateLimiterController * @Description TODO * @Date 2024/6/12 19:33 * @Version 1.0 * @Motto 让营地比你来时更干净 */ @Slf4j @RestController @RequestMapping("/rate/limiter") public class RateLimiterController { @Autowired private RateLimiterUtil rateLimiterUtil; @PostMapping("/test") public String test(@RequestBody RateLimiterRequestDto rateLimiterRequestDto) { // 是否限流 if (!rateLimiterUtil.tr编程yAcquire(rateLimiterRequestDto.getInterfaceCode(), 5, 1000)) { log.info("触发限流策略,InterfaceCode:{}", rateLimiterRequestDto.getInterfaceCode()); return "我被限流了InterfaceCode:" + rateLimiterRequestDto.getInterfaceCode(); } log.info("请求参数:{}", rateLimiterRequestDto); try { log.info("开始加工逻辑"); sleep(1000); } catch (InterruptedException e) { log.error("休眠异常"); Thread.currentThread().interrupt(); return "加工异常"; } return "加工成功,成功返回"; } }
RateLimiterRequestDto
package com.kang.limter.dto; import lombok.Data; /** * @Author Emperor Kang * @ClassName RateLimiterRequestDto * @Description TODO * @Date 2024/6/12 19:39 * @Version 1.0 * @Motto 让营地比你来时更干净 */ @Data public class RateLimiterRequestDto { /** * 接口编码 */ private String interfaceCode; }
ResourceLoaderException
package com.kang.limter.exception; /** * @Author Emperor Kang * @ClassName ResourceLoaderException * @Description 自定义资源加载异常 * @Date 2024/6/12 18:10 * @Version 1.0 * @Motto 让营地比你来时更干净 */ public class ResourceLoaderException extends Exception{ public ResourceLoaderException() { super(); } public ResourceLoaderException(String message) { super(message); } public ResourceLoaderException(String message, Throwable cause) { super(messagepmEWRPR, cause); } public ResourceLoaderException(Throwable cause) { super(cause); } protected ResourceLoaderException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } }
LuaScriptUtils
package com.kang.limter.utils; import com.kang.limter.exception.ResourceLoaderException; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.util.Assert; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; @Slf4j public class LuaScriptUtils { /** * 从类路径下读取Lua脚本内容。 * @param scriptPath 类路径下的Lua脚本文件路径 * @return Lua脚本的文本内容 */ public static String loadLuaScript(String scriptPath) throws ResourceLoaderException { Assert.notNull(scriptPath, "script path must not be null"); try { // 读取lua脚本 ResourceLoader resourceLoader = new DefaultResourceLoader(); Resource resource = resourceLoader.getResource(scriptPath); try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) { StringBuilder scriptBuilder = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { scriptBuilder.append(line).append("\n"); } String lua = scriptBuilder.toString(); log.debug("读取的lua脚本为: {}", lua); return lua; } } catch (Exception e) { log.error("Failed to load Lua script from path: {}", scriptPath, e); throw new ResourceLoaderException("Failed to load Lua script from path: " + scriptPath, e); } } }
RateLimiterUtil
package com.kang.limter.utils;
import com.google.common.cache.LoadingCache;
import com.kang.limter.exception.ResourceLoaderException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.nio.charset.StandardCharsets;
import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_KEY_PREFIX;
impowww.chinasem.cnrt static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH;
/**
* @Author Emperor Kang
* @ClassName RateLimiterUtil
* @Description 限流工具类
* @Date 2024/6/12 17:56
* @Version 1.0
* @Motto 让营地比你来时更干净
*/
@Slf4j
@Component
public class RateLimiterUtil {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
@Qualifier("rateLimiterLuaCache")
private LoadingCache<String, String> rateLimiterLuaCache;
/**
* @param interfaceCode 接口标识
* @param maxRequests 最大请求数
* @param windowSizeMs 窗口大小
* @return boolean
* @Description 尝试获取令牌
* @Author Emperor Kang
* @Date 2024/6/12 17:57
* @Version 1.0
*/
public boolean tryAcquire(String interfaceCode, int maxRequests, long windowSizeMs) {
try {
long currentTimeMillis = System.currentTimeMillis();
String luaScript = rateLimiterLuaCache.get(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);
log.info("缓存查询lua,length={}", luaScript.length());
if(StringUtils.isBlank(luaScript)){
log.info("从缓存中未获取到lua脚本,尝试手动读取");
luaScript = LuaScriptUtils.loadLuaScript(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);
}
// 二次确认
if(StringUtils.isBlank(luaScript)){
log.info("lua脚本加载失败,暂时放弃获取许可,不再限流");
return true;
}
// 限流核心逻辑
String finalLuaScript = luaScript;
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
// 用于存储的key
byte[] key = String.format(REDIS_RATE_LIMITER_KEY_PREFIX, interfaceCode).getBytes(StandardCharsets.UTF_8);
// 当前时间(毫秒)
byte[] now = String.valueOf(currentTimeMillis).getBytes(StandardCharsets.UTF_8);
// 最大请求数
byte[] maxRequestsBytes = String.valueOf(maxRequests).getBytes(StandardCharsets.UTF_8);
// 窗口大小
byte[] windowSizeBytes = String.valueOf(windowSizeMs).getBytes(StandardCharsets.UTF_8);
// 执行lua脚本
return connection.eval(finalLuaScript.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, key, now, maxRequestsBytes, windowSizeBytes);
});
Assert.notNull(result, "执行lua脚本响应结果为null");
// 获取结果
return result == 1L;
} catch (ResourceLoaderException e) {
log.error("加载lua脚本失败", e);
} catch (Exception e){
log.error("执行限流逻辑异常", e);
}
return true;
}
}
lua脚本
-- KEYS[1] 是Redis中存储计数的key,,, local key = KEYS[1] -- ARGV[1]是当前时间戳-[当前时间戳] local now = tonumber(ARGV[1]) -- ARGV[2]是最大请求次数-[最大请求次数] local maxRequests = tonumber(ARGV[2]) -- ARGV[3]是时间窗口长度-[时间窗口长度] local windowSize = tonumber(ARGV[3]) -- 获取当前时间窗口的起始时间 local windowStart = math.flohttp://www.chinasem.cnor(now / windowSize) * windowSize -- 构建时间窗口内的key,用于区分不同窗口的计数 local windowKey = key .. ':' .. tostring(windowStart) -- 获取当前窗口的计数 local currentCount = tonumber(redis.call('get', windowKey) or '0') -- 如果当前时间不在窗口内,重置计数 if now > windowStart + windowSize then redis.call('del', windowKey) currentCount = 0 end -- 检查是否超过限制 if currentCount + 1 <= maxRequests then -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求 redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart)) return 1 -- 成功 else return 0 -- 失败 end
Jmeter压测
200次请求/s,限流了195,而我们设置的最大令牌数就是5
到此这篇关于redis+lua实现分布式限流的示例的文章就介绍到这了,更多相关redis+lua分布式限流内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于redis+lua实现分布式限流的示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!