【Redis】redis高阶-使用zset实现延时队列

2024-06-03 22:20

本文主要是介绍【Redis】redis高阶-使用zset实现延时队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hi,大家好,我是抢老婆酸奶的小肥仔。

最近在使用redis时,就想能不能用其实现消息队列?也在网上看了下其他小伙伴写的实现,结合自身业务实现了如下消息队列,希望对大家有用。

废话不多说,直接开撸。

1、为什么zset可以做消息队列?

首先我们来看下,设计消息队列需要考虑的需求:有序性,消息重复性,可靠性。

  • 有序性:zset所有元素可以根据成员关联的score来进行从低到高的排序,例如,我们可以利用时间戳来进行排序
  • 消息重复性:在zset中每个元素都是唯一的,这也保证了消息的唯一性
  • 可靠性:zset会自动维护元素之间的顺序,在添加或删除元素时无需手动排序,提升操作速度。

2、使用的zset命令

命令描述
zadd将一个给定score的成员添加到有序集合中,返回添加元素的个数
zrange根据元素在有序排序中的位置,从有序集合中获取多个元素
rank(K key, Object o)获取指定元素在集合中的索引,索引从0开始

3、代码实现

使用zset实现消息队列时,具体的流程,如下:

生产者流程:

  1. 用户获取消息Id,并封装消息体
  2. 用户发送数据到生产者,先获取锁
  3. 如果获取到锁,则校验该消息体是否已添加到队列中,已添加则直接返回提醒。
  4. 若未添加则调用方法将数据保存到zset集合中,否则等到指定时间后再获取锁。
  5. 推送数据后,释放锁

消费者流程:

  1. 调用方法获取数据
  2. 获取到数据,则直接返回,否则到指定时间后再次获取数据,直到获取到数据并返回。

统一返回类:

/*** @Author: jiangjs* @Description:* @Date: 2021/11/12 15:46**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultUtil<T> implements Serializable {private int code;private String msg;private T data;public static <T> ResultUtil<T> success(){return ResultUtil.<T>builder().code(1000).msg("成功").build();}public static <T> ResultUtil<T> success(T data){return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();}public static <T> ResultUtil<T> error(String msg){return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();}public static <T> ResultUtil<T> error(int code,String msg){return ResultUtil.<T>builder().code(code).msg(msg).build();}
}

3.1 消息实体

需添加消息Id,主要防止消息重复消费。

/*** @author: jiangjs* @description: 消息实体* @date: 2023/5/30 11:11**/
@Data
@Accessors(chain = true)
public class QueueTask<T> {/*** 消息Id*/private String taskId;/*** 任务*/private T task;
}

3.2 队列类型

队列类型可以理解为队列的名称,通过枚举,可以随意添加队列名称。

/*** @author: jiangjs* @description: 队列类型* @date: 2023/5/30 10:53**/
public enum QueueTypeEnum {/*** 订单*/ORDER("order");private final String type;QueueTypeEnum(String type){this.type = type;}public String getType(){return type;}
}

3.3 创建消息工具

package com.jiashn.springbootproject.redis.utils;import com.jiashn.springbootproject.redis.domain.QueueTask;
import com.jiashn.springbootproject.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author: jiangjs* @description: redis实现消息队列* @date: 2023/5/30 10:51**/
public class RedisQueueUtil<T> {private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);private RedisTemplate<String,QueueTask<T>> redisTemplate;/*** 队列类型,即名称*/private final QueueTypeEnum typeEnum;public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){this.typeEnum = typeEnum;this.redisTemplate = redisTemplate;}/*** 添加消息数据* @param queueTask 消息* @param time 延迟时间,单位s*/public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){//加锁if (getLock()){try {Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);if (Objects.nonNull(rank)){return ResultUtil.error(6000,"消息数据已经存在,不予添加......");}Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);if (Objects.nonNull(result) && result){log.info("添加消息数据成功:" + queueTask + ",添加时间:" + LocalDateTime.now());return ResultUtil.success("添加消息数据成功");}return ResultUtil.error("添加消息数据失败");}finally {//释放锁releaseLock();}} else {log.info("未获取到锁,稍后再试");return ResultUtil.error("未获取到锁,稍后再试");}}/*** 获取zset前count数据* @param count 数据数* @return 返回获取到数据*/public Set<QueueTask<T>> loopGetTask(int count) {//rangeByScore,根据score顺序获取zset数据的值return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);}/*** 注销消息队列* @param typeEnum 消息队列名称*/public void destroy(QueueTypeEnum typeEnum){redisTemplate.opsForZSet().remove(typeEnum.getType());}/*** 获取任务Id* @return 返回消息Id*/public String getTaskId(){return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");}/*** 获取锁* @return 返回加锁状态*/private boolean getLock(){Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);return Objects.nonNull(absent) ? absent : false;}/*** 释放锁*/public void releaseLock(){redisTemplate.delete(typeEnum.getType() + "_Locked");}
}

在消息工具类中,创建消息任务时添加了锁,只有在获取锁的前提下才能添加消息任务。

提供获取消息Id的方法是为了让提交消息任务前,先获取Id,即使在提交时网络发生问题,提交的Id还是同一个,再进行消息消费时,可以根据这个Id来进行判断该消息任务是否已被消费,被消费则直接丢弃。

3.4 消费消息

/*** @author: jiangjs* @description: 启动消费* @date: 2023/5/30 14:27**/
@Component
public class CustomerTaskLineRunner implements CommandLineRunner {@Resourceprivate RedisTemplate<String,QueueTask<String>> redisTemplate;private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);@Overridepublic void run(String... args) throws Exception {RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);while (true){Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);if (CollectionUtils.isNotEmpty(queueTasks)){for (QueueTask<String> queueTask : queueTasks) {//校验当前消息是否已消费,主要防止网络延时,导致多次提交同一任务 存在QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());if (Objects.nonNull(stringQueueTask)){log.info("该任务已经消费,不能重复消费");redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);continue;}Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);if (Objects.nonNull(removeNum) && removeNum > 0){String task = queueTask.getTask();log.info("消费任务数据:" + task);//设置过期时间,10分钟内则默认是重复提交redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);}}}log.info("------1分钟后再次获取------");Thread.sleep(60000);}}
}

校验重复消息,若消息重复且在10分钟内未被消费,则直接将该消息从队列中删除。在消息任务被消费后,将数据从队列中移除。

执行结果:

谢谢大家,今天的分享就到这,不合理的地方希望大家指正。

这篇关于【Redis】redis高阶-使用zset实现延时队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1028278

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

C++对象布局及多态实现探索之内存布局(整理的很多链接)

本文通过观察对象的内存布局,跟踪函数调用的汇编代码。分析了C++对象内存的布局情况,虚函数的执行方式,以及虚继承,等等 文章链接:http://dev.yesky.com/254/2191254.shtml      论C/C++函数间动态内存的传递 (2005-07-30)   当你涉及到C/C++的核心编程的时候,你会无止境地与内存管理打交道。 文章链接:http://dev.yesky

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

Vim使用基础篇

本文内容大部分来自 vimtutor,自带的教程的总结。在终端输入vimtutor 即可进入教程。 先总结一下,然后再分别介绍正常模式,插入模式,和可视模式三种模式下的命令。 目录 看完以后的汇总 1.正常模式(Normal模式) 1.移动光标 2.删除 3.【:】输入符 4.撤销 5.替换 6.重复命令【. ; ,】 7.复制粘贴 8.缩进 2.插入模式 INSERT

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

如何免费的去使用connectedpapers?

免费使用connectedpapers 1. 打开谷歌浏览器2. 按住ctrl+shift+N,进入无痕模式3. 不需要登录(也就是访客模式)4. 两次用完,关闭无痕模式(继续重复步骤 2 - 4) 1. 打开谷歌浏览器 2. 按住ctrl+shift+N,进入无痕模式 输入网址:https://www.connectedpapers.com/ 3. 不需要登录(也就是

通过SSH隧道实现通过远程服务器上外网

搭建隧道 autossh -M 0 -f -D 1080 -C -N user1@remotehost##验证隧道是否生效,查看1080端口是否启动netstat -tuln | grep 1080## 测试ssh 隧道是否生效curl -x socks5h://127.0.0.1:1080 -I http://www.github.com 将autossh 设置为服务,隧道开机启动

Windows/macOS/Linux 安装 Redis 和 Redis Desktop Manager 可视化工具

本文所有安装都在macOS High Sierra 10.13.4进行,Windows安装相对容易些,Linux安装与macOS类似,文中会做区分讲解 1. Redis安装 1.下载Redis https://redis.io/download 把下载的源码更名为redis-4.0.9-source,我喜欢跟maven、Tomcat放在一起,就放到/Users/zhan/Documents

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测 目录 时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测基本介绍程序设计参考资料 基本介绍 MATLAB实现LSTM时间序列未来多步预测-递归预测。LSTM是一种含有LSTM区块(blocks)或其他的一种类神经网络,文献或其他资料中LSTM区块可能被描述成智能网络单元,因为

vue项目集成CanvasEditor实现Word在线编辑器

CanvasEditor实现Word在线编辑器 官网文档:https://hufe.club/canvas-editor-docs/guide/schema.html 源码地址:https://github.com/Hufe921/canvas-editor 前提声明: 由于CanvasEditor目前不支持vue、react 等框架开箱即用版,所以需要我们去Git下载源码,拿到其中两个主