【spring boot结合rabbit mq 到点执行,可精确到秒】

2024-02-28 12:04

本文主要是介绍【spring boot结合rabbit mq 到点执行,可精确到秒】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【spring boot结合rabbit mq 到点执行,可精确到秒】

  • 创建队列枚举
  • 创建自定义的队列消息pojo
  • 创建队列和延迟队列
  • 发送mq 消息
  • 接收mq 消息
  • DateTimeUtil
  • 测试
  • 注意点

创建队列枚举

public enum QueueEnum {/*** 各种异步消息频道*/TEST(1,"test","队列频道"),DELAY_TEST(2,"delay_test","延迟延迟频道"),;private Integer code;private String channel;private String desc;QueueEnum(Integer code, String channel, String desc) {this.code = code;this.channel = channel;this.desc = desc;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public static String findChannelByCode(Integer code) {QueueEnum[] queueEnums = QueueEnum.values();for (QueueEnum queueEnum : queueEnums) {if (code == queueEnum.getCode()) {return queueEnum.getChannel();}}return "";}
}

创建自定义的队列消息pojo


import java.io.Serializable;
import java.time.LocalDate;/**** 队列消息** 注意:涉及序列化问题,请勿将此类移动与修改* @author linjianhui*/
public class QueueMessage implements Serializable {private static final long serialVersionUID = 1L;//自定义的队列枚举private QueueEnum queueEnum;private String activityId;/*** 任务日期- yyyy-MM-dd* 任务日期- yyyy-MM-dd HH:mm:ss*/private String taskDate;private String msgId;public String getActivityId() {return activityId;}public String getTaskDate() {return taskDate==null? LocalDate.now().toString():taskDate;}public void setQueueEnum(QueueEnum queueEnum) {this.queueEnum = queueEnum;}public void setActivityId(String activityId) {this.activityId = activityId;}public void setTaskDate(String taskDate) {this.taskDate = taskDate;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public QueueEnum getQueueEnum() {return queueEnum;}public QueueMessage() {}public QueueMessage(QueueEnum queueEnum, String activityId) {this.queueEnum = queueEnum;this.activityId = activityId;}public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) {this.queueEnum = queueEnum;this.activityId = activityId;this.msgId=msgId;}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("QueueMessage{");sb.append("queueEnum=").append(queueEnum);sb.append(", activityId='").append(activityId).append('\'');sb.append(", taskDate='").append(taskDate).append('\'');sb.append(", mgsId='").append(msgId).append('\'');sb.append('}');return sb.toString();}

创建队列和延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;import java.util.HashMap;@Configuration
//保证队列的创建优先于监听队列
@Order(1)
public class TestRabbitConfig {@Bean("testQueue")public Queue testQueue() {return new Queue(QueueEnum.TEST.getChannel());}@Bean("testExchange")public DirectExchange testExchange() {return new DirectExchange(QueueEnum.TEST.getChannel());}/*** 将队列绑定到exchange,使用指定的路由key* @return*/@BeanBinding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) {return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel());}/*** 描述:定义延迟更新队列【死信队列】*  当队列到期后就会通过死信交换机和路由key,路由到指定队列* x-message-ttl 消息定时时间* x-max-length  队列最大长度* x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange* x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送* @param* @return*/@Bean("delayTestQueue")public Queue delayTestQueue() {HashMap<String, Object> arguments = new HashMap<>(4);//设置延15天// arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开// x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】//如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间// arguments.put("x-message-ttl", 10*60*1000);//10分钟arguments.put("x-max-length", 500000);arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel());arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel());return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments);}/*** 描述:定义延迟更新队列交换机* @param* @return*/@Bean("delayTestExchange")public DirectExchange delayTestExchange() {return new DirectExchange(QueueEnum.DELAY_TEST.getChannel());}/*** 描述:绑定延迟更新队列到exchange* @param* @return*/@BeanBinding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) {return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel());}

发送mq 消息


import com.alibaba.fastjson.JSON;
import com.project.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;/*** 描述:发送消息*/
@Component
@Slf4j(topic = "sendMqTask")
public class SendMqMessage {@AutowiredRabbitTemplate rabbitTemplate;public void sendTestMessage(QueueMessage queueMessage) {String messageId = StringUtil.getUniqueId("mq-");queueMessage.setMsgId(messageId);rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 计算时间差long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis();//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。//设置消息多长时间后过期message.getMessageProperties().setExpiration(delayInMs+"");return message;}});}
}    

接收mq 消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.mybatis.spring.MyBatisSystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;/*** 描述:消息消费监听*/
@Component
@Order(2)
@Slf4j(topic = "receiveMqTask")
public class ReceiveMqMessage {// private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask");@Value("${spring.profiles.active}")private String active;/*** 判断是否是正式环境** @return*/private boolean isProdEnv() {return "prod".equals(active);}/*** 判断是否是测试环境** @return*/private boolean isTestEnv() {return "test".equals(active);}/*** 监听消息队列* @param queueMessage* @param message : org.springframework.amqp.core.Message* @param channel : com.rabbitmq.client.Channel*/@RabbitListener(queues = ApiConstants.TEST)@RabbitHandlerpublic void test(QueueMessage queueMessage, Message message, Channel channel) {String env=isProdEnv()?"正式":isTestEnv()?"测试":active;log.info("====={}== test Mq Message={}",env, queueMessage);// String consumerTag = message.getMessageProperties().getConsumerTag();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("发送时间是:"+ queueMessage.getTaskDate());System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime());// 手动ACKtry {channel.basicAck(deliveryTag, false);} catch (IOException e) {log.error("MQ手动ACK错误: ", e);}} catch (Exception e) {log.error("test queue 失败");}}
}    

DateTimeUtil

/*** 日期工具类*/
public class DateTimeUtil {/*** yyyy-MM-dd HH:mm:ss*/public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";/*** discription: */public static String getLocalDateTime(LocalDateTime localDateTime) {DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME);if (localDateTime != null) {String localTime = df.format(localDateTime);return localTime;}return null;}
}    

测试

@RestController
@RequestMapping(value = "/test")
public class TestController {@Autowiredprivate SendMqMessage sendMqMessage;@RequestMapping(value = "/testMqMessage", method = RequestMethod.GET)public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123");//设置20秒后更新【默认】queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second)));sendMqMessage.sendTestMessage(queueMessage);return "发送成功";}
}    

注意点

//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。

这篇关于【spring boot结合rabbit mq 到点执行,可精确到秒】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755445

相关文章

Spring Boot项目部署命令java -jar的各种参数及作用详解

《SpringBoot项目部署命令java-jar的各种参数及作用详解》:本文主要介绍SpringBoot项目部署命令java-jar的各种参数及作用的相关资料,包括设置内存大小、垃圾回收... 目录前言一、基础命令结构二、常见的 Java 命令参数1. 设置内存大小2. 配置垃圾回收器3. 配置线程栈大小

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S

Spring中配置ContextLoaderListener方式

《Spring中配置ContextLoaderListener方式》:本文主要介绍Spring中配置ContextLoaderListener方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录Spring中配置ContextLoaderLishttp://www.chinasem.cntene

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码