【spring boot结合rabbit mq 到点执行,可精确到秒】

2024-02-28 12:04

本文主要是介绍【spring boot结合rabbit mq 到点执行,可精确到秒】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【spring boot结合rabbit mq 到点执行,可精确到秒】

  • 创建队列枚举
  • 创建自定义的队列消息pojo
  • 创建队列和延迟队列
  • 发送mq 消息
  • 接收mq 消息
  • DateTimeUtil
  • 测试
  • 注意点

创建队列枚举

public enum QueueEnum {/*** 各种异步消息频道*/TEST(1,"test","队列频道"),DELAY_TEST(2,"delay_test","延迟延迟频道"),;private Integer code;private String channel;private String desc;QueueEnum(Integer code, String channel, String desc) {this.code = code;this.channel = channel;this.desc = desc;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public static String findChannelByCode(Integer code) {QueueEnum[] queueEnums = QueueEnum.values();for (QueueEnum queueEnum : queueEnums) {if (code == queueEnum.getCode()) {return queueEnum.getChannel();}}return "";}
}

创建自定义的队列消息pojo


import java.io.Serializable;
import java.time.LocalDate;/**** 队列消息** 注意:涉及序列化问题,请勿将此类移动与修改* @author linjianhui*/
public class QueueMessage implements Serializable {private static final long serialVersionUID = 1L;//自定义的队列枚举private QueueEnum queueEnum;private String activityId;/*** 任务日期- yyyy-MM-dd* 任务日期- yyyy-MM-dd HH:mm:ss*/private String taskDate;private String msgId;public String getActivityId() {return activityId;}public String getTaskDate() {return taskDate==null? LocalDate.now().toString():taskDate;}public void setQueueEnum(QueueEnum queueEnum) {this.queueEnum = queueEnum;}public void setActivityId(String activityId) {this.activityId = activityId;}public void setTaskDate(String taskDate) {this.taskDate = taskDate;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public QueueEnum getQueueEnum() {return queueEnum;}public QueueMessage() {}public QueueMessage(QueueEnum queueEnum, String activityId) {this.queueEnum = queueEnum;this.activityId = activityId;}public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) {this.queueEnum = queueEnum;this.activityId = activityId;this.msgId=msgId;}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("QueueMessage{");sb.append("queueEnum=").append(queueEnum);sb.append(", activityId='").append(activityId).append('\'');sb.append(", taskDate='").append(taskDate).append('\'');sb.append(", mgsId='").append(msgId).append('\'');sb.append('}');return sb.toString();}

创建队列和延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;import java.util.HashMap;@Configuration
//保证队列的创建优先于监听队列
@Order(1)
public class TestRabbitConfig {@Bean("testQueue")public Queue testQueue() {return new Queue(QueueEnum.TEST.getChannel());}@Bean("testExchange")public DirectExchange testExchange() {return new DirectExchange(QueueEnum.TEST.getChannel());}/*** 将队列绑定到exchange,使用指定的路由key* @return*/@BeanBinding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) {return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel());}/*** 描述:定义延迟更新队列【死信队列】*  当队列到期后就会通过死信交换机和路由key,路由到指定队列* x-message-ttl 消息定时时间* x-max-length  队列最大长度* x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange* x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送* @param* @return*/@Bean("delayTestQueue")public Queue delayTestQueue() {HashMap<String, Object> arguments = new HashMap<>(4);//设置延15天// arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开// x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】//如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间// arguments.put("x-message-ttl", 10*60*1000);//10分钟arguments.put("x-max-length", 500000);arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel());arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel());return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments);}/*** 描述:定义延迟更新队列交换机* @param* @return*/@Bean("delayTestExchange")public DirectExchange delayTestExchange() {return new DirectExchange(QueueEnum.DELAY_TEST.getChannel());}/*** 描述:绑定延迟更新队列到exchange* @param* @return*/@BeanBinding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) {return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel());}

发送mq 消息


import com.alibaba.fastjson.JSON;
import com.project.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;/*** 描述:发送消息*/
@Component
@Slf4j(topic = "sendMqTask")
public class SendMqMessage {@AutowiredRabbitTemplate rabbitTemplate;public void sendTestMessage(QueueMessage queueMessage) {String messageId = StringUtil.getUniqueId("mq-");queueMessage.setMsgId(messageId);rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 计算时间差long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis();//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。//设置消息多长时间后过期message.getMessageProperties().setExpiration(delayInMs+"");return message;}});}
}    

接收mq 消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.mybatis.spring.MyBatisSystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;/*** 描述:消息消费监听*/
@Component
@Order(2)
@Slf4j(topic = "receiveMqTask")
public class ReceiveMqMessage {// private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask");@Value("${spring.profiles.active}")private String active;/*** 判断是否是正式环境** @return*/private boolean isProdEnv() {return "prod".equals(active);}/*** 判断是否是测试环境** @return*/private boolean isTestEnv() {return "test".equals(active);}/*** 监听消息队列* @param queueMessage* @param message : org.springframework.amqp.core.Message* @param channel : com.rabbitmq.client.Channel*/@RabbitListener(queues = ApiConstants.TEST)@RabbitHandlerpublic void test(QueueMessage queueMessage, Message message, Channel channel) {String env=isProdEnv()?"正式":isTestEnv()?"测试":active;log.info("====={}== test Mq Message={}",env, queueMessage);// String consumerTag = message.getMessageProperties().getConsumerTag();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("发送时间是:"+ queueMessage.getTaskDate());System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime());// 手动ACKtry {channel.basicAck(deliveryTag, false);} catch (IOException e) {log.error("MQ手动ACK错误: ", e);}} catch (Exception e) {log.error("test queue 失败");}}
}    

DateTimeUtil

/*** 日期工具类*/
public class DateTimeUtil {/*** yyyy-MM-dd HH:mm:ss*/public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";/*** discription: */public static String getLocalDateTime(LocalDateTime localDateTime) {DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME);if (localDateTime != null) {String localTime = df.format(localDateTime);return localTime;}return null;}
}    

测试

@RestController
@RequestMapping(value = "/test")
public class TestController {@Autowiredprivate SendMqMessage sendMqMessage;@RequestMapping(value = "/testMqMessage", method = RequestMethod.GET)public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123");//设置20秒后更新【默认】queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second)));sendMqMessage.sendTestMessage(queueMessage);return "发送成功";}
}    

注意点

//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。

这篇关于【spring boot结合rabbit mq 到点执行,可精确到秒】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755445

相关文章

nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析(结合应用场景)

《nginx-t、nginx-sstop和nginx-sreload命令的详细解析(结合应用场景)》本文解析Nginx的-t、-sstop、-sreload命令,分别用于配置语法检... 以下是关于 nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析,结合实际应

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与

Spring Boot spring-boot-maven-plugin 参数配置详解(最新推荐)

《SpringBootspring-boot-maven-plugin参数配置详解(最新推荐)》文章介绍了SpringBootMaven插件的5个核心目标(repackage、run、start... 目录一 spring-boot-maven-plugin 插件的5个Goals二 应用场景1 重新打包应用

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Java中读取YAML文件配置信息常见问题及解决方法

《Java中读取YAML文件配置信息常见问题及解决方法》:本文主要介绍Java中读取YAML文件配置信息常见问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录1 使用Spring Boot的@ConfigurationProperties2. 使用@Valu

创建Java keystore文件的完整指南及详细步骤

《创建Javakeystore文件的完整指南及详细步骤》本文详解Java中keystore的创建与配置,涵盖私钥管理、自签名与CA证书生成、SSL/TLS应用,强调安全存储及验证机制,确保通信加密和... 目录1. 秘密键(私钥)的理解与管理私钥的定义与重要性私钥的管理策略私钥的生成与存储2. 证书的创建与

浅析Spring如何控制Bean的加载顺序

《浅析Spring如何控制Bean的加载顺序》在大多数情况下,我们不需要手动控制Bean的加载顺序,因为Spring的IoC容器足够智能,但在某些特殊场景下,这种隐式的依赖关系可能不存在,下面我们就来... 目录核心原则:依赖驱动加载手动控制 Bean 加载顺序的方法方法 1:使用@DependsOn(最直