springboot2+activemq(pub+sub单机版本)

2024-06-20 06:08

本文主要是介绍springboot2+activemq(pub+sub单机版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.项目依赖,pom.xml配置

<!--activemq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.14.5</version>
</dependency>

2.yml文件配置:

spring:activemq:user: adminpassword: XXXXXXbroker-url: tcp://XXXXXXXXXXXXX:61616pool:enabled: truemax-connections: 10

3.对应的启动应用需要加注解@EnableJms

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;@SpringBootApplication
@EnableJms
public class LenderApplication {private final static Logger logger = LoggerFactory.getLogger(LenderApplication.class);public static void main(String[] args) {SpringApplication.run(LenderApplication.class, args);logger.info("LenderApplication is success!");}
}

 

4.对应的配置bean的创建

package com.stylefeng.guns.config.mq;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;import javax.jms.Destination;@Configuration
public class ActiveMQConfig {@Value("${loanTopic}")private String loanTopic;@Value("${repayTopic}")private String repayTopic;@Value("${spring.activemq.user}")private String usrName;@Value("${spring.activemq.password}")private String password;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Beanpublic Destination topic() {return new ActiveMQTopic(loanTopic);}/*@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}*/@Beanpublic Destination topic2() {return new ActiveMQTopic(repayTopic);}@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory, Destination topic) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(loanTopic);return jmsTemplate;}@Beanpublic JmsTemplate jmsTemplate2(ActiveMQConnectionFactory connectionFactory, Destination topic2) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic2);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(repayTopic);return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
//    @Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(false);return factory;}@Bean(name = "jmsTopicListener")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);//客户端签收模式factory.setSessionAcknowledgeMode(4);//使用监听模式factory.setPubSubDomain(true);//使用持久化factory.setSubscriptionDurable(true);//客户端ID,根据业务名称定义factory.setClientId("lenderLoadClient");return factory;}@Bean(name = "jmsTopicListener2")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory2(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(true);factory.setSubscriptionDurable(true);factory.setClientId("lenderRepayClient");return factory;}}

ps:这里面使用持久化的监听模式,因为业务需要创建了两个监听工厂!

activemq安全性配置需要

http://activemq.apache.org/objectmessage.html

 activeMQConnectionFactory.setTrustAllPackages(true);

 

5.消费者消息监听

package com.stylefeng.guns.modular.message;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.stylefeng.guns.modular.orderinfo.service.IOrderInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;@EnableJms
@Component
@Slf4j
public class LoanMessageListener {@Autowiredprivate IOrderInfoService orderInfoService;@JmsListener(destination = "loan.success.topic", containerFactory = "jmsTopicListener")public void onMessage(final TextMessage message, Session session) throws JMSException {try {log.info("=======接收到借款消息=={}", message);if (message instanceof TextMessage) {log.info("=======接收到借款消息內容=={}", message.getText());String result = message.getText();JSONObject jsonObject = JSON.parseObject(result);Long borrowId = jsonObject.getLongValue("borrowId");orderInfoService.updateOrderInfo(borrowId);log.info("===========borrowId={}", borrowId);message.acknowledge();}} catch (Exception e) {session.recover();}}}

7:一些说明总结

ps:使用了持久化订阅(消费在离线之后这点时间生产者产成的消息仍然可以在重新上线之后监听到)(防止消息丢失)

订阅模式消息能被多个监听的消息者获取到所有的消息:1:N(同时生产的所有消息都能被消费)(保证消息被消费者全部消费)

消费的重复消费问题:客户端去重

1.第一种,数据库保持数据的唯一性,如果往数据库里面新增一条数据可以根据id主键保证唯一性(单体应用)

2.分布式系统要消息的生产需要用到分布式ID,然后消费者可以把这个监听到的消费过的消息ID存入redis里面,再次监听消费时先查看redis里面是否存在消费过的消息ID。存在就不重复消费。

 

 

这篇关于springboot2+activemq(pub+sub单机版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077315

相关文章

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected

一文详解SpringBoot中控制器的动态注册与卸载

《一文详解SpringBoot中控制器的动态注册与卸载》在项目开发中,通过动态注册和卸载控制器功能,可以根据业务场景和项目需要实现功能的动态增加、删除,提高系统的灵活性和可扩展性,下面我们就来看看Sp... 目录项目结构1. 创建 Spring Boot 启动类2. 创建一个测试控制器3. 创建动态控制器注

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

SpringBoot+Docker+Graylog 如何让错误自动报警

《SpringBoot+Docker+Graylog如何让错误自动报警》SpringBoot默认使用SLF4J与Logback,支持多日志级别和配置方式,可输出到控制台、文件及远程服务器,集成ELK... 目录01 Spring Boot 默认日志框架解析02 Spring Boot 日志级别详解03 Sp

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码