RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法

本文主要是介绍RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

stream  更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客

1 springboot整合redis 就不多说了

2 有用到hutool工具类,添加下 pom 依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version><scope>compile</scope></dependency>

3 写个pojo类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {private String userId;private String goodsId;private String orderId;}

4 往redis 写入些测试数据,如下

del order_streamkeys *# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream ># 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream

5 核心类,主要演示 xreadgroup,xpending,xack的用法

import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class Redis_Stream_Test {@Autowiredprivate StringRedisTemplate redisTemplate;// 定义流的键名public static final String stream_key = "order_stream";// 定义消费者组名public static final String group_str = "g1";// 定义消费者名称public static final String consumer_str = "c1";/*** 消费流中的消息* 该方法会不断读取指定流中的消息并处理,直到没有消息为止。* 用到命令 xreadgroup* 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空* 如果还想读这5条消息,就换个消费者组名称就可以了*/@EnableScheduling //添加到主启动类上@Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次public void read_message() {try {if (!redisTemplate.hasKey(stream_key)) {return;}// 检查是否已创建消费者组,若未创建则创建Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key).stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());if (collect.isEmpty()) {// 创建消费者组,****从头开始读****// XGROUP CREATE order_stream g1 0-0String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);log.info("消费者组添加成功,group = {}", group);}// 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(Consumer.from(group_str, consumer_str),StreamReadOptions.empty().count(5),StreamOffset.create(stream_key, ReadOffset.lastConsumed()));// 若无消息则返回if (read.isEmpty()) {return;}// 记录读取到的消息数量log.info("读取到消息,size = {}", read.size());// 处理每条消息read.forEach(mapRecord -> {// 记录消息ID和内容RecordId recordId = mapRecord.getId();Map<Object, Object> value = mapRecord.getValue();log.info("消息id = {}", mapRecord.getId());log.info("消息内容 = {}", mapRecord.getValue());// 将消息内容转换为实体对象,用hutoolSeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);log.info("消息内容转成实体查看 = {}", seckillOrder);// 进行业务逻辑处理等});} catch (Exception e) {// 记录读取消息失败的日志log.error("读取消息失败,e = {}", e);}}/*** 确认消息处理完成* 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。* 用到命令 xpending , xack*/public void ack_message() {try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(stream_key,Consumer.from(group_str, consumer_str));log.info("待处理消息 = {}", pending);// 遍历并确认每条待处理消息pending.forEach(pendingMessage -> {// 记录待处理消息的元数据log.info("待处理消息元数据 = {}", pendingMessage);RecordId recordId = pendingMessage.getId();log.info("待处理消息id = {}", recordId);// 提交消息确认redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);log.info("消息提交ack成功,id = {}", recordId);});} catch (Exception e) {throw new RuntimeException(e);}}/*** 消费者组 ,消费者,队列的信息*/public void xInfo() {StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);log.info("获取组的信息:{}", groups);System.out.println();StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);log.info("获取消费者的信息:{}", consumers);System.out.println();StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);log.info("获取流的信息:{}", infoStream);}
}

这篇关于RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/939066

相关文章

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import

前端原生js实现拖拽排课效果实例

《前端原生js实现拖拽排课效果实例》:本文主要介绍如何实现一个简单的课程表拖拽功能,通过HTML、CSS和JavaScript的配合,我们实现了课程项的拖拽、放置和显示功能,文中通过实例代码介绍的... 目录1. 效果展示2. 效果分析2.1 关键点2.2 实现方法3. 代码实现3.1 html部分3.2

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J