Spring boot封装rocket mq 教程

2024-01-03 21:20

本文主要是介绍Spring boot封装rocket mq 教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、rocket mq版本

      5.1.3

2、pom引入rocket mq依赖

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>

3、发送MQ消息工具类

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}

4、发送MQ消息测试代码

import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}

5、MessageContext 消息内容的封装

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}

6、AbstractMqConsumer 发送mq消息的抽象类

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
//                    log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}

7、具体的消费类

topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取

import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}

这篇关于Spring boot封装rocket mq 教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/567099

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听