MQ专题:延迟消息的通用方案

2024-08-30 09:44

本文主要是介绍MQ专题:延迟消息的通用方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、主要内容

本文将实现一个MQ延迟消息的通用方案。

方案不依赖于MQ中间件,依靠MySQL和DelayQueue解决,不管大家用的是什么MQ,具体是RocketMQ、RabbitMQ还是kafka,本文这个方案你都可以拿去直接使用,可以轻松实现任意时间的延迟消息投递。

二、涉及技术点

  1. SpringBoot2.7
  2. MyBatisPlus
  3. MySQL
  4. 线程池
  5. java中的延迟队列:DelayQueue
  6. 分布式锁

三、延迟消息常见的使用场景

  1. 订单超时处理

比如下单后15分钟,未支付,则自动取消订单,回退库存。

可以采用延迟队列实现:下单的时候可以投递一条延迟15分钟的消息,15分钟后消息将被消费。

  1. 消息消费失败重试

比如MQ消息消费失败后,可以延迟一段时间再次消费。

可以采用延迟消息实现:消费失败,可以投递一条延迟消息,触发再次消费

  1. 其他任意需要延迟处理的业务

业务中需要延迟处理的场景,都可以使用延迟消息来搞定。

四、延迟消息常见的实现方案

方案1:MySQL + job定时轮询

由于延迟消息的时间不确定,若要达到实时性很高的效果,也就是说消息的延迟时间是不知道的,那就需要轮询每一秒才能确保消息在指定的延迟时间被处理,这就要求job需要每秒查询一次db中待投递的消息。

这种方案访问db的频率比较高,对数据库造成了一定的压力。

方案2:RabbitMQ 中的TTL+死信队列

rabbitmq中可以使用TTL消息 + 死信队列实现,也可以安装延时插件。

此方案对中间件有依赖,不同的MQ实现是不一样的,若换成其他的MQ,方案要重新实现

方案3:MySQL + job定时轮询 + DelayQueue

可以对方案1进行改进,引入java中的 DelayQueue。

job可以采用1分钟执行一次,每次拉取未来2分钟内需要投递的消息,将其丢到java自带的 DelayQueue 这个延迟队列工具类中去处理,这样便能做到实时性很高的投递效果,且对db的压力也降低了很多。

这种方案对db也没什么压力,实时性非常高,且对MQ没有依赖,这样不管切换什么MQ,这种方案都不需要改动。

本文将落地该方案。

需要一张本地消息表(t_msg)

这张表用来暂存事务消息和延迟消息

create table if not exists t_msg
(id               varchar(32) not null primary key comment '消息id',body_json        text        not null comment '消息体,json格式',status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',actual_send_time datetime comment '消息实际投递时间',create_time      datetime comment '创建时间',fail_msg         text comment 'status=2 时,记录消息投递失败的原因',fail_count       int         not null default 0 comment '已投递失败次数',send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',next_retry_time  datetime comment '投递失败后,下次重试时间',update_time      datetime comment '最近更新时间',key idx_status (status)
) comment '本地消息表';

五、代码落地

将延迟消息保存到t_msg

在这里插入图片描述

投递事务消息 or 2分钟内的延时消息

在这里插入图片描述

每分钟执行一次Job,查出2分钟内应该被投递的延迟消息 和 2分钟内应该重新投递的上次投递失败的事务消息,再放到延时队列里

    /*** 每分钟执行一次*/@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)public void sendRetry() {/*** 查询出需要重试的消息(状态为0 and 期望投递时间 <= 当前时间 + 2分钟) || (投递失败的 and 需要重试 and 下次重试时间小于等于当前时间 + 2分钟)* select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟))*/LocalDateTime time = LocalDateTime.now().plusMinutes(2);LambdaQueryWrapper<MsgPO> qw = Wrappers.lambdaQuery(MsgPO.class).and(query -> query.and(lq ->lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time)).or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));qw.orderByAsc(MsgPO::getId);//先获取最小的一条记录的idMsgPO minMsgPo = this.msgService.findOne(qw);if (minMsgPo == null) {return;}this.msgSender.sendRetry(minMsgPo);String minMsgId = minMsgPo.getId();//循环中继续向后找出id>minMsgId的所有记录,然后投递重试while (true) {//select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟)) and id>#{minMsgId}qw = Wrappers.lambdaQuery(MsgPO.class).and(query -> query.and(lq ->lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time)).or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));qw.gt(MsgPO::getId, minMsgId);qw.orderByAsc(MsgPO::getId);Page<MsgPO> page = new Page<>();page.setCurrent(1);page.setSize(500);this.msgService.page(page, qw);//如果查询出来的为空 || 当前服务要停止了(stop=true),则退出循环if (CollUtils.isEmpty(page.getRecords()) || stop) {break;}//投递重试for (MsgPO msgPO : page.getRecords()) {this.msgSender.sendRetry(msgPO);}// minMsgId = 当前列表最后一条消息的idminMsgId = page.getRecords().get(page.getRecords().size() - 1).getId();}}

在这里插入图片描述

这篇关于MQ专题:延迟消息的通用方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120462

相关文章

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

【专题】2024飞行汽车技术全景报告合集PDF分享(附原数据表)

原文链接: https://tecdat.cn/?p=37628 6月16日,小鹏汇天旅航者X2在北京大兴国际机场临空经济区完成首飞,这也是小鹏汇天的产品在京津冀地区进行的首次飞行。小鹏汇天方面还表示,公司准备量产,并计划今年四季度开启预售小鹏汇天分体式飞行汽车,探索分体式飞行汽车城际通勤。阅读原文,获取专题报告合集全文,解锁文末271份飞行汽车相关行业研究报告。 据悉,业内人士对飞行汽车行业

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

JavaFX应用更新检测功能(在线自动更新方案)

JavaFX开发的桌面应用属于C端,一般来说需要版本检测和自动更新功能,这里记录一下一种版本检测和自动更新的方法。 1. 整体方案 JavaFX.应用版本检测、自动更新主要涉及一下步骤: 读取本地应用版本拉取远程版本并比较两个版本如果需要升级,那么拉取更新历史弹出升级控制窗口用户选择升级时,拉取升级包解压,重启应用用户选择忽略时,本地版本标志为忽略版本用户选择取消时,隐藏升级控制窗口 2.

如何选择SDR无线图传方案

在开源软件定义无线电(SDR)领域,有几个项目提供了无线图传的解决方案。以下是一些开源SDR无线图传方案: 1. **OpenHD**:这是一个远程高清数字图像传输的开源解决方案,它使用SDR技术来实现高清视频的无线传输。OpenHD项目提供了一个完整的工具链,包括发射器和接收器的硬件设计以及相应的软件。 2. **USRP(Universal Software Radio Periphera

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

MyBatis 切换不同的类型数据库方案

下属案例例当前结合SpringBoot 配置进行讲解。 背景: 实现一个工程里面在部署阶段支持切换不同类型数据库支持。 方案一 数据源配置 关键代码(是什么数据库,该怎么配就怎么配) spring:datasource:name: test# 使用druid数据源type: com.alibaba.druid.pool.DruidDataSource# @需要修改 数据库连接及驱动u

一种改进的red5集群方案的应用、基于Red5服务器集群负载均衡调度算法研究

转自: 一种改进的red5集群方案的应用: http://wenku.baidu.com/link?url=jYQ1wNwHVBqJ-5XCYq0PRligp6Y5q6BYXyISUsF56My8DP8dc9CZ4pZvpPz1abxJn8fojMrL0IyfmMHStpvkotqC1RWlRMGnzVL1X4IPOa_  基于Red5服务器集群负载均衡调度算法研究 http://ww