本文主要是介绍Spring Boot Messaging Chapter 5 AMQP with Spring Boot,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
摘要:本章讨论高级消息队列协议(AMQP),它是一种不可知的消息协议。您将学习如何使用Spring AMQP模块,它将与RabbitMQ代理通信。RabbitMQ是全球最常用的代理之一,这是因为它易于安装和使用。最好的部分是它是开源的。
AMQP来自金融业,由摩根大通(JPMorgan Chase)于2003年创建。随后,更多的公司围绕它展开工作,以增强这种新的消息传递方式。Rabbit技术通过Erlang编程语言实现了AMQP,这就是RabbitMQ诞生的原因。几年之后,VMware/Pivotal收购了它。
我们将继续介绍一些重要的定义,这些定义将帮助您进一步了解AMQP和RabbitMQ。本章使用amqp-demo和rest-api-amqp项目。本章包含与amqp-demo相关的所有代码,我将让您深入研究rest-api-amqp项目,以完成主要需求,即从RabbitMQ接收新利率并将其发送给感兴趣的消费者。
一:The AMQP Model
本节介绍AMQP 0.9.1模型。现在,这个版本是最常见的。
AMQP模型由发布到消息路由的消息组成。这些交换器根据绑定(规则)将消息分发到队列。然后消费者从这些队列中获取/提取消息。见图5 - 1。
您将看到,通常作为生产者或消费者,您需要创建到代理的连接,然后使用通道(主传输)将消息发布到exchange或使用队列中的消息。换句话说,AMQP协议通过通道指定多路连接。
AMQP模型具有不同的特性,允许开发人员在创建消息传递应用程序时具有很大的灵活性。这些特性包括消息属性(内容类型、编码、路由键、传递模式等)、消息确认、消息拒绝、消息重新队列、多路复用连接(通道)、虚拟主机(主机隔离环境)、多客户端和路由功能等。
1.1.Exchanges, Bindings, and Queues(消息路由、绑定、队列)
这些关键字也称为AMQP实体。让我们定义这些实体(记住,所有这些仍然是AMQP模型的一部分):
1•Exchanges: 生产者发送消息的实体。交换器将使用绑定将消息路由到正确的队列。交换器具有以下属性:名称、持久性(可以是持久的或瞬态的)、自动删除(当所有队列使用完交换器后删除)和参数(散列映射、代理依赖)。
2•Bindings: 将交换器连接到另一个交换器或队列的规则。这是一个字符串值。
3•Queue: 将消息存储在内存或磁盘中,直到应用程序使用它们为止。队列具有诸如名称、持久属性(在代理重新启动时存活)、独占属性(仅由一个连接使用,这意味着当连接关闭时队列将被删除)、自动删除属性(当最后一个使用者取消订阅时队列将被删除)和参数属性(散列映射、代理依赖)。
AMQP模型提供了四种类型的交换:
1•Direct exchange: 通过队列的绑定与队列之间的一对一关系。有一个默认的exchange,它是一个直接的exchange类型,使用队列的名称作为绑定的路由键。
2•Fanout exchange: 这个交换将为绑定到它的每个队列复制一条消息。你必须像广播一样思考;它非常类似于发布/订阅模式(主题)。
3•Topic exchange: 这个交换类似于直接交换,唯一的区别是它通过使用*(可以完全替换一个单词)和#(可以替换零个或多个单词)选项接受通配符(regex)作为路由键。
4•Headers exchange: 这个交换器将通过比较多个标头来进行路由。如果您希望消息头完全匹配,必须通过添加x-match:all(消息头:键)或通过将x-match:any(消息头:键)添加到相同的消息头来指定。
图5-2说明了这些交换类型。
这个小介绍足以让我们开始使用RabbitMQ并学习如何使用它。
二:RabbitMQ
RabbitMQ是一个开源消息中间件,它实现了AMQP模型(版本0.8.x到AMQP模型的1.0版本)。RabbitMQ是用Erlang编程语言编写的,这使得它既灵活又健壮。
以下是它的一些特点:
1.分布式节点
2.支持集群模式
3.插件的基础;最重要的插件是: 一致性哈希,社区插件
4.具有完全ACID的数据/状态复制(原子性、一致性、隔离性、持久性)
5.可靠性,可扩展性
6.具有镜像队列的高可用性
7.多协议支持:AMQP、MQTT、STOMP、SMTP、XMPP
8.Web控制台和Rest API(用于监视和管理)
9.安全:SSL和LDAP
10.多个客户端库:Java、.net、Ruby、Erlang、Python、Clojure、PHP、JavaScript等。
我们可以写一整本书来定义所有可用的特性,但是我想做的是让您了解RabbitMQ能做什么。(如果你需要更多信息,你可以在 http://www.rabbitmq.com/features.html.查看)。因此,让我们通过创建客户端来开始使用它。
三:RabbitMQ with Spring Boot
Spring Boot依赖于Spring的spring-amqp项目来完成所有的繁重工作来连接、发布、消费和管理RabbitMQ中间件。spring-amqp项目一直是消息应用程序中最常用的模块之一。
那么,为什么我们需要Spring Boot呢?请记住,Spring Boot是一个启动运行时将帮助我们从spring-amqp模块中配置我们需要的一些属性,这些属性可以添加到我们的application.properties文件或通过使用@Configuration类来覆盖其中的一些观点。
spring-amqp项目使用了已知的模板模式,它暴露了一个RabbitTemplate类,它允许我们发布和使用消息,以及其他任务。它还提供了易于使用的消息侦听器,它们连接到队列并使用消息。如果你担心线程、事务、重新连接(在失败的情况下)、管理等,大可不必,因为spring-amqp已经覆盖这些内容。
3.1.Producer(生产者)
让我们从一个简单的生产者开始。ampq-demo项目有你需要开始的所有东西。打开项目,从清单5-1中所示的简单生产者开始。
Listing 5-1. com.micai.spring.messaging.amqp.Producer.java
package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:19* @Description: 生产者*/
@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);}}
清单5-1显示了一个简单的amqp生产者。让我们分析代码的重要部分:
•@Component: 这是Spring框架的一个标记,它将把这个类看作Spring Bean,以便您可以在应用程序中使用它。您将在接下来的主要应用程序中看到这一点。
•RabbitTemplate: 这是Spring AMQP模块的主要类之一,它带来了许多与RabbitMQ交互的功能,如发送、接收和执行RabbitMQ的管理任务。在这个例子中,它被用来转换和发送一条消息给代理。请记住,为了与RabbitMQ交互,您必须打开一个连接并创建一个通道(从该连接),然后将消息发送到消息路由。这个设置将由RabbitTemplate实例处理。
•convertAndSend: 这个方法将把消息转换成正确的类型(到一个字节数组),并将其发送给RabbitMQ中间件。这个特殊的方法有三个参数。第一个参数是消息路由的名称(消息将通过通道发送),第二个参数是路由键(将消息路由到右侧队列的绑定规则)和消息,在本例中只是一个字符串。
正如你所看到的,这是一个非常简单的生产者。值得一提的是,RabbitTemplate类有各种重载方法,可以帮助您发送、监听、使用自定义转换器,并执行专门的任务(稍后您将对此进行更多的了解)。
接下来,让我们看看如何使用这个生产者。打开AmqpDemoApplication.java类,如清单5-2中所示。
package com.micai.spring.messaging;import com.micai.spring.messaging.amqp.Producer;
import com.micai.spring.messaging.amqp.RpcClient;
import org.apache.http.util.Args;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.validation.Valid;/*** @Auther: zhaoxinguo* @Date: 2018/8/6 14:28* @Description:*/
@SpringBootApplication
public class AmqpDemoApplication {public static void main(String [] args) {SpringApplication.run(AmqpDemoApplication.class, args);}@BeanCommandLineRunner simple(@Value("${micai.amqp.exchange:}") String exchange,@Value("${micai.amqp.queue}") String routingKey,Producer producer) {return args -> {producer.sendMessage(exchange, routingKey, "HELLO AMQP!");};}}
清单5-2显示了主Spring Boot应用程序。让我们回顾一下代码:
• @Bean CommandLineRunner: 您已经熟悉了这个注解和接口。当Spring容器初始化所有的bean并准备好使用时,它将被执行。
• @Value("${micai.amqp.exchange:}"): 这个注解评估属性(通过application.properties、命令行参数或环境变量),其中包括了一个micai.amqp.exchange的key。如果没有找到,它只会使用一个空字符串,也就是交换之后的字符串。
• @Value("${micai.amqp.queue}"): 这个注解将评估属性(通过 application.properties、命令行参数或环境变量),其中包括了一个micai.amqp.queue的key。这个密钥是强制性的,所以您将在src/resources/application.properties文件中找到spring-boot-queue的值。
• Producer: 这是一个简单的生产者类(如清单5-1所示)。正如您所看到的,我们使用sendMessage方法发送消息路由的名称、路由键和“HELLO AMQP!”消息。
在您测试它之前,您需要确保您的RabbitMQ中间件已经启动并运行。你还必须设置你的消息路由、绑定和队列,尽管你不需要在这里做这个,因为amqp-demo项目已经为你设置了这个配置。看一看AMQPConfig.java类,在清单5-3。
Listing 5-3. com.micai.spring.messaging.config.AMQPConfig.java
package com.micai.spring.messaging.config;import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}}
清单5-3展示了定义以下内容的AMQPConfig:
• @EnableConfigurationProperties: 这将声明一个具有前缀的自定义属性: micai.amqp.*。这就是为什么我们可以使用 micai.amqp.queue 或者 micai.amqp.exchange 键的原因
• @Bean Queue:这是一个重要的部分,通过编程,我们声明队列(在本例中)将通过返回队列类的新实例来创建。
• @Value("${micai.amqp.queue}"): 这个注解将取得micai.amqp.queue的值,在src/main/resources/应用程序中定义的具有spring-boot-queue值的属性。
我认为这个配置非常简单,但是如果您仔细想想,它看起来就像是丢失了交换声明和路由到队列的绑定。好吧,每次你在RabbitMQ中创建一个队列时,它都被绑定到一个默认交换(通常只是声明为空字符串),而路由键恰好是队列的名称。正如您所看到的,我们将队列的名称作为路由键传递给生产者实例。
您现在可以运行申请书,并在RabbitMQ管理控制台中看到spring-boot-queue被创建并有一条消息。输出如图5-3所示。
图5-3显示了日志。在amqp-demo项目中,您将找到com.micai.springmessaging.aop.AMQPAudit.java类。它是一个Around通知,它将记录生产者方法调用。见图5 - 4。
Figure 5-4. RabbitMQ Management Console (http://localhost:15672/#/queues): spring-boot-queue
图5-4展示了RabbitMQ管理控制台,您可以看到创建的队列和发送的消息。
你知道我们是如何连接到RabbitMQ代理的吗?如果我有一个远程服务器,我需要指定在哪里连接,传递IP,或者用户名或密码,会发生什么?
Spring Boot将会解决这个问题,因为Spring Boot在启动运行时会在类路径中找到spring-boot-starer-amqp的依赖关系,并会询问您是否已经有了ConnectionFactory(它将连接到RabbitMQ),并提供关于消息中间件的所有必要信息。如果您不这样做,它将尝试使用默认设置并寻找本地代理。
如果您想指定一个远程消息中间件,您可以通过提供spring.rabbitmq.*属性来实现在src/main/resources/application.properties 文件。这就足够连接到远程RabbitMQ了。
3.2.Consumer(消费者)
现在,让我们讨论一下如何消费使用生产者发送的消息。打开Consumer.java。清单5-4所示的java类
Listing 5-4. com.micai.spring.messaging.amqp.Consumer.java
package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 11:30* @Description: 消费者*/
@Component
public class Consumer implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);@Overridepublic void onMessage(Message message) {LOGGER.info("接收spring-boot-queue消息内容为:{}", message);}
}
清单5-4显示了最简单的消费者代码。这是一个异步的消费者,它实现了org.springframework.amqp.core.MessageListener接口和onMessage方法。这个方法将接收消息作为一个org.springframework.amqp.core.Message实例。为了使用这个消费者,您需要提供一些有用的配置设置。打开AMQPConfig.java类。请参见清单5-5中。
Listing 5-5. com.micai.spring.messaging.config.AMQPConfig.java
package com.micai.spring.messaging.config;import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListener consumer,@Value("${micai.amqp.queue}")String queueName) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(queueName);container.setMessageListener(consumer);return container;}
}
清单5-5展示了增强的AMQPConfig类(清单5-3),我们有以下内容:
• SimpleMessageListenerContainer: 该类创建了一个消息侦听器容器,它将侦听任何消息的队列。这个类需要建立一个连接工厂、一个消息监听器处理程序和队列(s)的名称。正如您所看到的,这是一个bean,我们需要返回来启动消息侦听器容器。
• ConnectionFactory: 这个接口对于SimpleMessage ListenerContainer类是强制性的,因为它是知道RabbitMQ代理(主机、用户名、密码、vhost等)的那个接口。重要的是要知道这个连接工厂是由Spring Boot连接起来的,要么使用默认设置(根本没有配置),要么通过在应用程序中指定它的属性。带有spring.rabbitmq.*的属性。您也可以将自己的ConnectionFactory声明为bean(通过在方法中声明@Bean)。
• MessageListener: 正如您所看到的,这是方法容器(消费者)的参数之一,Spring将注册com.micia.spring.messaging.amqp.Consumer类作为一个处理程序。然后在调用容器的setMessageListener方法时使用它。
注意,我们使用的是queueName,它是由Spring Boot连接起来的,通过micai.amqp.queue属性使用。差不多就是这样。你现在有了一个完整的生产者和消费者。您可以运行程序并查看如图5-5所示的日志。
清单5-5显示了消费者的日志。注意,调用的方法是onMessage,它由org.springframework.amqp.core.Message实例提供。
3.2.1.Consumer Using Annotations(消费者使用注解)
等一下!我告诉过你Spring Boot会更容易,对吧?在当前的例子中,我们需要注册我们的容器并实现MessageListener接口。好消息是,Spring AMQP提供了一种使用注释的方法,并且在Spring Boot的帮助下,一切都是正确的配置。
转到AMQPConfig类并删除容器方法。它应该类似于清单5-3。打开com.micai.spring.messaging.amqp。AnnotatedConsumer类;它应该类似于清单5-6。
Listing 5-6. com.micai.spring.messaging.amqp.AnnotatedConsumer.java
package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 13:39* @Description: 基于注解的消费者*/
@Component
public class AnnotatedConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(AnnotatedConsumer.class);@RabbitListener(queues = "${micai.amqp.queue}")public void process(String message) {LOGGER.info("基于注解接收消息内容为:{}", message);}}
清单5-6向您展示了如何在不创建容器的情况下创建消费者,只需添加@Rabbitlistener注解即可。Spring AMQP将为您创建一个容器,它将用Spring Boot的帮助将所有东西连接起来。正如您所看到的,它使用队列作为参数,并且使用了micai.amqp.queue属性。还要看这个方法接收到一个字符串(而不是一个消息对象)。您可以使用您自己的对象,这需要额外的步骤。不要担心,因为我们将在后面的部分中做这个。
您现在可以运行这个项目,您应该得到类似于图5-6的内容。
正如您所看到的,用几行代码创建生产者和消费者是非常简单的。接下来,让我们回顾一下如何创建和使用RPC模型。
3.3.RPC
远程过程调用(Remote Procedure Call,RPC)模型是60年代的众多用例之一,当时分布式计算是一个挑战(它仍然是)。RPC模型被认为是一个请求-响应协议,在这个协议中,您有一个客户端,通过向远程服务器发送一条请求消息来执行一个或多个任务,从而启动一个进程。然后远程服务器向客户端发送响应,这样它就可以继续处理这个过程。见图5 - 7。
在Spring Boot中创建RPC消息传递模型非常简单。请记住,Spring Boot依赖于Spring AMQP模块,因此您不必配置RabbitMQ中间件。Spring AMQP将会解决这个问题。让我们复习一下代码,这样你就能更好地了解正在发生的事情。
打开com.micai.spring.messaging.amqp.RpcClient类。它应该类似于清单5-7。
Listing 5-7. com.micai.spring.messaging.amqp.RpcClient.java
package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 13:52* @Description:*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public Object sendMessage(String exchange, String routingKey, String message) {Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);return response;}}
清单5-7向您展示了RpcClient类,如果您将其与清单5-1(生产者类)进行比较,您将会注意到,被调用的模板方法只有一个不同之处。在这个例子中,我们使用convertSendAndReceive方法,它接受三个参数一个消息路由名称、路由键和消息。它返回一个Object对象(返回通常会被包装成一个 org.springframework.amqp.core.Message实例)。当然,您可以从这个签名中找到更多重载的方法,但是现在,我们将使它尽可能的简单。
接下来,让我们看一下服务器。打开com.micai.spring.messaging.amqp.RpcServer类。参见清单5 - 8。
Listing 5-8. com.micai.spring.messaging.amqp.RpcServer.java
package com.micai.spring.messaging.amqp;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 14:05* @Description:*/
@Component
public class RpcServer {@RabbitListener(queues = "${micai.amqp.queue}")public Message<String> process(String message) {// More Processing here...return MessageBuilder.withPayload("PROCESSED:OK").setHeader("PROCESSED", new SimpleDateFormat("yyyy-MM-dd").format(new Date())).setHeader("CODE", UUID.randomUUID().toString()).build();}}
清单5-8展示了RpcServer类。让我们看看有什么新的特性不同于其他版本:
•@RabbitListener: 您已经熟悉这个注解了。它将创建一个消息侦听器容器,并且它将侦听来自于micai.amqp.queue队列传入的任何消息。(请记住,这是应用程序中application.properties文件指定的属性)。
•Message<String>: 如果您想要增强您的消息,这就是您需要返回的内容,因为它提供了一种使用头部的方法。在这个例子中,我们使用的是字符串类型消息。
•MessageBuilder: 这是一个助手类,它允许您构建一个新的消息、添加/复制标题,等等。正如您所看到的,我们只是创建了一个带有有效负载集的PROCESSED:OK ,并添加几个头信息。
如果您仔细查看清单5-8,您会注意到process方法返回一个String类型的消息,正因为如此,Spring AMQP使用RabbitMQ的直接回复功能。这个特性允许我们将服务器连接到客户端以获得响应,而无需创建reply-queue(这是自版本3.4以来的一个特性)。您不需要担心任何相关数据,因为Spring AMQP将为您创建它。您还可以定制它或创建自己的相关数据。现在,让我们看一看主应用程序。打开你的com.micai.spring.messaging.AmqpDemoApplication类;它应该类似于清单5-9。
Listing 5-9. com.micai.spring.messaging.AmqpDemoApplication.java
package com.micai.spring.messaging;import com.micai.spring.messaging.amqp.Producer;
import com.micai.spring.messaging.amqp.RpcClient;
import org.apache.http.util.Args;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.validation.Valid;/*** @Auther: zhaoxinguo* @Date: 2018/8/6 14:28* @Description:*/
@SpringBootApplication
public class AmqpDemoApplication {public static void main(String [] args) {SpringApplication.run(AmqpDemoApplication.class, args);}@BeanCommandLineRunner simple(@Value("${micai.amqp.exchange:}") String exchange,@Value("${micai.amqp.queue}") String routingKey,RpcClient rpcClient) {return args -> {Object result = rpcClient.sendMessage(exchange, routingKey, "HELLO AMQP/RPC!");assert result != null;};}}
清单5-9显示了主要的应用程序,您只使用了RpcClient。
在运行RPC示例之前,请确保没有使用相同队列的另一个侦听器。通过运行这款应用,你应该得到类似于图5-8的内容。
有时您需要对RPC有更多的控制。例如,您可能想要一个固定的队列,用于执行应答。为此,您需要向应用程序添加一些配置。参见清单5-10。
Listing 5-10. com.micai.spring.messaging.config.AMQPConfig.java
package com.micai.spring.messaging.config;import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 10:49* @Description:*/
@Configuration
@EnableConfigurationProperties(AMQPProperties.class)
public class AMQPConfig {@Beanpublic Queue queue(@Value("${micai.amqp.queue}")String queueName){return new Queue(queueName,false);}@Autowiredprivate ConnectionFactory connectionFactory;@Value("${micai.amqp.reply-queue}")private String replyQueueName;@Beanpublic RabbitTemplate fixedReplyQueueRabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setReplyAddress(replyQueueName);template.setReplyTimeout(60000L);return template;}@Beanpublic SimpleMessageListenerContainer replyListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueues(replyQueue());container.setMessageListener(fixedReplyQueueRabbitTemplate());return container;}@Beanpublic Queue replyQueue(){return new Queue(replyQueueName,false);}}
清单5-10向您展示了如何设置一个固定的队列,该队列将由服务器用于应答客户端请求,而客户端将侦听该队列。这里的重要部分是RabbitTemplate,它将使用setReplyAddress方法来配置reply-to队列。也有必要使用与侦听器相同的模板来侦听来自服务器的响应(这是通过将侦听器容器设置为setMessageListener来完成的)。
3.4.Reply Management(回复管理)
使用Spring AMQP的一个很酷的事情是它有一些很好的特性。例如,您可以使用exchange和路由键来创建实际的reply-to场景。换句话说,您在不等待响应的情况下发送一条消息,然后您回复一个特定的交换/队列,它将有另一个流程。
Spring AMQP包含@SendTo注解,您可以将您的回复发送到消息路由或队列。打开com.micai.spring.messaging.amqp.ReplyToService类,如清单5-11所示。
Listing 5-11. com.micai.spring.messaging.amqp.ReplyToService.java
package com.micai.spring.messaging.amqp;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 14:42* @Description:*/
@Component
public class ReplyToService {private static final Logger LOGGER = LoggerFactory.getLogger(ReplyToService.class);@RabbitListener(queues="${micai.amqp.queue}")@SendTo("${micai.amqp.reply-exchange-queue}")public Message<String> replyToProcess(String message){LOGGER.info("message: {}", message);//More Processing here...return MessageBuilder.withPayload("PROCESSED:OK").setHeader("PROCESSED", new SimpleDateFormat("yyyy-MM-dd").format(new Date())).setHeader("CODE", UUID.randomUUID().toString()).build();}
}
清单5-11显示了@SendTo注释。正如您所看到的,replyToProcess方法也用@Rabbitlistener进行注释。这意味着它将侦听一个队列(micai.amqp.queues属性的值)。这里的重要部分是,它将返回一条消息,该消息将被发送到消息路由/路由键(由micai.amqp.reply-exchange-queue属性的值提供)作为@SendTo注释的回复机制。
您可以使用生产者类来发送消息(参见清单5-2),但是在运行这个示例之前,您需要使用RabbitMQ Web控制台执行以下操作:
• 创建一个名为my-exchange的消息路由
• 创建一个队列(使用您想要的任何名称)
• 用my-reply-rk路由键将my-exchange绑定到您刚刚创建的队列
当然,您也可以以编程的方式创建这个
现在你已经准备好实验了。运行这个示例,您应该在已经创建的队列中有一条消息。正如您所看到的,您有另一种方法来完成应答并创建另一个流。
3.5.Flow Control(流量控制)
流量控制是RabbitMQ最好的特性之一。它将降低那些发送消息的发布者的连接速度,从而这些消息的速度不够快。
RabbitMQ通常会降低速度,有时它会阻塞连接,防止它们被穿透。了解这个特性是件好事,因为您可以使用它来确定瓶颈所在。要么您将并发消费者的数量增加到队列,要么您检查客户的代码,以确定为什么处理消息花费的时间太长。
那么,找到这些瓶颈有多容易呢?找到它们的一种方法是监控您的RabbitMQ控制台,看看您是否触发了流量控制机制。您可以通过检查连接或通道选项卡的状态来确定这一点,它们通常是黄色的,并表示流量控制。然而,有一种更简单的方法。RabbitMQ发送事件,允许您对流量控制作出反应并完成关机。
3.5.1.Blocking/Unblocking Events(阻塞事件/非阻塞事件)
Spring AMQP提供了一种机制,用于阻塞、非阻塞和关机事件。你可以在com.micai.spring.messaging.config.AMQConfig类中找到代码。
如果你想要监听阻塞或非阻塞事件,只需在配置中添加以下代码,即阻塞侦听器事件:
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);template.execute(new ChannelCallback<Object>() {public Object doInRabbit(Channel channel) throws Exception {channel.getConnection().addShutdownListener(new ShutdownListener() {public void shutdownCompleted(ShutdownSignalException cause) {// Process the shutdown}});channel.getConnection().addBlockedListener(new BlockedListener() {public void handleUnblocked() throws IOException {// Resume business logic}@Overridepublic void handleBlocked(String reason) throws IOException {// FlowControl -> Logic to handle block}});return null;}});return template;}
如果你只想监听一个失败或RabbitMQ关闭事件,你可以使用下面的代码,ShutdownListener事件
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template = new RabbitTemplate(connectionFactory);template.execute(new ChannelCallback<Object>() {public Object doInRabbit(Channel channel) throws Exception {channel.getConnection().addShutdownListener(new ShutdownListener() {public void shutdownCompleted(ShutdownSignalException cause) {// Process the shutdown}});return null;}});return template;}
您可以只拥有一个RabbitTemplate,并将BlockedListener和ShutdownListener加入到相同的代码中。
四:More Features(更多特性)
在Spring AMQP模块中有很多特性,需要一整本书来解释其中的每一个。我在这一节中指出了其中的一些:
•Transactions: Spring AMQP允许您在代码中使用@Transactional注释,所以通过添加这个注释,Spring AMQP模块在事务模式下设置通道。然后它可以执行提交或回滚,这取决于具体情况。您还可以通过定义bean来指定想要使用的事务管理器
@Transactional
public void processInvoice() {String incoming = rabbitTemplate.receiveAndConvert();// Do some more database processing...String reply = reviewInvoice(incoming);rabbitTemplate.convertAndSend(reply);
}
• Multi-Listeners: Spring AMQP版本1.5.0和以上添加了一种新的方式,使一个类根据消息的类型(如果需要)处理多个侦听器。看一看多MultiListenerService的代码。如果需要的话,可以添加@Payload, @Header, and @Headers注解。
package com.micai.spring.messaging.amqp;import com.micai.spring.messaging.domain.Invoice;
import com.micai.spring.messaging.domain.InvoiceWithTax;
import com.micai.spring.messaging.domain.Item;
import com.micai.spring.messaging.domain.Order;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/9 15:38* @Description:*/
@Component
@RabbitListener(id="multi", queues = "${micai.amqp.queue}")
public class MultiListenerService {@RabbitHandler@SendTo("${micai.amqp.reply-exchange-queue}")public Order processInvoice(Invoice invoice) {Order order = new Order();//Process Invoice here...order.setInvoice(invoice);return order;}@RabbitHandlerpublic Order processInvoiceWithTax(InvoiceWithTax invoiceWithTax) {Order order = new Order();//Process Invoice with Tax here...return order;}@RabbitHandlerpublic String itemProcess(@Header("amqp_receivedRoutingKey") String routingKey, @Payload Item item) {//Some Process here...return "{\"message\": \"OK\"}";}}
• Retries:有时您会得到一个错误,要么通过处理消息,要么通过代理,您将需要在消费者级别重试。你需要在这个场景中使用这个特性:
// Retry for the consumer, normally this needs to be set in the container:
// container.setAdviceChain(new Advice[] { interceptor() });
//@Bean
public StatefulRetryOperationsInterceptor interceptor() {return RetryInterceptorBuilder.stateful().maxAttempts(3).backOffOptions(1000, 2.0, 10000)
// initialInterval, multiplier, maxInterval.build();
}
• 或者是下面这样:
@Bean
RetryOperationsInterceptor interceptor(RabbitTemplate template,@Value("${micai.amqp.error-exchange:}")String errorExchange,@Value("${micai.amqp.error-routing-key}")String errorExchangeRountingKey) {return RetryInterceptorBuilder.stateless().maxAttempts(3).recoverer(new RepublishMessageRecoverer(template, errorExchange, errorExchangeRountingKey)).build();
}
有更多的。看一看Spring AMQP项目参考,了解更多关于这些令人敬畏的特性的信息。
五:Currency Project(货币项目)
您可以继续使用货币项目,并添加必要的逻辑,以提供一种方式来消费有关不同市场汇率的信息。您可以检查所有的代码;它已经准备好了。使用演示来发送速率消息(演示在http://projects.spring.io/spring-amqp/)中有所有必要的代码。
六:总结
本章讨论了AMQP模型,并解释了消息路由、绑定和队列之间的区别。它回顾了一些简单的例子,说明了创建生产者和消费者的方法。
您看到了Spring AMQP模块的一些惊人特性,以及Spring Boot如何帮助您进行配置。
使用Spring消息传递的好处之一是,每个Spring技术都使用相同的概念。我们发送信息的方式(生产者);模板模式的使用(JmsTemplate、RabbitTemplate等);我们使用接口(通过实现MessageListener)或注释(如@Jmslistener、@Rabbitlistener、@sendto)创建监听器(消费者)的方式,很快您就会看到@streamlistener了;通过注释(如@Payload、@Header等)直接访问消息结构的方法,它们在构建中都是相似的。
请记住,这只是您对Spring AMQP所能做的一小部分,我需要写一整本关于Spring AMQP的书来展示每一个特性。你现在拥有了创建很棒的AMQP应用所需的最低知识。下一章介绍了发布/订阅消息,但这次使用Redis作为主要的消息传递引擎。
七:源代码
https://gitee.com/micai/micai-spring-message/tree/master/amqp-demo
这篇关于Spring Boot Messaging Chapter 5 AMQP with Spring Boot的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!