本文主要是介绍Redis消息队列实现异步秒杀功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re...
1 Redis消息队列
在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给 Redis 处理,并通过异步方式执行。Redis 提供了多种数据结构来实现消息队列,总结三种。
1.1 List 结构
- 原理:基于 List 结构模拟消息队列,使用
BRPUSH
生产消息,BRPOP
消费消息。 - 命令示例
- 生产消息:
BRPUSH key value [value ...]
,将一个或多个元素推入到指定列表的头部。如果列表不存在,会自动创建一个新的列表。 - 消费消息:
BRPOP key [key ...] timeout
,从指定的一个或多个列表中弹出最后一个元素。如果列表为空,该命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间。
- 生产消息:
- 优缺点
- 优点:不会内存超限、可以持久化、消息有序性。
- 缺点:无法避免数据丢失、只支持单消费者。
1.2 Pub/Sub 模式
- 原理:发布订阅模式,基本的点对点消息模型,支持多生产、多消费者。
- 命令示例
- 生产消息:
PUBLISH channel message
,用于向指定频道发布一条消息。 - 消费消息
SUBSCRIBE channel [channel]
:订阅一个或多个频道。UNSUBSCRIBE [channel [channel ...]]
:取消订阅一个或多个频道。PSUBSCRIBE pattern [pattern ...]
:订阅一个或多个符合给定模式的频道,接收消息。PUNSUBSCRIBE [pattern [pattern ...]]
:取消订阅一个或多个符合给定模式的频道。
- 生产消息:
- 优缺点
- 优点:支持多生产、多消费者。
- 缺点:不支持持久化、无法避免数据丢失,消息堆积有上限(消费者会缓存消息),超出会丢失消息。
1.3 Stream 结构
- 原理:Redis 5.0 引入的专门为消息队列设计的数据类型编程,支持消息可回溯、一个消息可以被多个消费者消费、可以阻塞读取。
- 命令示例
- 生产消息:
XADD key *|ID value [value ...]
,向指定的 Stream 流中添加一个消息。例如:XADD users * name jack age 21
,创建名为users
的队列,并向其中发送一个消息,内容是{name=jack,age=21}
,使用 Redis 自动生成 ID。 - 消费消息:
XREAD [COUNT count] [block milliseconds] STREAMS key [key ...] ID ID
。例如:XREAD COUNT 1 STREAMS users 0
:读取users
队列中的第一条消息。XREAD COUNT 1 BLOCK 1000 STREAMS users $
:阻塞 1 秒钟后从users
队列中读取最新消息。
- 生产消息:
- 消费者组模式
- 特点:消息分流、消息标识、消息确认。
- 命令示例
XGROUP CREATE key groupName ID
:创建消费者组。XGROUP DESTORY key groupName
:删除指定的消费者组。XGROUP CREATECONSUMER key groupName consumerName
:给指定的消费者组添加消费者。XGROUP DELCONSUMER key groupName consumerName
:删除消费者组中指定消费者。XREADGROUP GROUP
:从消费者组中读取消息。
- 优缺点
- 优点:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有消息漏读的风险、有消息确认机制,保证消息至少被消费一次。
- 缺点:有消息漏读的风险(单消费方式下)。
1.4 Redis Stream消息队列的特点
Redis 5.0引入的Stream
类型是专门为消息队列设计的,支持以下特性:
- 消息持久化:消息存储在内存中,支持持久化到磁盘,避免消息丢失。
- 消费者组(Consumer Group):
- 消息分流:一个队列可以被多个消费者组订阅,组内多个消费者分摊消息处理。
- 消息回溯:支持按消息ID回溯历史消息。
- 消息确认(ACK):消费者处理完消息后需确认,否则消息会进入
pending-list
等待重试。
- 阻塞读取:消费者可以阻塞等待新消息,减少CPU空转。
- 避免消息丢失:通过
pending-list
机制,确保消息至少被消费一次。
2 秒杀业务处理
2.1 使用Lua脚本处理库存和订单
目标:在Redis中完成库存判断和订单校验,确保原子性。
-- 参数:优惠券ID、用户ID、订单ID local voucherId = ARGV[1] local userId = ARGV[2] local orderId = ARGV[3] -- 库存Key和订单Key local stockKey = 'seckill:stock:' .. voucherId local orderKey = 'seckill:order:' .. voucherId -- 判断库存是否充足 if (tonumber(redis.call('GET', stockKey)) <= 0 then return 1 -- 库存不足 end -- 判断用户是否已下单 if (redis.call('SISMEMBER', orderKey, userId) == 1 then return 2 -- 用户已下单 end -- 扣减库存并记录订单 redis.call('DECR', stockKey) redis.call('SADD', orderKey, userId) -- 将订单信息发送到消息队列 redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0 -- 成功
脚本说明:
- 原子性操作:库存检查、订单校验、消息发送在一个脚本中完成。
- 消息发送:使用
XADD
将订单信息写入stream.orders
队列。
2.2 创建消费者组
XGROUP CREATE stream.orders g1 0 MKSTREAM
g1
:消费者组名称。
MKSTREAM
:如果队列不存在则自动创建。
2.3 Java代码实现
init
方法:在类初始化时创建消息队列,并启动一个线程任务从消息队列中获取订单信息。VoucherOrderHandler
类:实现Runnable
接口,作为线程任务,不断从消息队列中获取订单信息。如果获取成功,将消息转换为VoucherOrder
对象,调用handleVoucherOrder
方法处理订单,并进行 ACK 确认;如果出现异常,调用handlePendingList
方法处理异常消息。handlePendingList
方法:从pendingList
中获取订单信息,处理订单并进行 ACK 确认,直到pendingList
中没有消息。handleVoucherOrder
方法:使用 Redisson 分布式锁确保一人一单,调用代理对象的createVoucherOrder
方法创建订单。seckillVoucher
方法:执行 Lua 脚本判断用户是否具有秒杀资格,如果具有资格,将订单信息发送到消息队列,并返回下单成功信息。createVoucherOrder
方法:判断当前用户是否是第一单,如果是则扣减库存并将订单保存到数据库。
系统启动与初始化
系统启动时,VoucherOrderServiceImpl
类的 @PostConstruct
注解会触发 init
方法执行。该方法先加载创建消息队列的 Lua 脚本,通过 stringRedisTemplate.execute
方法执行脚本创建 Redis Stream 消息队列和消费者组。若创建成功或队列已存在,会记录相应日志。之后,使用线程池 SECKILL_ORDER_EXECUTOR
启动 VoucherOrderHandler
线程,该线程负责后续从消息队列获取订单信息并处理。
用户发起秒杀请求
用户发起秒杀请求后,系统调用 VoucherOrderServiceImpl
的 seckillVoucher
方法。此方法先从 ThreadLocalUtls
中获取用户 ID,用 redisIdworker
生成订单 ID。接着执行判断用户秒杀资格的 Lua 脚本,该脚本接收优惠券 ID、用户 ID 和订单 ID 作为参数。若脚本返回值表明库存不足或用户已下单,方法返回相应的失败提示;若返回值为 0,说明用户有秒杀资格,创建代理对象并返回下单成功结果。
Lua 脚本执行逻辑
Lua 脚本接收到参数后,根据优惠券 ID 拼接库存和订单的 Redis key。先通过 GET
命令获取库存,若库存小于等于 0 则返回 1 表示库存不足。若库存充足,使用 SISMEMBER
命令检查用户是否已下单,若已下单则返回 2。若库存充足且用户未下单,使用 INCRBY
命令扣减库存,SADD
命令记录订单信息,最后返回 0 表示下单成功。
订单处理线程工作
VoucherOrderHandler
线程启动后进入无限循环,不断从 Redis Stream 消息队列获取订单信息。若未获取到消息,继续下一次循环;若获取到消息,将消息转换为 VoucherOrder
对象,调用 handleVoucherOrder
方法处理订单,处理完成后向消息队列发送 ACK 确认消息。若处理过程中出现异常,调用 handlePendingList
方法处理异常消息。
订单处理方法 handleVoucherOrder
handleVoucherOrder
方法接收 VoucherOrder
对象,根据用户 ID 获取 Redisson 分布式锁。尝试获取锁,若失败记录错误日志并返回;若成功,调用代理对象的 createVoucherOrder
方法创建订单,最后释放锁。
订单创建方法 createVoucherOrder
该方法先判断当前用户是否是第一单,通过查询数据库中该用户的订单数量来判断。若不是第一单,记录错误日志并返回;若是第一单,尝试扣减秒杀券库存,若扣减失败抛出异常。若库存扣减成功,将订单信息保存到数据库,若保存失败也抛出异常。
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; /** * 当前类初始化完毕就立马执行该方法 */ @PostConstruct private void init() { // 创建消息队列 DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = null; try { result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); } catch (Exception e) { log.error("队列创建失败", e); return; } int r = result.intValue(); String info = r == 1 ? "队列创建成功" : "队列已存在"; log.debug(info); // 执行线程任务 SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } /** * 线程池 */ private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); /** * 队列名 */ private static final String QUEUE_NAME = "stream.orders"; /** * 组名 */ private static final String GROUP_NAME = "g1"; /** * 线程任务: 不断从消息队列中获取订单 */ private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order > List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); // 2、判断消息获取是否成功 if (messageList == null || messageList.isEmpty()) { // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息 continue; } // 3、消息获取成功,可以下单 // 将消息转成VoucherOrder对象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object,China编程 Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.ehttp://www.chinasem.cnrror("处理订单异常", e); // 处理异常消息 handlePendingList(); } } } } private void handlePendingList() { while (true) { try { // 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0 List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.from("0")) ); // 2、判断pendingList中是否有效性 if (messageList == null || messageList.isEmpty()) { // 2.1 pendingList中没有消息,直接结束循环 break; } // 3、pendingList中有消息 // 将消息转成VoucherOrder对象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("处理订单异常", e); // 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁 try { Thread.sleep(20); } catch (InterruptedException ex) { log.error("线程休眠异常", ex); } } } } /** * 创建订单 * * @param voucherOrder */ private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息) log.error("一人只能下一单"); return; } try { // 创建订单(使用代理对象调用,是为了确保事务生效) proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } /** * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本 */ private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } /** * VoucherOrderServiceImpl类的代理对象 * 将代理对象的作用域进行提升,方面子线程取用 */ private IVoucherOrderService proxy; /** * 抢购秒杀券 * * @param voucherId * @return */ @Transactional @Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); // 1、执行Lua脚本,判断用户是否具有秒杀资格 Long result = null; try { result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); } catch (Exception e) { log.error("Lua脚本执行失败"); throw new RuntimeException(e); } if (result != null && !result.equals(0L)) { // result为1表示库存不足,result为2表示用户已下单 int r = result.intValue(); return Result.fail(r == 2 ? "不能重复下单" : "库存不足"); } // 2、result为0,下单成功,直接返回ok // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); } /** * 创建订单 * * @param voucherOrder * @return */ @Transactional @Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); // 1、判断当前用户是否是第一单 int count = this.count(new LambdaQueryWrapper<VoucherOrder>() .eq(VoucherOrder::getUserId, userId)); if (count >= 1) { // 当前用户不是第一单 log.error("当前用户不是第一单"); return; } // 2、用户是第一单,可以下单,秒杀券库存数量减一 boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒杀券扣减失败"); } // 3、将订单保存到数据库 flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("创建秒杀券订单失败"); } } }
3 秒杀流程剖析
3.1 初始化操作
Lua 脚本准备:编写 Lua 脚本,接收优惠券 ID 和用户 ID 作为参数,判断库存是否充足以及用户是否已下单。若库存不足返回 1,用户已下单返回 2,下单成功返回 0。
-- 优惠券id local voucherId = ARGV[1]; -- 用户id local userId = ARGV[2]; local stockKey = 'seckill:stock:' .. voucherId; local orderKey = 'seckill:order:' .. voucherId; local stock = redis.call('GET', stockKey); if (tonumber(stock) <= 0) then return 1; end if (redis.call('SISMEMBER', orderKey, userId) == 1) then return 2; end redis.call('INCRBY', stockKey, -1); redis.call('SADD', orderKey, userId); return 0;
消息队列创建:在 Java 代码的 @PostConstruct
方法中,通过执行 Lua 脚本创建 Redis 的 Stream 消息队列和消费者组。
@PostConstruct private void init() { DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); if (result == 1) { log.debug("队列创建成功"); } else { log.debug("队列已存在"); } SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }
3.2 秒杀请求处理
资格判断:用户发起秒杀请求,系统执行 Lua 脚本,根据返回结果判断用户是否具有秒杀资格。若返回 1 表示库存不足,返回 2 表示用户已下单,均返回失败信息;返回 0 则表示具有秒杀资格。
@Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId)); if (result != 0) { return Result.fail(result == 2 ? "不能重复下单" : "库存不足"); } IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); }
订单入队:具有秒杀资格后,生成订单 ID,创建订单对象,将订单信息发送到 Redis 的 Stream 消息队列。
3.3 消息队列消费
订单处理线程:使用线程池启动一个线程任务 VoucherOrderHandler
,不断从消息队列中获取订单信息。
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); if (messageList == null || messageList.isEmpty()) { continue; } MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("处理订单异常", e); handlePendingList(); } } } }
异常处理:若处理订单过程中出现异常,调用 handlePendingList
方法从 pendingList
中获取未处理的订单信息,继续处理。
3.4 订单创建
分布式锁保障:使用 Redisson 分布式锁,确保同一用户同一时间只能创建一个订单,避免一人多单问题。
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("一人只能下一单"); return; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
数据库操作:判断用户是否是第一单,若是则扣减库存并将订单保存到数据库。
@Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId)); if (count >= 1) { log.error("当前用户不是第一单"); return; } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVouchChina编程er::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒杀券扣减失败"); } flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("创建秒杀券订单失败"); } }
4 秒杀流程(文字版)
1. 初始化准备
在系统启动阶段,我们会完成一些必要的初始化工作。一方面,编写好用于判断库存和订单情况的 Lua 脚本。这个脚本会接收优惠券 ID 和用户 ID 作为参数,编程通过 Redis 的相关命令判断库存是否充足以及用户是否已下单,保证这些判断操作的原子性。另一方面,在 Java 代码里利用 @PostConstruct
注解,通过执行另一个 Lua 脚本来创建 Redis 的 Stream 消息队列和消费者组,为后续处理订单消息做好准备。
2. 用户请求与资格判断
当用户发起秒杀请求后,系统会立即执行之前准备好的 Lua 脚本来判断用户是否具有秒杀资格。
- 如果脚本返回库存不足的标识,系统会迅速返回 “库存不足” 的提示信息,结束本次请求处理。
- 若返回用户已下单的标识,就会返回 “不能重复下单” 的提示,流程终止。
- 当判定用户具有秒杀资格时,系统会生成唯一的订单 ID,创建订单对象,然后将订单信息发送到 Redis 的 Stream 消息队列,进入异步处理阶段。
3. 消息队列消费
有一个专门的消息队列消费者线程会持续监听 Redis 的 Stream 消息队列。
- 如果没有获取到新的订单信息,线程会继续保持监听状态。
- 一旦获取到订单信息,线程会马上尝试获取 Redisson 分布式锁。这个锁非常关键,它能确保同一用户同一时间只能处理一个订单,有效避免一人多单的问题。
4. 订单创建与处理
获取到锁之后,系统会进一步处理订单。
- 首先判断当前用户是否是第一单。如果不是,系统会记录错误日志并释放锁,结束流程。
- 若是第一单,系统会尝试扣减库存。如果库存扣减失败,会抛出异常并释放锁;若扣减成功,就将订单信息保存到数据库。
- 在保存订单时,若保存失败会抛出异常并释放锁;保存成功后,系统会向 Redis 的 Stream 消息队列发送 ACK 确认消息,最后释放锁,完成整个秒杀流程。
到此这篇关于Redis消息队列实现异步秒杀的文章就介绍到这了,更多相关Redis异步秒杀内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于Redis消息队列实现异步秒杀功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!