RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法

本文主要是介绍RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

stream  更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客

1 springboot整合redis 就不多说了

2 有用到hutool工具类,添加下 pom 依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version><scope>compile</scope></dependency>

3 写个pojo类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {private String userId;private String goodsId;private String orderId;}

4 往redis 写入些测试数据,如下

del order_streamkeys *# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream ># 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream

5 核心类,主要演示 xreadgroup,xpending,xack的用法

import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class Redis_Stream_Test {@Autowiredprivate StringRedisTemplate redisTemplate;// 定义流的键名public static final String stream_key = "order_stream";// 定义消费者组名public static final String group_str = "g1";// 定义消费者名称public static final String consumer_str = "c1";/*** 消费流中的消息* 该方法会不断读取指定流中的消息并处理,直到没有消息为止。* 用到命令 xreadgroup* 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空* 如果还想读这5条消息,就换个消费者组名称就可以了*/@EnableScheduling //添加到主启动类上@Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次public void read_message() {try {if (!redisTemplate.hasKey(stream_key)) {return;}// 检查是否已创建消费者组,若未创建则创建Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key).stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());if (collect.isEmpty()) {// 创建消费者组,****从头开始读****// XGROUP CREATE order_stream g1 0-0String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);log.info("消费者组添加成功,group = {}", group);}// 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(Consumer.from(group_str, consumer_str),StreamReadOptions.empty().count(5),StreamOffset.create(stream_key, ReadOffset.lastConsumed()));// 若无消息则返回if (read.isEmpty()) {return;}// 记录读取到的消息数量log.info("读取到消息,size = {}", read.size());// 处理每条消息read.forEach(mapRecord -> {// 记录消息ID和内容RecordId recordId = mapRecord.getId();Map<Object, Object> value = mapRecord.getValue();log.info("消息id = {}", mapRecord.getId());log.info("消息内容 = {}", mapRecord.getValue());// 将消息内容转换为实体对象,用hutoolSeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);log.info("消息内容转成实体查看 = {}", seckillOrder);// 进行业务逻辑处理等});} catch (Exception e) {// 记录读取消息失败的日志log.error("读取消息失败,e = {}", e);}}/*** 确认消息处理完成* 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。* 用到命令 xpending , xack*/public void ack_message() {try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(stream_key,Consumer.from(group_str, consumer_str));log.info("待处理消息 = {}", pending);// 遍历并确认每条待处理消息pending.forEach(pendingMessage -> {// 记录待处理消息的元数据log.info("待处理消息元数据 = {}", pendingMessage);RecordId recordId = pendingMessage.getId();log.info("待处理消息id = {}", recordId);// 提交消息确认redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);log.info("消息提交ack成功,id = {}", recordId);});} catch (Exception e) {throw new RuntimeException(e);}}/*** 消费者组 ,消费者,队列的信息*/public void xInfo() {StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);log.info("获取组的信息:{}", groups);System.out.println();StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);log.info("获取消费者的信息:{}", consumers);System.out.println();StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);log.info("获取流的信息:{}", infoStream);}
}

这篇关于RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/939066

相关文章

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

基于WinForm+Halcon实现图像缩放与交互功能

《基于WinForm+Halcon实现图像缩放与交互功能》本文主要讲述在WinForm中结合Halcon实现图像缩放、平移及实时显示灰度值等交互功能,包括初始化窗口的不同方式,以及通过特定事件添加相应... 目录前言初始化窗口添加图像缩放功能添加图像平移功能添加实时显示灰度值功能示例代码总结最后前言本文将

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi

C#实现WinForm控件焦点的获取与失去

《C#实现WinForm控件焦点的获取与失去》在一个数据输入表单中,当用户从一个文本框切换到另一个文本框时,需要准确地判断焦点的转移,以便进行数据验证、提示信息显示等操作,本文将探讨Winform控件... 目录前言获取焦点改变TabIndex属性值调用Focus方法失去焦点总结最后前言在一个数据输入表单

基于C#实现PDF文件合并工具

《基于C#实现PDF文件合并工具》这篇文章主要为大家详细介绍了如何基于C#实现一个简单的PDF文件合并工具,文中的示例代码简洁易懂,有需要的小伙伴可以跟随小编一起学习一下... 界面主要用于发票PDF文件的合并。经常出差要报销的很有用。代码using System;using System.Col

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da

Java操作PDF文件实现签订电子合同详细教程

《Java操作PDF文件实现签订电子合同详细教程》:本文主要介绍如何在PDF中加入电子签章与电子签名的过程,包括编写Word文件、生成PDF、为PDF格式做表单、为表单赋值、生成文档以及上传到OB... 目录前言:先看效果:1.编写word文件1.2然后生成PDF格式进行保存1.3我这里是将文件保存到本地后

Python实现数据清洗的18种方法

《Python实现数据清洗的18种方法》本文主要介绍了Python实现数据清洗的18种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录1. 去除字符串两边空格2. 转换数据类型3. 大小写转换4. 移除列表中的重复元素5. 快速统

用Java打造简易计算器的实现步骤

《用Java打造简易计算器的实现步骤》:本文主要介绍如何设计和实现一个简单的Java命令行计算器程序,该程序能够执行基本的数学运算(加、减、乘、除),文中通过代码介绍的非常详细,需要的朋友可以参考... 目录目标:一、项目概述与功能规划二、代码实现步骤三、测试与优化四、总结与收获总结目标:简单计算器,设计