Spring Boot Messaging Chapter 5 AMQP with Spring Boot

2024-03-02 23:08

本文主要是介绍Spring Boot Messaging Chapter 5 AMQP with Spring Boot,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:本章讨论高级消息队列协议(AMQP),它是一种不可知的消息协议。您将学习如何使用Spring AMQP模块,它将与RabbitMQ代理通信。RabbitMQ是全球最常用的代理之一,这是因为它易于安装和使用。最好的部分是它是开源的。

AMQP来自金融业,由摩根大通(JPMorgan Chase)于2003年创建。随后,更多的公司围绕它展开工作,以增强这种新的消息传递方式。Rabbit技术通过Erlang编程语言实现了AMQP,这就是RabbitMQ诞生的原因。几年之后,VMware/Pivotal收购了它。

我们将继续介绍一些重要的定义,这些定义将帮助您进一步了解AMQP和RabbitMQ。本章使用amqp-demo和rest-api-amqp项目。本章包含与amqp-demo相关的所有代码,我将让您深入研究rest-api-amqp项目,以完成主要需求,即从RabbitMQ接收新利率并将其发送给感兴趣的消费者。

一:The AMQP Model

本节介绍AMQP 0.9.1模型。现在,这个版本是最常见的。

       AMQP模型由发布到消息路由的消息组成。这些交换器根据绑定(规则)将消息分发到队列。然后消费者从这些队列中获取/提取消息。见图5 - 1。

您将看到,通常作为生产者或消费者,您需要创建到代理的连接,然后使用通道(主传输)将消息发布到exchange或使用队列中的消息。换句话说,AMQP协议通过通道指定多路连接。

AMQP模型具有不同的特性,允许开发人员在创建消息传递应用程序时具有很大的灵活性。这些特性包括消息属性(内容类型、编码、路由键、传递模式等)、消息确认、消息拒绝、消息重新队列、多路复用连接(通道)、虚拟主机(主机隔离环境)、多客户端和路由功能等。

1.1.Exchanges, Bindings, and Queues(消息路由、绑定、队列)

这些关键字也称为AMQP实体。让我们定义这些实体(记住,所有这些仍然是AMQP模型的一部分):

1•Exchanges: 生产者发送消息的实体。交换器将使用绑定将消息路由到正确的队列。交换器具有以下属性:名称、持久性(可以是持久的或瞬态的)、自动删除(当所有队列使用完交换器后删除)和参数(散列映射、代理依赖)。

2•Bindings: 将交换器连接到另一个交换器或队列的规则。这是一个字符串值。

3•Queue: 将消息存储在内存或磁盘中,直到应用程序使用它们为止。队列具有诸如名称、持久属性(在代理重新启动时存活)、独占属性(仅由一个连接使用,这意味着当连接关闭时队列将被删除)、自动删除属性(当最后一个使用者取消订阅时队列将被删除)和参数属性(散列映射、代理依赖)。

AMQP模型提供了四种类型的交换:

1•Direct exchange: 通过队列的绑定与队列之间的一对一关系。有一个默认的exchange,它是一个直接的exchange类型,使用队列的名称作为绑定的路由键。

2•Fanout exchange: 这个交换将为绑定到它的每个队列复制一条消息。你必须像广播一样思考;它非常类似于发布/订阅模式(主题)。

3•Topic exchange: 这个交换类似于直接交换,唯一的区别是它通过使用*(可以完全替换一个单词)和#(可以替换零个或多个单词)选项接受通配符(regex)作为路由键。

4•Headers exchange: 这个交换器将通过比较多个标头来进行路由。如果您希望消息头完全匹配,必须通过添加x-match:all(消息头:键)或通过将x-match:any(消息头:键)添加到相同的消息头来指定。

图5-2说明了这些交换类型。

这个小介绍足以让我们开始使用RabbitMQ并学习如何使用它。

二:RabbitMQ

RabbitMQ是一个开源消息中间件,它实现了AMQP模型(版本0.8.x到AMQP模型的1.0版本)。RabbitMQ是用Erlang编程语言编写的,这使得它既灵活又健壮。

以下是它的一些特点:

1.分布式节点

2.支持集群模式

3.插件的基础;最重要的插件是: 一致性哈希,社区插件

4.具有完全ACID的数据/状态复制(原子性、一致性、隔离性、持久性)

5.可靠性,可扩展性

6.具有镜像队列的高可用性

7.多协议支持:AMQP、MQTT、STOMP、SMTP、XMPP

8.Web控制台和Rest API(用于监视和管理)

9.安全:SSL和LDAP

10.多个客户端库:Java、.net、Ruby、Erlang、Python、Clojure、PHP、JavaScript等。

我们可以写一整本书来定义所有可用的特性,但是我想做的是让您了解RabbitMQ能做什么。(如果你需要更多信息,你可以在 http://www.rabbitmq.com/features.html.查看)。因此,让我们通过创建客户端来开始使用它。

三:RabbitMQ with Spring Boot

Spring Boot依赖于Spring的spring-amqp项目来完成所有的繁重工作来连接、发布、消费和管理RabbitMQ中间件。spring-amqp项目一直是消息应用程序中最常用的模块之一。

那么,为什么我们需要Spring Boot呢?请记住,Spring Boot是一个启动运行时将帮助我们从spring-amqp模块中配置我们需要的一些属性,这些属性可以添加到我们的application.properties文件或通过使用@Configuration类来覆盖其中的一些观点。

spring-amqp项目使用了已知的模板模式,它暴露了一个RabbitTemplate类,它允许我们发布和使用消息,以及其他任务。它还提供了易于使用的消息侦听器,它们连接到队列并使用消息。如果你担心线程、事务、重新连接(在失败的情况下)、管理等,大可不必,因为spring-amqp已经覆盖这些内容。

3.1.Producer(生产者)

让我们从一个简单的生产者开始。ampq-demo项目有你需要开始的所有东西。打开项目,从清单5-1中所示的简单生产者开始。

Listing 5-1. com.micai.spring.messaging.amqp.Producer.java

package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:19* @Description: 生产者*/
@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);}}

清单5-1显示了一个简单的amqp生产者。让我们分析代码的重要部分:

•@Component: 这是Spring框架的一个标记,它将把这个类看作Spring Bean,以便您可以在应用程序中使用它。您将在接下来的主要应用程序中看到这一点。

•RabbitTemplate: 这是Spring AMQP模块的主要类之一,它带来了许多与RabbitMQ交互的功能,如发送、接收和执行RabbitMQ的管理任务。在这个例子中,它被用来转换和发送一条消息给代理。请记住,为了与RabbitMQ交互,您必须打开一个连接并创建一个通道(从该连接),然后将消息发送到消息路由。这个设置将由RabbitTemplate实例处理。

•convertAndSend: 这个方法将把消息转换成正确的类型(到一个字节数组),并将其发送给RabbitMQ中间件。这个特殊的方法有三个参数。第一个参数是消息路由的名称(消息将通过通道发送),第二个参数是路由键(将消息路由到右侧队列的绑定规则)和消息,在本例中只是一个字符串。

正如你所看到的,这是一个非常简单的生产者。值得一提的是,RabbitTemplate类有各种重载方法,可以帮助您发送、监听、使用自定义转换器,并执行专门的任务(稍后您将对此进行更多的了解)。

接下来,让我们看看如何使用这个生产者。打开AmqpDemoApplication.java类,如清单5-2中所示。

package com.micai.spring.messaging;import com.micai.spring.messaging.amqp.Producer;
import com.micai.spring.messaging.amqp.RpcClient;
import org.apache.http.util.Args;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.validation.Valid;/*** @Auther: zhaoxinguo* @Date: 2018/8/6 14:28* @Description:*/
@SpringBootApplication
public class AmqpDemoApplication {public static void main(String [] args) {SpringApplication.run(AmqpDemoApplication.class, args);}@BeanCommandLineRunner simple(@Value("${micai.amqp.exchange:}") String exchange,@Value("${micai.amqp.queue}") String routingKey,Producer producer) {return args -> {producer.sendMessage(exchange, routingKey, "HELLO AMQP!");};}}

清单5-2显示了主Spring Boot应用程序。让我们回顾一下代码:

• @Bean CommandLineRunner: 您已经熟悉了这个注解和接口。当Spring容器初始化所有的bean并准备好使用时,它将被执行。

• @Value("${micai.amqp.exchange:}"): 这个注解评估属性(通过application.properties、命令行参数或环境变量),其中包括了一个micai.amqp.exchange的key。如果没有找到,它只会使用一个空字符串,也就是交换之后的字符串。

• @Value("${micai.amqp.queue}"): 这个注解将评估属性(通过 application.properties、命令行参数或环境变量),其中包括了一个micai.amqp.queue的key。这个密钥是强制性的,所以您将在src/resources/application.properties文件中找到spring-boot-queue的值。

• Producer: 这是一个简单的生产者类(如清单5-1所示)。正如您所看到的,我们使用sendMessage方法发送消息路由的名称、路由键和“HELLO AMQP!”消息。

在您测试它之前,您需要确保您的RabbitMQ中间件已经启动并运行。你还必须设置你的消息路由、绑定和队列,尽管你不需要在这里做这个,因为amqp-demo项目已经为你设置了这个配置。看一看AMQPConfig.java类,在清单5-3。

Listing 5-3. com.micai.spring.messaging.config.AMQPConfig.java

package com.micai.spring.messaging.config;import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}}

 清单5-3展示了定义以下内容的AMQPConfig:

• @EnableConfigurationProperties: 这将声明一个具有前缀的自定义属性: micai.amqp.*。这就是为什么我们可以使用 micai.amqp.queue 或者 micai.amqp.exchange 键的原因

• @Bean Queue:这是一个重要的部分,通过编程,我们声明队列(在本例中)将通过返回队列类的新实例来创建。

• @Value("${micai.amqp.queue}"): 这个注解将取得micai.amqp.queue的值,在src/main/resources/应用程序中定义的具有spring-boot-queue值的属性。

 

我认为这个配置非常简单,但是如果您仔细想想,它看起来就像是丢失了交换声明和路由到队列的绑定。好吧,每次你在RabbitMQ中创建一个队列时,它都被绑定到一个默认交换(通常只是声明为空字符串),而路由键恰好是队列的名称。正如您所看到的,我们将队列的名称作为路由键传递给生产者实例。

您现在可以运行申请书,并在RabbitMQ管理控制台中看到spring-boot-queue被创建并有一条消息。输出如图5-3所示。

图5-3显示了日志。在amqp-demo项目中,您将找到com.micai.springmessaging.aop.AMQPAudit.java类。它是一个Around通知,它将记录生产者方法调用。见图5 - 4。

 Figure 5-4. RabbitMQ Management Console (http://localhost:15672/#/queues): spring-boot-queue

图5-4展示了RabbitMQ管理控制台,您可以看到创建的队列和发送的消息。

你知道我们是如何连接到RabbitMQ代理的吗?如果我有一个远程服务器,我需要指定在哪里连接,传递IP,或者用户名或密码,会发生什么?

Spring Boot将会解决这个问题,因为Spring Boot在启动运行时会在类路径中找到spring-boot-starer-amqp的依赖关系,并会询问您是否已经有了ConnectionFactory(它将连接到RabbitMQ),并提供关于消息中间件的所有必要信息。如果您不这样做,它将尝试使用默认设置并寻找本地代理。

如果您想指定一个远程消息中间件,您可以通过提供spring.rabbitmq.*属性来实现在src/main/resources/application.properties 文件。这就足够连接到远程RabbitMQ了。

 

3.2.Consumer(消费者)

现在,让我们讨论一下如何消费使用生产者发送的消息。打开Consumer.java。清单5-4所示的java类

Listing 5-4. com.micai.spring.messaging.amqp.Consumer.java

package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 11:30* @Description: 消费者*/
@Component
public class Consumer implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);@Overridepublic void onMessage(Message message) {LOGGER.info("接收spring-boot-queue消息内容为:{}", message);}
}

 清单5-4显示了最简单的消费者代码。这是一个异步的消费者,它实现了org.springframework.amqp.core.MessageListener接口和onMessage方法。这个方法将接收消息作为一个org.springframework.amqp.core.Message实例。为了使用这个消费者,您需要提供一些有用的配置设置。打开AMQPConfig.java类。请参见清单5-5中。

Listing 5-5. com.micai.spring.messaging.config.AMQPConfig.java

package com.micai.spring.messaging.config;import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListener consumer,@Value("${micai.amqp.queue}")String queueName) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(queueName);container.setMessageListener(consumer);return container;}
}

清单5-5展示了增强的AMQPConfig类(清单5-3),我们有以下内容:

• SimpleMessageListenerContainer: 该类创建了一个消息侦听器容器,它将侦听任何消息的队列。这个类需要建立一个连接工厂、一个消息监听器处理程序和队列(s)的名称。正如您所看到的,这是一个bean,我们需要返回来启动消息侦听器容器。

• ConnectionFactory: 这个接口对于SimpleMessage ListenerContainer类是强制性的,因为它是知道RabbitMQ代理(主机、用户名、密码、vhost等)的那个接口。重要的是要知道这个连接工厂是由Spring Boot连接起来的,要么使用默认设置(根本没有配置),要么通过在应用程序中指定它的属性。带有spring.rabbitmq.*的属性。您也可以将自己的ConnectionFactory声明为bean(通过在方法中声明@Bean)。

• MessageListener: 正如您所看到的,这是方法容器(消费者)的参数之一,Spring将注册com.micia.spring.messaging.amqp.Consumer类作为一个处理程序。然后在调用容器的setMessageListener方法时使用它。

 

注意,我们使用的是queueName,它是由Spring Boot连接起来的,通过micai.amqp.queue属性使用。差不多就是这样。你现在有了一个完整的生产者和消费者。您可以运行程序并查看如图5-5所示的日志。

清单5-5显示了消费者的日志。注意,调用的方法是onMessage,它由org.springframework.amqp.core.Message实例提供。 

3.2.1.Consumer Using Annotations(消费者使用注解)

等一下!我告诉过你Spring Boot会更容易,对吧?在当前的例子中,我们需要注册我们的容器并实现MessageListener接口。好消息是,Spring AMQP提供了一种使用注释的方法,并且在Spring Boot的帮助下,一切都是正确的配置。

转到AMQPConfig类并删除容器方法。它应该类似于清单5-3。打开com.micai.spring.messaging.amqp。AnnotatedConsumer类;它应该类似于清单5-6。

Listing 5-6. com.micai.spring.messaging.amqp.AnnotatedConsumer.java

package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 13:39* @Description: 基于注解的消费者*/
@Component
public class AnnotatedConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(AnnotatedConsumer.class);@RabbitListener(queues = "${micai.amqp.queue}")public void process(String message) {LOGGER.info("基于注解接收消息内容为:{}", message);}}

清单5-6向您展示了如何在不创建容器的情况下创建消费者,只需添加@Rabbitlistener注解即可。Spring AMQP将为您创建一个容器,它将用Spring Boot的帮助将所有东西连接起来。正如您所看到的,它使用队列作为参数,并且使用了micai.amqp.queue属性。还要看这个方法接收到一个字符串(而不是一个消息对象)。您可以使用您自己的对象,这需要额外的步骤。不要担心,因为我们将在后面的部分中做这个。

您现在可以运行这个项目,您应该得到类似于图5-6的内容。 

 正如您所看到的,用几行代码创建生产者和消费者是非常简单的。接下来,让我们回顾一下如何创建和使用RPC模型。

3.3.RPC

远程过程调用(Remote Procedure Call,RPC)模型是60年代的众多用例之一,当时分布式计算是一个挑战(它仍然是)。RPC模型被认为是一个请求-响应协议,在这个协议中,您有一个客户端,通过向远程服务器发送一条请求消息来执行一个或多个任务,从而启动一个进程。然后远程服务器向客户端发送响应,这样它就可以继续处理这个过程。见图5 - 7。

在Spring Boot中创建RPC消息传递模型非常简单。请记住,Spring Boot依赖于Spring AMQP模块,因此您不必配置RabbitMQ中间件。Spring AMQP将会解决这个问题。让我们复习一下代码,这样你就能更好地了解正在发生的事情。

打开com.micai.spring.messaging.amqp.RpcClient类。它应该类似于清单5-7。

Listing 5-7. com.micai.spring.messaging.amqp.RpcClient.java

package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 13:52* @Description:*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public Object sendMessage(String exchange, String routingKey, String message) {Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);return response;}}

 清单5-7向您展示了RpcClient类,如果您将其与清单5-1(生产者类)进行比较,您将会注意到,被调用的模板方法只有一个不同之处。在这个例子中,我们使用convertSendAndReceive方法,它接受三个参数一个消息路由名称、路由键和消息。它返回一个Object对象(返回通常会被包装成一个 org.springframework.amqp.core.Message实例)。当然,您可以从这个签名中找到更多重载的方法,但是现在,我们将使它尽可能的简单。

接下来,让我们看一下服务器。打开com.micai.spring.messaging.amqp.RpcServer类。参见清单5 - 8。

Listing 5-8. com.micai.spring.messaging.amqp.RpcServer.java

package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 14:05* @Description:*/
@Component
public class RpcServer {@RabbitListener(queues = "${micai.amqp.queue}")public Message<String> process(String message) {// More Processing here...return MessageBuilder.withPayload("PROCESSED:OK").setHeader("PROCESSED", new SimpleDateFormat("yyyy-MM-dd").format(new Date())).setHeader("CODE", UUID.randomUUID().toString()).build();}}

清单5-8展示了RpcServer类。让我们看看有什么新的特性不同于其他版本:

•@RabbitListener: 您已经熟悉这个注解了。它将创建一个消息侦听器容器,并且它将侦听来自于micai.amqp.queue队列传入的任何消息。(请记住,这是应用程序中application.properties文件指定的属性)。

•Message<String>: 如果您想要增强您的消息,这就是您需要返回的内容,因为它提供了一种使用头部的方法。在这个例子中,我们使用的是字符串类型消息。

•MessageBuilder: 这是一个助手类,它允许您构建一个新的消息、添加/复制标题,等等。正如您所看到的,我们只是创建了一个带有有效负载集的PROCESSED:OK ,并添加几个头信息。

如果您仔细查看清单5-8,您会注意到process方法返回一个String类型的消息,正因为如此,Spring AMQP使用RabbitMQ的直接回复功能。这个特性允许我们将服务器连接到客户端以获得响应,而无需创建reply-queue(这是自版本3.4以来的一个特性)。您不需要担心任何相关数据,因为Spring AMQP将为您创建它。您还可以定制它或创建自己的相关数据。现在,让我们看一看主应用程序。打开你的com.micai.spring.messaging.AmqpDemoApplication类;它应该类似于清单5-9。

Listing 5-9. com.micai.spring.messaging.AmqpDemoApplication.java

package com.micai.spring.messaging;import com.micai.spring.messaging.amqp.Producer;
import com.micai.spring.messaging.amqp.RpcClient;
import org.apache.http.util.Args;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.validation.Valid;/*** @Auther: zhaoxinguo* @Date: 2018/8/6 14:28* @Description:*/
@SpringBootApplication
public class AmqpDemoApplication {public static void main(String [] args) {SpringApplication.run(AmqpDemoApplication.class, args);}@BeanCommandLineRunner simple(@Value("${micai.amqp.exchange:}") String exchange,@Value("${micai.amqp.queue}") String routingKey,RpcClient rpcClient) {return args -> {Object result = rpcClient.sendMessage(exchange, routingKey, "HELLO AMQP/RPC!");assert result != null;};}}

 清单5-9显示了主要的应用程序,您只使用了RpcClient。

在运行RPC示例之前,请确保没有使用相同队列的另一个侦听器。通过运行这款应用,你应该得到类似于图5-8的内容。

有时您需要对RPC有更多的控制。例如,您可能想要一个固定的队列,用于执行应答。为此,您需要向应用程序添加一些配置。参见清单5-10。

Listing 5-10. com.micai.spring.messaging.config.AMQPConfig.java

package com.micai.spring.messaging.config;import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}@Autowiredprivate ConnectionFactory connectionFactory;@Value("${micai.amqp.reply-queue}")private String replyQueueName;@Beanpublic RabbitTemplate fixedReplyQueueRabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setReplyAddress(replyQueueName);template.setReplyTimeout(60000L);return template;}@Beanpublic SimpleMessageListenerContainer replyListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueues(replyQueue());container.setMessageListener(fixedReplyQueueRabbitTemplate());return container;}@Beanpublic Queue replyQueue(){return new Queue(replyQueueName,false);}}

 清单5-10向您展示了如何设置一个固定的队列,该队列将由服务器用于应答客户端请求,而客户端将侦听该队列。这里的重要部分是RabbitTemplate,它将使用setReplyAddress方法来配置reply-to队列。也有必要使用与侦听器相同的模板来侦听来自服务器的响应(这是通过将侦听器容器设置为setMessageListener来完成的)。

3.4.Reply Management(回复管理)

使用Spring AMQP的一个很酷的事情是它有一些很好的特性。例如,您可以使用exchange和路由键来创建实际的reply-to场景。换句话说,您在不等待响应的情况下发送一条消息,然后您回复一个特定的交换/队列,它将有另一个流程。

Spring AMQP包含@SendTo注解,您可以将您的回复发送到消息路由或队列。打开com.micai.spring.messaging.amqp.ReplyToService类,如清单5-11所示。

Listing 5-11. com.micai.spring.messaging.amqp.ReplyToService.java

package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 14:42* @Description:*/
@Component
public class ReplyToService {private static final Logger LOGGER = LoggerFactory.getLogger(ReplyToService.class);@RabbitListener(queues="${micai.amqp.queue}")@SendTo("${micai.amqp.reply-exchange-queue}")public Message<String> replyToProcess(String message){LOGGER.info("message: {}", message);//More Processing here...return MessageBuilder.withPayload("PROCESSED:OK").setHeader("PROCESSED", new SimpleDateFormat("yyyy-MM-dd").format(new Date())).setHeader("CODE", UUID.randomUUID().toString()).build();}
}

清单5-11显示了@SendTo注释。正如您所看到的,replyToProcess方法也用@Rabbitlistener进行注释。这意味着它将侦听一个队列(micai.amqp.queues属性的值)。这里的重要部分是,它将返回一条消息,该消息将被发送到消息路由/路由键(由micai.amqp.reply-exchange-queue属性的值提供)作为@SendTo注释的回复机制。

您可以使用生产者类来发送消息(参见清单5-2),但是在运行这个示例之前,您需要使用RabbitMQ Web控制台执行以下操作:

• 创建一个名为my-exchange的消息路由

• 创建一个队列(使用您想要的任何名称)

• 用my-reply-rk路由键将my-exchange绑定到您刚刚创建的队列

当然,您也可以以编程的方式创建这个

现在你已经准备好实验了。运行这个示例,您应该在已经创建的队列中有一条消息。正如您所看到的,您有另一种方法来完成应答并创建另一个流。

3.5.Flow Control(流量控制)

 流量控制是RabbitMQ最好的特性之一。它将降低那些发送消息的发布者的连接速度,从而这些消息的速度不够快。

RabbitMQ通常会降低速度,有时它会阻塞连接,防止它们被穿透。了解这个特性是件好事,因为您可以使用它来确定瓶颈所在。要么您将并发消费者的数量增加到队列,要么您检查客户的代码,以确定为什么处理消息花费的时间太长。

那么,找到这些瓶颈有多容易呢?找到它们的一种方法是监控您的RabbitMQ控制台,看看您是否触发了流量控制机制。您可以通过检查连接或通道选项卡的状态来确定这一点,它们通常是黄色的,并表示流量控制。然而,有一种更简单的方法。RabbitMQ发送事件,允许您对流量控制作出反应并完成关机。

3.5.1.Blocking/Unblocking Events(阻塞事件/非阻塞事件)

Spring AMQP提供了一种机制,用于阻塞、非阻塞和关机事件。你可以在com.micai.spring.messaging.config.AMQConfig类中找到代码。

如果你想要监听阻塞或非阻塞事件,只需在配置中添加以下代码,即阻塞侦听器事件:

@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);template.execute(new ChannelCallback<Object>() {public Object doInRabbit(Channel channel) throws Exception {channel.getConnection().addShutdownListener(new ShutdownListener() {public void shutdownCompleted(ShutdownSignalException cause) {// Process the shutdown}});channel.getConnection().addBlockedListener(new BlockedListener() {public void handleUnblocked() throws IOException {// Resume business logic}@Overridepublic void handleBlocked(String reason) throws IOException {// FlowControl -> Logic to handle block}});return null;}});return template;}

如果你只想监听一个失败或RabbitMQ关闭事件,你可以使用下面的代码,ShutdownListener事件

    @Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);template.execute(new ChannelCallback<Object>() {public Object doInRabbit(Channel channel) throws Exception {channel.getConnection().addShutdownListener(new ShutdownListener() {public void shutdownCompleted(ShutdownSignalException cause) {// Process the shutdown}});return null;}});return template;}

 您可以只拥有一个RabbitTemplate,并将BlockedListener和ShutdownListener加入到相同的代码中。

四:More Features(更多特性)

在Spring AMQP模块中有很多特性,需要一整本书来解释其中的每一个。我在这一节中指出了其中的一些:

•Transactions: Spring AMQP允许您在代码中使用@Transactional注释,所以通过添加这个注释,Spring AMQP模块在事务模式下设置通道。然后它可以执行提交或回滚,这取决于具体情况。您还可以通过定义bean来指定想要使用的事务管理器

@Transactional
public void processInvoice() {String incoming = rabbitTemplate.receiveAndConvert();// Do some more database processing...String reply = reviewInvoice(incoming);rabbitTemplate.convertAndSend(reply);
}

• Multi-Listeners: Spring AMQP版本1.5.0和以上添加了一种新的方式,使一个类根据消息的类型(如果需要)处理多个侦听器。看一看多MultiListenerService的代码。如果需要的话,可以添加@Payload, @Header, and @Headers注解。

package com.micai.spring.messaging.amqp;import com.micai.spring.messaging.domain.Invoice;
import com.micai.spring.messaging.domain.InvoiceWithTax;
import com.micai.spring.messaging.domain.Item;
import com.micai.spring.messaging.domain.Order;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 15:38* @Description:*/
@Component
@RabbitListener(id="multi", queues = "${micai.amqp.queue}")
public class MultiListenerService {@RabbitHandler@SendTo("${micai.amqp.reply-exchange-queue}")public Order processInvoice(Invoice invoice) {Order order = new Order();//Process Invoice here...order.setInvoice(invoice);return order;}@RabbitHandlerpublic Order processInvoiceWithTax(InvoiceWithTax invoiceWithTax) {Order order = new Order();//Process Invoice with Tax here...return order;}@RabbitHandlerpublic String itemProcess(@Header("amqp_receivedRoutingKey") String routingKey, @Payload Item item) {//Some Process here...return "{\"message\": \"OK\"}";}}

• Retries:有时您会得到一个错误,要么通过处理消息,要么通过代理,您将需要在消费者级别重试。你需要在这个场景中使用这个特性:

// Retry for the consumer, normally this needs to be set in the container:
// container.setAdviceChain(new Advice[] { interceptor() });
//@Bean
public StatefulRetryOperationsInterceptor interceptor() {return RetryInterceptorBuilder.stateful().maxAttempts(3).backOffOptions(1000, 2.0, 10000) 
// initialInterval, multiplier, maxInterval.build();
}

• 或者是下面这样:

@Bean
RetryOperationsInterceptor interceptor(RabbitTemplate template,@Value("${micai.amqp.error-exchange:}")String errorExchange,@Value("${micai.amqp.error-routing-key}")String errorExchangeRountingKey) {return RetryInterceptorBuilder.stateless().maxAttempts(3).recoverer(new RepublishMessageRecoverer(template, errorExchange, errorExchangeRountingKey)).build();
}

有更多的。看一看Spring AMQP项目参考,了解更多关于这些令人敬畏的特性的信息。

五:Currency Project(货币项目)

 您可以继续使用货币项目,并添加必要的逻辑,以提供一种方式来消费有关不同市场汇率的信息。您可以检查所有的代码;它已经准备好了。使用演示来发送速率消息(演示在http://projects.spring.io/spring-amqp/)中有所有必要的代码。

六:总结

 本章讨论了AMQP模型,并解释了消息路由、绑定和队列之间的区别。它回顾了一些简单的例子,说明了创建生产者和消费者的方法。

您看到了Spring AMQP模块的一些惊人特性,以及Spring Boot如何帮助您进行配置。

使用Spring消息传递的好处之一是,每个Spring技术都使用相同的概念。我们发送信息的方式(生产者);模板模式的使用(JmsTemplate、RabbitTemplate等);我们使用接口(通过实现MessageListener)或注释(如@Jmslistener、@Rabbitlistener、@sendto)创建监听器(消费者)的方式,很快您就会看到@streamlistener了;通过注释(如@Payload、@Header等)直接访问消息结构的方法,它们在构建中都是相似的。

请记住,这只是您对Spring AMQP所能做的一小部分,我需要写一整本关于Spring AMQP的书来展示每一个特性。你现在拥有了创建很棒的AMQP应用所需的最低知识。下一章介绍了发布/订阅消息,但这次使用Redis作为主要的消息传递引擎。

七:源代码

https://gitee.com/micai/micai-spring-message/tree/master/amqp-demo

这篇关于Spring Boot Messaging Chapter 5 AMQP with Spring Boot的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/767712

相关文章

Java五子棋之坐标校正

上篇针对了Java项目中的解构思维,在这篇内容中我们不妨从整体项目中拆解拿出一个非常重要的五子棋逻辑实现:坐标校正,我们如何使漫无目的鼠标点击变得有序化和可控化呢? 目录 一、从鼠标监听到获取坐标 1.MouseListener和MouseAdapter 2.mousePressed方法 二、坐标校正的具体实现方法 1.关于fillOval方法 2.坐标获取 3.坐标转换 4.坐

Spring Cloud:构建分布式系统的利器

引言 在当今的云计算和微服务架构时代,构建高效、可靠的分布式系统成为软件开发的重要任务。Spring Cloud 提供了一套完整的解决方案,帮助开发者快速构建分布式系统中的一些常见模式(例如配置管理、服务发现、断路器等)。本文将探讨 Spring Cloud 的定义、核心组件、应用场景以及未来的发展趋势。 什么是 Spring Cloud Spring Cloud 是一个基于 Spring

Javascript高级程序设计(第四版)--学习记录之变量、内存

原始值与引用值 原始值:简单的数据即基础数据类型,按值访问。 引用值:由多个值构成的对象即复杂数据类型,按引用访问。 动态属性 对于引用值而言,可以随时添加、修改和删除其属性和方法。 let person = new Object();person.name = 'Jason';person.age = 42;console.log(person.name,person.age);//'J

java8的新特性之一(Java Lambda表达式)

1:Java8的新特性 Lambda 表达式: 允许以更简洁的方式表示匿名函数(或称为闭包)。可以将Lambda表达式作为参数传递给方法或赋值给函数式接口类型的变量。 Stream API: 提供了一种处理集合数据的流式处理方式,支持函数式编程风格。 允许以声明性方式处理数据集合(如List、Set等)。提供了一系列操作,如map、filter、reduce等,以支持复杂的查询和转

Java面试八股之怎么通过Java程序判断JVM是32位还是64位

怎么通过Java程序判断JVM是32位还是64位 可以通过Java程序内部检查系统属性来判断当前运行的JVM是32位还是64位。以下是一个简单的方法: public class JvmBitCheck {public static void main(String[] args) {String arch = System.getProperty("os.arch");String dataM

详细分析Springmvc中的@ModelAttribute基本知识(附Demo)

目录 前言1. 注解用法1.1 方法参数1.2 方法1.3 类 2. 注解场景2.1 表单参数2.2 AJAX请求2.3 文件上传 3. 实战4. 总结 前言 将请求参数绑定到模型对象上,或者在请求处理之前添加模型属性 可以在方法参数、方法或者类上使用 一般适用这几种场景: 表单处理:通过 @ModelAttribute 将表单数据绑定到模型对象上预处理逻辑:在请求处理之前

eclipse运行springboot项目,找不到主类

解决办法尝试了很多种,下载sts压缩包行不通。最后解决办法如图: help--->Eclipse Marketplace--->Popular--->找到Spring Tools 3---->Installed。

JAVA读取MongoDB中的二进制图片并显示在页面上

1:Jsp页面: <td><img src="${ctx}/mongoImg/show"></td> 2:xml配置: <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001

Java面试题:通过实例说明内连接、左外连接和右外连接的区别

在 SQL 中,连接(JOIN)用于在多个表之间组合行。最常用的连接类型是内连接(INNER JOIN)、左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。它们的主要区别在于它们如何处理表之间的匹配和不匹配行。下面是每种连接的详细说明和示例。 表示例 假设有两个表:Customers 和 Orders。 Customers CustomerIDCus

22.手绘Spring DI运行时序图

1.依赖注入发生的时间 当Spring loC容器完成了 Bean定义资源的定位、载入和解析注册以后,loC容器中已经管理类Bean 定义的相关数据,但是此时loC容器还没有对所管理的Bean进行依赖注入,依赖注入在以下两种情况 发生: 、用户第一次调用getBean()方法时,loC容器触发依赖注入。 、当用户在配置文件中将<bean>元素配置了 lazy-init二false属性,即让