redis+lua实现分布式限流的示例

2025-03-24 14:50

本文主要是介绍redis+lua实现分布式限流的示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可...

为什么使用redis+lua实现分布式限流

  • 原子性:通过Lua脚本执行限流逻辑,所有操作在一个原子上下文中完成,避免了多步操作导致的并发问题。
  • 灵活性:Lua脚本可以编写复杂的逻辑,比如滑动窗口限流,易于扩展和定制化。
  • 性能:由于所有逻辑在Redis服务器端执行,减少了网络往返,提高了执行效率。

使用ZSET也可以实现限流,为什么选择lua的方式

使用zset需要额度解决这些问题

  • 并发控制:需要额外的逻辑来保证操作的原子性和准确性,可能需要配合Lua脚本或Lua脚本+WATCH/MULTI/EXEC模式来实现。
  • 资源消耗:长期存储请求记录可能导致Redis占用更多的内存资源。

为什么redis+zset不能保证原子性和准确性

  • 多步骤操作:滑动窗口限流通常需要执行多个步骤,比如检查当前窗口的请求次数、添加新的请求记录、可能还需要删除过期的请求记录等。这些操作如果分开执行,就有可能在多线程或多进程环境下出现不一致的情况。
  • 非原子性复合操China编程作:虽然单个Redis命令是原子的,但当你需要执行一系列操作来维持限流状态时(例如,先检查计数、再增加计数、最后可能还要删除旧记录),没有一个单一的Redis命令能完成这些复合操作。如果在这系列操作之间有其他客户端修改了数据,就会导致限流不准确。
  • 竞争条件:在高并发环境下,多个客户端可能几乎同时执行限流检查和增加请求的操作,如果没有适当的同步机制,可能会导致请求计数错误。

实现

依赖

<?XML version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.kang</groupId>
    <artifactId>rate-limiter-project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rate-limiter-project</name>
    <description>rate-limiter-project</description>
    <properties>
        <Java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.2</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.0.1-jre</version> <!-- 请检查最新版本 -->
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

lua脚本

-- KEYS[1] 是Redis中存储计数的key,,,
local key = KEYS[1]

-- ARGV[1]是当前时间戳-[当前时间戳]
local now = tonumber(ARGV[1])

-- ARGV[2]是最大请求次数-[最大请求次数]
local maxRequests = tonumber(ARGV[2])

-- ARGV[3]是时间窗口长度-[时间窗口长度]
local Windowsize = tonumber(ARGV[3])

-- 获取当前时间窗口的起始时间
local windowStart = math.floor(now / windowSize) * windowSize

-- 构建时间窗口内的key,用于区分不同窗口的计数
local windowKey = key .. ':' .. tostring(windowStart)

-- 获取当前窗口的计数
local currentCount = tonumber(redis.call('get', windowKey) or '0')

-- 如果当前时间不在窗口内,重置计数
if now > windowStart + windowSize then
    redis.call('del', windowKey)
    currentCount = 0
end

-- 检查是否超过限制
if currentCount + 1 <= maxRequests then
    -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求
    redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart))
    return 1 -- 成功
else
    return 0 -- 失败
end

yaml

server:
  port: 10086

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5

代码实现

redis+lua实现分布式限流的示例

启动类

package com.kang.limter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
public class RateLimiterProjectApplication {

    public static void main(String[] args) {
        SpringApplication.run(RateLimiterProjectApplication.class, args);
        log.info("RateLimiterProjectApplication start success");
    }

}

CacheConfig

package com.kang.limter.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.kang.limter.utils.LuaScriptUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH;

/**
 * @Author Emperor Kang
 * @ClassName CacheConfig
 * @Description 缓存配置
 * @Date 2024/6/13 10:07
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
@Slf4j
@Configuration
public class CacheConfig {

    /**
     * 缓存配置,加载lua脚本
     * @return
     */
    @Bean(name = "rateLimiterLuaCache")
    public LoadingCache<String, String> rateLimiterLuaCache() {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                // 设置缓存的最大容量,最多100个键值对
                .maximumSize(100)
                // 设置缓存项过期策略:写入后2小时过期
                .expireAfterWrite(2, TimeUnit.HOURS)
                // 缓存统计信息记录
                .recordStats()
                // 构建缓存加载器,用于加载缓存项的值
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String scriptPath) throws Exception {
                        try {
                            return LuaScriptUtils.loadLuaScript(scriptPath);
                        } catch (Exception e) {
                            log.error("加载lua脚本失败:{}", e.getMessage());
                            return null;
                        }
                    }
                });

        // 预热缓存
        warmUpCache(cache);

        return cache;
    }

    /**
     * 预热缓存
     */
    private void warmUpCache(LoadingCache<String, String> cache) {
        try {
            // 假设我们有一个已知的脚本列表需要预热
            List<String> knownScripts = Collections.singletonList(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);
            for (String script : knownScripts) {
                String luaScript = LuaScriptUtils.loadLuaScript(script);
                // 手动初始化缓存
                cache.put(script, luaScript);
                log.info("预加载Lua脚本成功: {}, length: {}", script, luaScript.length());
            }
        } catch (Exception e) {
            log.error("预加载Lua脚本失败: {}", e.getMessage(), e);
        }
    }
}
  • 这里使用缓存预热加快lua脚本的加载速度,基于JVM内存操作,所以很快

SystemConstant

package com.kang.limter.constant;

/**
 * @Author Emperor Kang
 * @ClassName SystemConstant
 * @Description 系统常量
 * @Date 2024/6/12 19:25
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
public class SystemConstant {
    /**
     * 限流配置缓存key前缀
     */
    public static final String REDIS_RATE_LIMITER_KEY_PREFIX = "outreach:config:limiter:%s";

    /**
     * 限流lua脚本路径
     */
    public static final String REDIS_RATE_LIMITER_LUA_SCRIPT_PATH = "classpath:lua/rate_limiter.lua";
}

RateLimiterController

package com.kang.limter.controller;

import com.kang.limter.dto.RateLimiterRequestDto;
import com.kang.limter.utils.RateLimiterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import static java.lang.Thread.sleep;

/**
 * @Author Emperor Kang
 * @ClassName RateLimiterController
 * @Description TODO
 * @Date 2024/6/12 19:33
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
@Slf4j
@RestController
@RequestMapping("/rate/limiter")
public class RateLimiterController {
    @Autowired
    private RateLimiterUtil rateLimiterUtil;

    @PostMapping("/test")
    public String test(@RequestBody RateLimiterRequestDto rateLimiterRequestDto) {
        // 是否限流
        if (!rateLimiterUtil.tr编程yAcquire(rateLimiterRequestDto.getInterfaceCode(), 5, 1000)) {
            log.info("触发限流策略,InterfaceCode:{}", rateLimiterRequestDto.getInterfaceCode());
            return "我被限流了InterfaceCode:" + rateLimiterRequestDto.getInterfaceCode();
        }

        log.info("请求参数:{}", rateLimiterRequestDto);

        try {
            log.info("开始加工逻辑");
            sleep(1000);
        } catch (InterruptedException e) {
            log.error("休眠异常");
            Thread.currentThread().interrupt();
            return "加工异常";
        }

        return "加工成功,成功返回";
    }
}

RateLimiterRequestDto

package com.kang.limter.dto;

import lombok.Data;

/**
 * @Author Emperor Kang
 * @ClassName RateLimiterRequestDto
 * @Description TODO
 * @Date 2024/6/12 19:39
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
@Data
public class RateLimiterRequestDto {
    /**
     * 接口编码
     */
    private String interfaceCode;
}

ResourceLoaderException

package com.kang.limter.exception;

/**
 * @Author Emperor Kang
 * @ClassName ResourceLoaderException
 * @Description 自定义资源加载异常
 * @Date 2024/6/12 18:10
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
public class ResourceLoaderException extends Exception{
    public ResourceLoaderException() {
        super();
    }

    public ResourceLoaderException(String message) {
        super(message);
    }

    public ResourceLoaderException(String message, Throwable cause) {
        super(messagepmEWRPR, cause);
    }

    public ResourceLoaderException(Throwable cause) {
        super(cause);
    }

    protected ResourceLoaderException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

LuaScriptUtils

package com.kang.limter.utils;

import com.kang.limter.exception.ResourceLoaderException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.Assert;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

@Slf4j
public class LuaScriptUtils {

    /**
     * 从类路径下读取Lua脚本内容。
     * @param scriptPath 类路径下的Lua脚本文件路径
     * @return Lua脚本的文本内容
     */
    public static String loadLuaScript(String scriptPath) throws ResourceLoaderException {
        Assert.notNull(scriptPath, "script path must not be null");
        try {
            // 读取lua脚本
            ResourceLoader resourceLoader = new DefaultResourceLoader();
            Resource resource = resourceLoader.getResource(scriptPath);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder scriptBuilder = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    scriptBuilder.append(line).append("\n");
                }
                String lua = scriptBuilder.toString();
                log.debug("读取的lua脚本为: {}", lua);
                return lua;
            }
        } catch (Exception e) {
            log.error("Failed to load Lua script from path: {}", scriptPath, e);
            throw new ResourceLoaderException("Failed to load Lua script from path: " + scriptPath, e);
        }
    }
}

RateLimiterUtil

package com.kang.limter.utils;

import com.google.common.cache.LoadingCache;
import com.kang.limter.exception.ResourceLoaderException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.nio.charset.StandardCharsets;

import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_KEY_PREFIX;
impowww.chinasem.cnrt static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH;

/**
 * @Author Emperor Kang
 * @ClassName RateLimiterUtil
 * @Description 限流工具类
 * @Date 2024/6/12 17:56
 * @Version 1.0
 * @Motto 让营地比你来时更干净
 */
@Slf4j
@Component
public class RateLimiterUtil {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    @Qualifier("rateLimiterLuaCache")
    private LoadingCache<String, String> rateLimiterLuaCache;


    /**
     * @param interfaceCode 接口标识
     * @param maxRequests   最大请求数
     * @param windowSizeMs  窗口大小
     * @return boolean
     * @Description 尝试获取令牌
     * @Author Emperor Kang
     * @Date 2024/6/12 17:57
     * @Version 1.0
     */
    public boolean tryAcquire(String interfaceCode, int maxRequests, long windowSizeMs) {
        try {
            long currentTimeMillis = System.currentTimeMillis();

            String luaScript = rateLimiterLuaCache.get(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);
            log.info("缓存查询lua,length={}", luaScript.length());

            if(StringUtils.isBlank(luaScript)){
                log.info("从缓存中未获取到lua脚本,尝试手动读取");
                luaScript = LuaScriptUtils.loadLuaScript(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH);
            }

            // 二次确认
            if(StringUtils.isBlank(luaScript)){
                log.info("lua脚本加载失败,暂时放弃获取许可,不再限流");
                return true;
            }

            // 限流核心逻辑
            String finalLuaScript = luaScript;
            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                // 用于存储的key
                byte[] key = String.format(REDIS_RATE_LIMITER_KEY_PREFIX, interfaceCode).getBytes(StandardCharsets.UTF_8);
                // 当前时间(毫秒)
                byte[] now = String.valueOf(currentTimeMillis).getBytes(StandardCharsets.UTF_8);
                // 最大请求数
                byte[] maxRequestsBytes = String.valueOf(maxRequests).getBytes(StandardCharsets.UTF_8);
                // 窗口大小
                byte[] windowSizeBytes = String.valueOf(windowSizeMs).getBytes(StandardCharsets.UTF_8);
                // 执行lua脚本
                return connection.eval(finalLuaScript.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, key, now, maxRequestsBytes, windowSizeBytes);
            });

            Assert.notNull(result, "执行lua脚本响应结果为null");

            // 获取结果
            return result == 1L;
        } catch (ResourceLoaderException e) {
            log.error("加载lua脚本失败", e);
        } catch (Exception e){
            log.error("执行限流逻辑异常", e);
        }
        return true;
    }
}

lua脚本

-- KEYS[1] 是Redis中存储计数的key,,,
local key = KEYS[1]

-- ARGV[1]是当前时间戳-[当前时间戳]
local now = tonumber(ARGV[1])

-- ARGV[2]是最大请求次数-[最大请求次数]
local maxRequests = tonumber(ARGV[2])

-- ARGV[3]是时间窗口长度-[时间窗口长度]
local windowSize = tonumber(ARGV[3])

-- 获取当前时间窗口的起始时间
local windowStart = math.flohttp://www.chinasem.cnor(now / windowSize) * windowSize

-- 构建时间窗口内的key,用于区分不同窗口的计数
local windowKey = key .. ':' .. tostring(windowStart)

-- 获取当前窗口的计数
local currentCount = tonumber(redis.call('get', windowKey) or '0')

-- 如果当前时间不在窗口内,重置计数
if now > windowStart + windowSize then
    redis.call('del', windowKey)
    currentCount = 0
end

-- 检查是否超过限制
if currentCount + 1 <= maxRequests then
    -- 未超过,增加计数并返回成功,并设置键的过期时间为窗口剩余时间,以自动清理过期数据。如果超过最大请求次数,则拒绝请求
    redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart))
    return 1 -- 成功
else
    return 0 -- 失败
end

Jmeter压测

redis+lua实现分布式限流的示例

redis+lua实现分布式限流的示例

200次请求/s,限流了195,而我们设置的最大令牌数就是5

到此这篇关于redis+lua实现分布式限流的示例的文章就介绍到这了,更多相关redis+lua分布式限流内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于redis+lua实现分布式限流的示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153920

相关文章

使用PyTorch实现手写数字识别功能

《使用PyTorch实现手写数字识别功能》在人工智能的世界里,计算机视觉是最具魅力的领域之一,通过PyTorch这一强大的深度学习框架,我们将在经典的MNIST数据集上,见证一个神经网络从零开始学会识... 目录当计算机学会“看”数字搭建开发环境MNIST数据集解析1. 认识手写数字数据库2. 数据预处理的

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

Python实现常用文本内容提取

《Python实现常用文本内容提取》在日常工作和学习中,我们经常需要从PDF、Word文档中提取文本,本文将介绍如何使用Python编写一个文本内容提取工具,有需要的小伙伴可以参考下... 目录一、引言二、文本内容提取的原理三、文本内容提取的设计四、文本内容提取的实现五、完整代码示例一、引言在日常工作和学

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

SpringBoot3使用Jasypt实现加密配置文件

《SpringBoot3使用Jasypt实现加密配置文件》这篇文章主要为大家详细介绍了SpringBoot3如何使用Jasypt实现加密配置文件功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编... 目录一. 使用步骤1. 添加依赖2.配置加密密码3. 加密敏感信息4. 将加密信息存储到配置文件中5

Python实现自动化表单填写功能

《Python实现自动化表单填写功能》在Python中,自动化表单填写可以通过多种库和工具实现,本文将详细介绍常用的自动化表单处理工具,并对它们进行横向比较,可根据需求选择合适的工具,感兴趣的小伙伴跟... 目录1. Selenium简介适用场景示例代码优点缺点2. Playwright简介适用场景示例代码

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密

SpringBoot项目使用MDC给日志增加唯一标识的实现步骤

《SpringBoot项目使用MDC给日志增加唯一标识的实现步骤》本文介绍了如何在SpringBoot项目中使用MDC(MappedDiagnosticContext)为日志增加唯一标识,以便于日... 目录【Java】SpringBoot项目使用MDC给日志增加唯一标识,方便日志追踪1.日志效果2.实现步

Spring Boot 集成 Quartz 使用Cron 表达式实现定时任务

《SpringBoot集成Quartz使用Cron表达式实现定时任务》本文介绍了如何在SpringBoot项目中集成Quartz并使用Cron表达式进行任务调度,通过添加Quartz依赖、创... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启