549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

2024-01-11 13:10

本文主要是介绍549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 一、Spring 整合 RocketMQ
      • 1.1 消息生产者
      • 1.2 消息消费者
      • 1.3 Spring 配置文件
      • 1.4 运行实例程序
    • 二、参考链接

一、Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。

1.1 消息生产者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger = Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName = producerGroupName;this.nameServerAddr = nameServerAddr;}public void init() throws Exception {logger.info("开始启动消息生产者服务...");//创建一个消息生产者,并设置一个消息生产者组producer = new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer,整个应用生命周期内只需要初始化一次producer.start();logger.info("消息生产者服务启动成功.");}public void destroy() {logger.info("开始关闭消息生产者服务...");producer.shutdown();logger.info("消息生产者服务已关闭.");}public DefaultMQProducer getProducer() {return producer;}
}

消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。

1.2 消息消费者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger = Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName = consumerGroupName;this.nameServerAddr = nameServerAddr;this.topicName = topicName;this.messageListener = messageListener;}public void init() throws Exception {logger.info("开始启动消息消费者服务...");//创建一个消息消费者,并设置一个消息消费者组consumer = new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, "*");//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info("消息消费者服务启动成功.");}public void destroy(){logger.info("开始关闭消息消费者服务...");consumer.shutdown();logger.info("消息消费者服务已关闭.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}

同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger = Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list != null) {for (MessageExt ext : list) {try {logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。

1.3 Spring 配置文件

因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="producerGroupName" value="spring_producer_group"/></bean>
</beans>

消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" /><bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="consumerGroupName" value="spring_consumer_group"/><constructor-arg name="topicName" value="spring-rocketMQ-topic" /><constructor-arg name="messageListener" ref="messageListener" /></bean></beans>

消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。

1.4 运行实例程序

按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:

package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");}@Testpublic void sendMessage() throws Exception {SpringProducer producer = container.getBean(SpringProducer.class);for (int i = 0; i < 20; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("spring-rocketMQ-topic",null,("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.getProducer().send(msg);System.out.printf("%s%n", sendResult);}}
}

SpringProducerTest 类模拟消息生产者发送消息。

package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");}@Testpublic void consume() throws Exception {SpringConsumer consumer = container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}

SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。

分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:
在这里插入图片描述
假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:
在这里插入图片描述
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

这篇关于549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/594470

相关文章

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Java集合中的List超详细讲解

《Java集合中的List超详细讲解》本文详细介绍了Java集合框架中的List接口,包括其在集合中的位置、继承体系、常用操作和代码示例,以及不同实现类(如ArrayList、LinkedList和V... 目录一,List的继承体系二,List的常用操作及代码示例1,创建List实例2,增加元素3,访问元

SpringBoot整合easy-es的详细过程

《SpringBoot整合easy-es的详细过程》本文介绍了EasyES,一个基于Elasticsearch的ORM框架,旨在简化开发流程并提高效率,EasyES支持SpringBoot框架,并提供... 目录一、easy-es简介二、实现基于Spring Boot框架的应用程序代码1.添加相关依赖2.添

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Goland debug失效详细解决步骤(合集)

《Golanddebug失效详细解决步骤(合集)》今天用Goland开发时,打断点,以debug方式运行,发现程序并没有断住,程序跳过了断点,直接运行结束,网上搜寻了大量文章,最后得以解决,特此在这... 目录Bug:Goland debug失效详细解决步骤【合集】情况一:Go或Goland架构不对情况二:

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll