springboot2+activemq(pub+sub单机版本)

2024-06-20 06:08

本文主要是介绍springboot2+activemq(pub+sub单机版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.项目依赖,pom.xml配置

<!--activemq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq连接池-->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.14.5</version>
</dependency>

2.yml文件配置:

spring:activemq:user: adminpassword: XXXXXXbroker-url: tcp://XXXXXXXXXXXXX:61616pool:enabled: truemax-connections: 10

3.对应的启动应用需要加注解@EnableJms

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;@SpringBootApplication
@EnableJms
public class LenderApplication {private final static Logger logger = LoggerFactory.getLogger(LenderApplication.class);public static void main(String[] args) {SpringApplication.run(LenderApplication.class, args);logger.info("LenderApplication is success!");}
}

 

4.对应的配置bean的创建

package com.stylefeng.guns.config.mq;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;import javax.jms.Destination;@Configuration
public class ActiveMQConfig {@Value("${loanTopic}")private String loanTopic;@Value("${repayTopic}")private String repayTopic;@Value("${spring.activemq.user}")private String usrName;@Value("${spring.activemq.password}")private String password;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Beanpublic Destination topic() {return new ActiveMQTopic(loanTopic);}/*@Beanpublic Queue queue(){return new ActiveMQQueue(queueName);}*/@Beanpublic Destination topic2() {return new ActiveMQTopic(repayTopic);}@Beanpublic RedeliveryPolicy redeliveryPolicy() {RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory, Destination topic) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(loanTopic);return jmsTemplate;}@Beanpublic JmsTemplate jmsTemplate2(ActiveMQConnectionFactory connectionFactory, Destination topic2) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(connectionFactory);
//        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setDefaultDestination(topic2);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
//        jmsTemplate.setDefaultDestinationName(repayTopic);return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
//    @Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(false);return factory;}@Bean(name = "jmsTopicListener")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);//客户端签收模式factory.setSessionAcknowledgeMode(4);//使用监听模式factory.setPubSubDomain(true);//使用持久化factory.setSubscriptionDurable(true);//客户端ID,根据业务名称定义factory.setClientId("lenderLoadClient");return factory;}@Bean(name = "jmsTopicListener2")public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory2(ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setSessionTransacted(true);factory.setAutoStartup(true);factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);factory.setPubSubDomain(true);factory.setSubscriptionDurable(true);factory.setClientId("lenderRepayClient");return factory;}}

ps:这里面使用持久化的监听模式,因为业务需要创建了两个监听工厂!

activemq安全性配置需要

http://activemq.apache.org/objectmessage.html

 activeMQConnectionFactory.setTrustAllPackages(true);

 

5.消费者消息监听

package com.stylefeng.guns.modular.message;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.stylefeng.guns.modular.orderinfo.service.IOrderInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;@EnableJms
@Component
@Slf4j
public class LoanMessageListener {@Autowiredprivate IOrderInfoService orderInfoService;@JmsListener(destination = "loan.success.topic", containerFactory = "jmsTopicListener")public void onMessage(final TextMessage message, Session session) throws JMSException {try {log.info("=======接收到借款消息=={}", message);if (message instanceof TextMessage) {log.info("=======接收到借款消息內容=={}", message.getText());String result = message.getText();JSONObject jsonObject = JSON.parseObject(result);Long borrowId = jsonObject.getLongValue("borrowId");orderInfoService.updateOrderInfo(borrowId);log.info("===========borrowId={}", borrowId);message.acknowledge();}} catch (Exception e) {session.recover();}}}

7:一些说明总结

ps:使用了持久化订阅(消费在离线之后这点时间生产者产成的消息仍然可以在重新上线之后监听到)(防止消息丢失)

订阅模式消息能被多个监听的消息者获取到所有的消息:1:N(同时生产的所有消息都能被消费)(保证消息被消费者全部消费)

消费的重复消费问题:客户端去重

1.第一种,数据库保持数据的唯一性,如果往数据库里面新增一条数据可以根据id主键保证唯一性(单体应用)

2.分布式系统要消息的生产需要用到分布式ID,然后消费者可以把这个监听到的消费过的消息ID存入redis里面,再次监听消费时先查看redis里面是否存在消费过的消息ID。存在就不重复消费。

 

 

这篇关于springboot2+activemq(pub+sub单机版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077315

相关文章

Spring配置扩展之JavaConfig的使用小结

《Spring配置扩展之JavaConfig的使用小结》JavaConfig是Spring框架中基于纯Java代码的配置方式,用于替代传统的XML配置,通过注解(如@Bean)定义Spring容器的组... 目录JavaConfig 的概念什么是JavaConfig?为什么使用 JavaConfig?Jav

Spring Boot Interceptor的原理、配置、顺序控制及与Filter的关键区别对比分析

《SpringBootInterceptor的原理、配置、顺序控制及与Filter的关键区别对比分析》本文主要介绍了SpringBoot中的拦截器(Interceptor)及其与过滤器(Filt... 目录前言一、核心功能二、拦截器的实现2.1 定义自定义拦截器2.2 注册拦截器三、多拦截器的执行顺序四、过

springboot的controller中如何获取applicatim.yml的配置值

《springboot的controller中如何获取applicatim.yml的配置值》本文介绍了在SpringBoot的Controller中获取application.yml配置值的四种方式,... 目录1. 使用@Value注解(最常用)application.yml 配置Controller 中

springboot中配置logback-spring.xml的方法

《springboot中配置logback-spring.xml的方法》文章介绍了如何在SpringBoot项目中配置logback-spring.xml文件来进行日志管理,包括如何定义日志输出方式、... 目录一、在src/main/resources目录下,也就是在classpath路径下创建logba

SpringBoot返回文件让前端下载的几种方式

《SpringBoot返回文件让前端下载的几种方式》文章介绍了开发中文件下载的两种常见解决方案,并详细描述了通过后端进行下载的原理和步骤,包括一次性读取到内存和分块写入响应输出流两种方法,此外,还提供... 目录01 背景02 一次性读取到内存,通过响应输出流输出到前端02 将文件流通过循环写入到响应输出流

SpringBoot实现图形验证码的示例代码

《SpringBoot实现图形验证码的示例代码》验证码的实现方式有很多,可以由前端实现,也可以由后端进行实现,也有很多的插件和工具包可以使用,在这里,我们使用Hutool提供的小工具实现,本文介绍Sp... 目录项目创建前端代码实现约定前后端交互接口需求分析接口定义Hutool工具实现服务器端代码引入依赖获

SpringBoot项目整合Netty启动失败的常见错误总结

《SpringBoot项目整合Netty启动失败的常见错误总结》本文总结了SpringBoot集成Netty时常见的8类问题及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录一、端口冲突问题1. Tomcat与Netty端口冲突二、主线程被阻塞问题1. Netty启动阻

SpringBoot的内嵌和外置tomcat的实现方式

《SpringBoot的内嵌和外置tomcat的实现方式》本文主要介绍了在SpringBoot中定制和修改Servlet容器的配置,包括内嵌式和外置式Servlet容器的配置方法,文中通过示例代码介绍... 目录1.内嵌如何定制和修改Servlet容器的相关配置注册Servlet三大组件Servlet注册详

SpringBoot+Vue3整合SSE实现实时消息推送功能

《SpringBoot+Vue3整合SSE实现实时消息推送功能》在日常开发中,我们经常需要实现实时消息推送的功能,这篇文章将基于SpringBoot和Vue3来简单实现一个入门级的例子,下面小编就和大... 目录前言先大概介绍下SSE后端实现(SpringBoot)前端实现(vue3)1. 数据类型定义2.

Spring Boot基于 JWT 优化 Spring Security 无状态登录实战指南

《SpringBoot基于JWT优化SpringSecurity无状态登录实战指南》本文介绍如何使用JWT优化SpringSecurity实现无状态登录,提高接口安全性,并通过实际操作步骤... 目录Spring Boot 实战:基于 JWT 优化 Spring Security 无状态登录一、先搞懂:为什