【spring boot结合rabbit mq 到点执行,可精确到秒】

2024-02-28 12:04

本文主要是介绍【spring boot结合rabbit mq 到点执行,可精确到秒】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【spring boot结合rabbit mq 到点执行,可精确到秒】

  • 创建队列枚举
  • 创建自定义的队列消息pojo
  • 创建队列和延迟队列
  • 发送mq 消息
  • 接收mq 消息
  • DateTimeUtil
  • 测试
  • 注意点

创建队列枚举

public enum QueueEnum {/*** 各种异步消息频道*/TEST(1,"test","队列频道"),DELAY_TEST(2,"delay_test","延迟延迟频道"),;private Integer code;private String channel;private String desc;QueueEnum(Integer code, String channel, String desc) {this.code = code;this.channel = channel;this.desc = desc;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public static String findChannelByCode(Integer code) {QueueEnum[] queueEnums = QueueEnum.values();for (QueueEnum queueEnum : queueEnums) {if (code == queueEnum.getCode()) {return queueEnum.getChannel();}}return "";}
}

创建自定义的队列消息pojo


import java.io.Serializable;
import java.time.LocalDate;/**** 队列消息** 注意:涉及序列化问题,请勿将此类移动与修改* @author linjianhui*/
public class QueueMessage implements Serializable {private static final long serialVersionUID = 1L;//自定义的队列枚举private QueueEnum queueEnum;private String activityId;/*** 任务日期- yyyy-MM-dd* 任务日期- yyyy-MM-dd HH:mm:ss*/private String taskDate;private String msgId;public String getActivityId() {return activityId;}public String getTaskDate() {return taskDate==null? LocalDate.now().toString():taskDate;}public void setQueueEnum(QueueEnum queueEnum) {this.queueEnum = queueEnum;}public void setActivityId(String activityId) {this.activityId = activityId;}public void setTaskDate(String taskDate) {this.taskDate = taskDate;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public QueueEnum getQueueEnum() {return queueEnum;}public QueueMessage() {}public QueueMessage(QueueEnum queueEnum, String activityId) {this.queueEnum = queueEnum;this.activityId = activityId;}public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) {this.queueEnum = queueEnum;this.activityId = activityId;this.msgId=msgId;}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("QueueMessage{");sb.append("queueEnum=").append(queueEnum);sb.append(", activityId='").append(activityId).append('\'');sb.append(", taskDate='").append(taskDate).append('\'');sb.append(", mgsId='").append(msgId).append('\'');sb.append('}');return sb.toString();}

创建队列和延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;import java.util.HashMap;@Configuration
//保证队列的创建优先于监听队列
@Order(1)
public class TestRabbitConfig {@Bean("testQueue")public Queue testQueue() {return new Queue(QueueEnum.TEST.getChannel());}@Bean("testExchange")public DirectExchange testExchange() {return new DirectExchange(QueueEnum.TEST.getChannel());}/*** 将队列绑定到exchange,使用指定的路由key* @return*/@BeanBinding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) {return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel());}/*** 描述:定义延迟更新队列【死信队列】*  当队列到期后就会通过死信交换机和路由key,路由到指定队列* x-message-ttl 消息定时时间* x-max-length  队列最大长度* x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange* x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送* @param* @return*/@Bean("delayTestQueue")public Queue delayTestQueue() {HashMap<String, Object> arguments = new HashMap<>(4);//设置延15天// arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开// x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】//如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间// arguments.put("x-message-ttl", 10*60*1000);//10分钟arguments.put("x-max-length", 500000);arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel());arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel());return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments);}/*** 描述:定义延迟更新队列交换机* @param* @return*/@Bean("delayTestExchange")public DirectExchange delayTestExchange() {return new DirectExchange(QueueEnum.DELAY_TEST.getChannel());}/*** 描述:绑定延迟更新队列到exchange* @param* @return*/@BeanBinding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) {return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel());}

发送mq 消息


import com.alibaba.fastjson.JSON;
import com.project.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;/*** 描述:发送消息*/
@Component
@Slf4j(topic = "sendMqTask")
public class SendMqMessage {@AutowiredRabbitTemplate rabbitTemplate;public void sendTestMessage(QueueMessage queueMessage) {String messageId = StringUtil.getUniqueId("mq-");queueMessage.setMsgId(messageId);rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 计算时间差long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis();//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。//设置消息多长时间后过期message.getMessageProperties().setExpiration(delayInMs+"");return message;}});}
}    

接收mq 消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.mybatis.spring.MyBatisSystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;/*** 描述:消息消费监听*/
@Component
@Order(2)
@Slf4j(topic = "receiveMqTask")
public class ReceiveMqMessage {// private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask");@Value("${spring.profiles.active}")private String active;/*** 判断是否是正式环境** @return*/private boolean isProdEnv() {return "prod".equals(active);}/*** 判断是否是测试环境** @return*/private boolean isTestEnv() {return "test".equals(active);}/*** 监听消息队列* @param queueMessage* @param message : org.springframework.amqp.core.Message* @param channel : com.rabbitmq.client.Channel*/@RabbitListener(queues = ApiConstants.TEST)@RabbitHandlerpublic void test(QueueMessage queueMessage, Message message, Channel channel) {String env=isProdEnv()?"正式":isTestEnv()?"测试":active;log.info("====={}== test Mq Message={}",env, queueMessage);// String consumerTag = message.getMessageProperties().getConsumerTag();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("发送时间是:"+ queueMessage.getTaskDate());System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime());// 手动ACKtry {channel.basicAck(deliveryTag, false);} catch (IOException e) {log.error("MQ手动ACK错误: ", e);}} catch (Exception e) {log.error("test queue 失败");}}
}    

DateTimeUtil

/*** 日期工具类*/
public class DateTimeUtil {/*** yyyy-MM-dd HH:mm:ss*/public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";/*** discription: */public static String getLocalDateTime(LocalDateTime localDateTime) {DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME);if (localDateTime != null) {String localTime = df.format(localDateTime);return localTime;}return null;}
}    

测试

@RestController
@RequestMapping(value = "/test")
public class TestController {@Autowiredprivate SendMqMessage sendMqMessage;@RequestMapping(value = "/testMqMessage", method = RequestMethod.GET)public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123");//设置20秒后更新【默认】queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second)));sendMqMessage.sendTestMessage(queueMessage);return "发送成功";}
}    

注意点

//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。

这篇关于【spring boot结合rabbit mq 到点执行,可精确到秒】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755445

相关文章

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Spring MVC如何设置响应

《SpringMVC如何设置响应》本文介绍了如何在Spring框架中设置响应,并通过不同的注解返回静态页面、HTML片段和JSON数据,此外,还讲解了如何设置响应的状态码和Header... 目录1. 返回静态页面1.1 Spring 默认扫描路径1.2 @RestController2. 返回 html2

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

Spring核心思想之浅谈IoC容器与依赖倒置(DI)

《Spring核心思想之浅谈IoC容器与依赖倒置(DI)》文章介绍了Spring的IoC和DI机制,以及MyBatis的动态代理,通过注解和反射,Spring能够自动管理对象的创建和依赖注入,而MyB... 目录一、控制反转 IoC二、依赖倒置 DI1. 详细概念2. Spring 中 DI 的实现原理三、