RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法

本文主要是介绍RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

stream  更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客

1 springboot整合redis 就不多说了

2 有用到hutool工具类,添加下 pom 依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version><scope>compile</scope></dependency>

3 写个pojo类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {private String userId;private String goodsId;private String orderId;}

4 往redis 写入些测试数据,如下

del order_streamkeys *# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream ># 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream

5 核心类,主要演示 xreadgroup,xpending,xack的用法

import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class Redis_Stream_Test {@Autowiredprivate StringRedisTemplate redisTemplate;// 定义流的键名public static final String stream_key = "order_stream";// 定义消费者组名public static final String group_str = "g1";// 定义消费者名称public static final String consumer_str = "c1";/*** 消费流中的消息* 该方法会不断读取指定流中的消息并处理,直到没有消息为止。* 用到命令 xreadgroup* 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空* 如果还想读这5条消息,就换个消费者组名称就可以了*/@EnableScheduling //添加到主启动类上@Scheduled(cron="0/5 * *  * * ? ")   //每5秒执行一次public void read_message() {try {if (!redisTemplate.hasKey(stream_key)) {return;}// 检查是否已创建消费者组,若未创建则创建Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key).stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());if (collect.isEmpty()) {// 创建消费者组,****从头开始读****// XGROUP CREATE order_stream g1 0-0String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);log.info("消费者组添加成功,group = {}", group);}// 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(Consumer.from(group_str, consumer_str),StreamReadOptions.empty().count(5),StreamOffset.create(stream_key, ReadOffset.lastConsumed()));// 若无消息则返回if (read.isEmpty()) {return;}// 记录读取到的消息数量log.info("读取到消息,size = {}", read.size());// 处理每条消息read.forEach(mapRecord -> {// 记录消息ID和内容RecordId recordId = mapRecord.getId();Map<Object, Object> value = mapRecord.getValue();log.info("消息id = {}", mapRecord.getId());log.info("消息内容 = {}", mapRecord.getValue());// 将消息内容转换为实体对象,用hutoolSeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);log.info("消息内容转成实体查看 = {}", seckillOrder);// 进行业务逻辑处理等});} catch (Exception e) {// 记录读取消息失败的日志log.error("读取消息失败,e = {}", e);}}/*** 确认消息处理完成* 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。* 用到命令 xpending , xack*/public void ack_message() {try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(stream_key,Consumer.from(group_str, consumer_str));log.info("待处理消息 = {}", pending);// 遍历并确认每条待处理消息pending.forEach(pendingMessage -> {// 记录待处理消息的元数据log.info("待处理消息元数据 = {}", pendingMessage);RecordId recordId = pendingMessage.getId();log.info("待处理消息id = {}", recordId);// 提交消息确认redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);log.info("消息提交ack成功,id = {}", recordId);});} catch (Exception e) {throw new RuntimeException(e);}}/*** 消费者组 ,消费者,队列的信息*/public void xInfo() {StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);log.info("获取组的信息:{}", groups);System.out.println();StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);log.info("获取消费者的信息:{}", consumers);System.out.println();StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);log.info("获取流的信息:{}", infoStream);}
}

这篇关于RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/939066

相关文章

C++ vector的常见用法超详细讲解

《C++vector的常见用法超详细讲解》:本文主要介绍C++vector的常见用法,包括C++中vector容器的定义、初始化方法、访问元素、常用函数及其时间复杂度,通过代码介绍的非常详细,... 目录1、vector的定义2、vector常用初始化方法1、使编程用花括号直接赋值2、使用圆括号赋值3、ve

MySQL中FIND_IN_SET函数与INSTR函数用法解析

《MySQL中FIND_IN_SET函数与INSTR函数用法解析》:本文主要介绍MySQL中FIND_IN_SET函数与INSTR函数用法解析,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一... 目录一、功能定义与语法1、FIND_IN_SET函数2、INSTR函数二、本质区别对比三、实际场景案例分

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

pytorch之torch.flatten()和torch.nn.Flatten()的用法

《pytorch之torch.flatten()和torch.nn.Flatten()的用法》:本文主要介绍pytorch之torch.flatten()和torch.nn.Flatten()的用... 目录torch.flatten()和torch.nn.Flatten()的用法下面举例说明总结torch

基于Python实现高效PPT转图片工具

《基于Python实现高效PPT转图片工具》在日常工作中,PPT是我们常用的演示工具,但有时候我们需要将PPT的内容提取为图片格式以便于展示或保存,所以本文将用Python实现PPT转PNG工具,希望... 目录1. 概述2. 功能使用2.1 安装依赖2.2 使用步骤2.3 代码实现2.4 GUI界面3.效

MySQL更新某个字段拼接固定字符串的实现

《MySQL更新某个字段拼接固定字符串的实现》在MySQL中,我们经常需要对数据库中的某个字段进行更新操作,本文就来介绍一下MySQL更新某个字段拼接固定字符串的实现,感兴趣的可以了解一下... 目录1. 查看字段当前值2. 更新字段拼接固定字符串3. 验证更新结果mysql更新某个字段拼接固定字符串 -

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.