本文主要是介绍RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
stream 更多详细命令使用,可查看博文
redis基于Stream类型实现消息队列,命令操作,术语概念,个人总结等-CSDN博客
1 springboot整合redis 就不多说了
2 有用到hutool工具类,添加下 pom 依赖
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version><scope>compile</scope></dependency>
3 写个pojo类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillOrder {private String userId;private String goodsId;private String orderId;}
4 往redis 写入些测试数据,如下
del order_streamkeys *# 往消息队列添加消息
xadd order_stream * userId user_01, goodsId goodsId_01, orderId order_01
xadd order_stream * userId user_02, goodsId goodsId_02, orderId order_02
xadd order_stream * userId user_03, goodsId goodsId_03, orderId order_03
xadd order_stream * userId user_04, goodsId goodsId_04, orderId order_04
xadd order_stream * userId user_05, goodsId goodsId_05, orderId order_05
xadd order_stream * userId user_06, goodsId goodsId_06, orderId order_06
xadd order_stream * userId user_07, goodsId goodsId_07, orderId order_07
xadd order_stream * userId user_08, goodsId goodsId_08, orderId order_08
xadd order_stream * userId user_09, goodsId goodsId_09, orderId order_09
xadd order_stream * userId user_10, goodsId goodsId_10, orderId order_10# 获取流中的数据(- 表示最小值,+表示最大值)
xrange order_stream - +创建g1消费者组,从头开始消费
XGROUP CREATE order_stream g1 0-0创建g2消费者组,从尾部开始消费
XGROUP CREATE order_stream g2 $# 通过 xreadgroup 的一个消费者(consumer1)读取流(order_stream)中的1条消息
xreadgroup group g1 consumer1 count 1 streams order_stream ># 显示g1消费者组 待处理消息的相关信息
xpending order_stream g1# 打印流信息
xinfo stream order_stream
# 打印消费组信息
xinfo groups order_stream
5 核心类,主要演示 xreadgroup,xpending,xack的用法
import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.example.service_a.domain.SeckillOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class Redis_Stream_Test {@Autowiredprivate StringRedisTemplate redisTemplate;// 定义流的键名public static final String stream_key = "order_stream";// 定义消费者组名public static final String group_str = "g1";// 定义消费者名称public static final String consumer_str = "c1";/*** 消费流中的消息* 该方法会不断读取指定流中的消息并处理,直到没有消息为止。* 用到命令 xreadgroup* 每个消费者组执行这个方法后,假如队列有5条消息,第1次执行,就返回5条消息,再次执行此方法,就返回为空* 如果还想读这5条消息,就换个消费者组名称就可以了*/@EnableScheduling //添加到主启动类上@Scheduled(cron="0/5 * * * * ? ") //每5秒执行一次public void read_message() {try {if (!redisTemplate.hasKey(stream_key)) {return;}// 检查是否已创建消费者组,若未创建则创建Set<StreamInfo.XInfoGroup> collect = redisTemplate.opsForStream().groups(stream_key).stream().filter(infoGroup -> infoGroup.groupName().equals(group_str)).collect(Collectors.toSet());if (collect.isEmpty()) {// 创建消费者组,****从头开始读****// XGROUP CREATE order_stream g1 0-0String group = redisTemplate.opsForStream().createGroup(stream_key, ReadOffset.from("0-0"), group_str);log.info("消费者组添加成功,group = {}", group);}// 读取消息,可以不阻塞读,也可以阻塞方式来读 如 xreadgroup group g1 c1 count 5 block 60000 order_stream >List<MapRecord<String, Object, Object>> read = redisTemplate.opsForStream().read(Consumer.from(group_str, consumer_str),StreamReadOptions.empty().count(5),StreamOffset.create(stream_key, ReadOffset.lastConsumed()));// 若无消息则返回if (read.isEmpty()) {return;}// 记录读取到的消息数量log.info("读取到消息,size = {}", read.size());// 处理每条消息read.forEach(mapRecord -> {// 记录消息ID和内容RecordId recordId = mapRecord.getId();Map<Object, Object> value = mapRecord.getValue();log.info("消息id = {}", mapRecord.getId());log.info("消息内容 = {}", mapRecord.getValue());// 将消息内容转换为实体对象,用hutoolSeckillOrder seckillOrder = BeanUtil.fillBeanWithMap(value, new SeckillOrder(), false);log.info("消息内容转成实体查看 = {}", seckillOrder);// 进行业务逻辑处理等});} catch (Exception e) {// 记录读取消息失败的日志log.error("读取消息失败,e = {}", e);}}/*** 确认消息处理完成* 该方法会读取并确认指定消费者组下指定消费者处理完成的消息。* 用到命令 xpending , xack*/public void ack_message() {try {// 获取待处理的消息PendingMessages pending = redisTemplate.opsForStream().pending(stream_key,Consumer.from(group_str, consumer_str));log.info("待处理消息 = {}", pending);// 遍历并确认每条待处理消息pending.forEach(pendingMessage -> {// 记录待处理消息的元数据log.info("待处理消息元数据 = {}", pendingMessage);RecordId recordId = pendingMessage.getId();log.info("待处理消息id = {}", recordId);// 提交消息确认redisTemplate.opsForStream().acknowledge(stream_key, group_str, recordId);log.info("消息提交ack成功,id = {}", recordId);});} catch (Exception e) {throw new RuntimeException(e);}}/*** 消费者组 ,消费者,队列的信息*/public void xInfo() {StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);log.info("获取组的信息:{}", groups);System.out.println();StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(stream_key, group_str);log.info("获取消费者的信息:{}", consumers);System.out.println();StreamInfo.XInfoStream infoStream = redisTemplate.opsForStream().info(stream_key);log.info("获取流的信息:{}", infoStream);}
}
这篇关于RedisTemplate-opsForStream实现消息队列,主要演示 xgroup,xreadgroup,xpending,xack,xinfo的用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!