springboot2+activemq(pub+sub单机版本)

2024-06-20 06:08

本文主要是介绍springboot2+activemq(pub+sub单机版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.项目依赖,pom.xml配置

<!--activemq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.14.5</version>
</dependency>

2.yml文件配置:

spring:activemq:user: adminpassword: XXXXXXbroker-url: tcp://XXXXXXXXXXXXX:61616pool:enabled: truemax-connections: 10

3.对应的启动应用需要加注解@EnableJms

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;@SpringBootApplication
@EnableJms
public class LenderApplication {private final static Logger logger = LoggerFactory.getLogger(LenderApplication.class);public static void main(String[] args) {SpringApplication.run(LenderApplication.class, args);logger.info("LenderApplication is success!");}
}

 

4.对应的配置bean的创建

package com.stylefeng.guns.config.mq;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;import javax.jms.Destination;@Configuration
public class ActiveMQConfig {@Value("${loanTopic}")private String loanTopic;@Value("${repayTopic}")private String repayTopic;@Value("${spring.activemq.user}")private String usrName;@Value("${spring.activemq.password}")private String password;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Beanpublic Destination topic() {return new ActiveMQTopic(loanTopic);}/*@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}*/@Beanpublic Destination topic2() {return new ActiveMQTopic(repayTopic);}@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory, Destination topic) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(loanTopic);return jmsTemplate;}@Beanpublic JmsTemplate jmsTemplate2(ActiveMQConnectionFactory connectionFactory, Destination topic2) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic2);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(repayTopic);return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
//    @Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(false);return factory;}@Bean(name = "jmsTopicListener")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);//客户端签收模式factory.setSessionAcknowledgeMode(4);//使用监听模式factory.setPubSubDomain(true);//使用持久化factory.setSubscriptionDurable(true);//客户端ID,根据业务名称定义factory.setClientId("lenderLoadClient");return factory;}@Bean(name = "jmsTopicListener2")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory2(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(true);factory.setSubscriptionDurable(true);factory.setClientId("lenderRepayClient");return factory;}}

ps:这里面使用持久化的监听模式,因为业务需要创建了两个监听工厂!

activemq安全性配置需要

http://activemq.apache.org/objectmessage.html

 activeMQConnectionFactory.setTrustAllPackages(true);

 

5.消费者消息监听

package com.stylefeng.guns.modular.message;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.stylefeng.guns.modular.orderinfo.service.IOrderInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;@EnableJms
@Component
@Slf4j
public class LoanMessageListener {@Autowiredprivate IOrderInfoService orderInfoService;@JmsListener(destination = "loan.success.topic", containerFactory = "jmsTopicListener")public void onMessage(final TextMessage message, Session session) throws JMSException {try {log.info("=======接收到借款消息=={}", message);if (message instanceof TextMessage) {log.info("=======接收到借款消息內容=={}", message.getText());String result = message.getText();JSONObject jsonObject = JSON.parseObject(result);Long borrowId = jsonObject.getLongValue("borrowId");orderInfoService.updateOrderInfo(borrowId);log.info("===========borrowId={}", borrowId);message.acknowledge();}} catch (Exception e) {session.recover();}}}

7:一些说明总结

ps:使用了持久化订阅(消费在离线之后这点时间生产者产成的消息仍然可以在重新上线之后监听到)(防止消息丢失)

订阅模式消息能被多个监听的消息者获取到所有的消息:1:N(同时生产的所有消息都能被消费)(保证消息被消费者全部消费)

消费的重复消费问题:客户端去重

1.第一种,数据库保持数据的唯一性,如果往数据库里面新增一条数据可以根据id主键保证唯一性(单体应用)

2.分布式系统要消息的生产需要用到分布式ID,然后消费者可以把这个监听到的消费过的消息ID存入redis里面,再次监听消费时先查看redis里面是否存在消费过的消息ID。存在就不重复消费。

 

 

这篇关于springboot2+activemq(pub+sub单机版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077315

相关文章

Spring Security常见问题及解决方案

《SpringSecurity常见问题及解决方案》SpringSecurity是Spring生态的安全框架,提供认证、授权及攻击防护,支持JWT、OAuth2集成,适用于保护Spring应用,需配置... 目录Spring Security 简介Spring Security 核心概念1. ​Securit

SpringBoot+EasyPOI轻松实现Excel和Word导出PDF

《SpringBoot+EasyPOI轻松实现Excel和Word导出PDF》在企业级开发中,将Excel和Word文档导出为PDF是常见需求,本文将结合​​EasyPOI和​​Aspose系列工具实... 目录一、环境准备与依赖配置1.1 方案选型1.2 依赖配置(商业库方案)二、Excel 导出 PDF

SpringBoot改造MCP服务器的详细说明(StreamableHTTP 类型)

《SpringBoot改造MCP服务器的详细说明(StreamableHTTP类型)》本文介绍了SpringBoot如何实现MCPStreamableHTTP服务器,并且使用CherryStudio... 目录SpringBoot改造MCP服务器(StreamableHTTP)1 项目说明2 使用说明2.1

spring中的@MapperScan注解属性解析

《spring中的@MapperScan注解属性解析》@MapperScan是Spring集成MyBatis时自动扫描Mapper接口的注解,简化配置并支持多数据源,通过属性控制扫描路径和过滤条件,利... 目录一、核心功能与作用二、注解属性解析三、底层实现原理四、使用场景与最佳实践五、注意事项与常见问题六

Spring的RedisTemplate的json反序列泛型丢失问题解决

《Spring的RedisTemplate的json反序列泛型丢失问题解决》本文主要介绍了SpringRedisTemplate中使用JSON序列化时泛型信息丢失的问题及其提出三种解决方案,可以根据性... 目录背景解决方案方案一方案二方案三总结背景在使用RedisTemplate操作redis时我们针对

Spring Boot Maven 插件如何构建可执行 JAR 的核心配置

《SpringBootMaven插件如何构建可执行JAR的核心配置》SpringBoot核心Maven插件,用于生成可执行JAR/WAR,内置服务器简化部署,支持热部署、多环境配置及依赖管理... 目录前言一、插件的核心功能与目标1.1 插件的定位1.2 插件的 Goals(目标)1.3 插件定位1.4 核

如何使用Lombok进行spring 注入

《如何使用Lombok进行spring注入》本文介绍如何用Lombok简化Spring注入,推荐优先使用setter注入,通过注解自动生成getter/setter及构造器,减少冗余代码,提升开发效... Lombok为了开发环境简化代码,好处不用多说。spring 注入方式为2种,构造器注入和setter

SpringBoot整合Dubbo+ZK注册失败的坑及解决

《SpringBoot整合Dubbo+ZK注册失败的坑及解决》使用Dubbo框架时,需在公共pom添加依赖,启动类加@EnableDubbo,实现类用@DubboService替代@Service,配... 目录1.先看下公共的pom(maven创建的pom工程)2.启动类上加@EnableDubbo3.实

SpringBoot整合(ES)ElasticSearch7.8实践

《SpringBoot整合(ES)ElasticSearch7.8实践》本文详细介绍了SpringBoot整合ElasticSearch7.8的教程,涵盖依赖添加、客户端初始化、索引创建与获取、批量插... 目录SpringBoot整合ElasticSearch7.8添加依赖初始化创建SpringBoot项

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种