本文主要是介绍Spring Boot Messaging Chapter 4 JMS with Spring Boot,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
摘要:Java Message Service(JMS)在2001年6月发布,版本1.0.2 b。这是另一个在两个或多个客户机之间发送消息的解决方案。它被认为是信息的一部分当时的中间件(MOM)技术组。这个想法是为一个经常出现的问题提供一个API,一个生产者消费者用例,它允许在分布式环境中松散耦合、可靠和异步的组件。本章从一个简单的项目开始,它将帮助您理解JMS客户机如何工作,以及如何使用Spring Boot来配置它。然后,我们将使用这些知识来构建以前的项目——货币REST API,现在它将成为一个保存新速率的接收器。那么,让我们开始吧。
一:JMS
JMS API提供了两种消息传递模型点对点和发布-订阅。点到点是消息被传递给接收者的地方,并且只有一个连接到一个队列的消费者(参见图4-1)。
发布-订阅模型是将消息传递给零个或更多的消费者(通常称为订阅者)的地方。发布者为想要订阅它的所有客户创建一个消息主题(参见图4-2)。
JMS是需要实现的必需的API。为了使用或创建JMS应用程序,您需要选择一个提供程序(通常称为JMS服务器或代理),将连接和分离你的发送者/出版商从你的接收器/用户,客户端会产生/发送或接收/订阅信息,包含实际消息的JMS消息(载荷),和一个JMS队列点到点消息传递或主题的发布-订阅场景。我们首先谈论的是客户端。
1.1.JMS with Java
让我们首先看看如何在Java中创建点到点发送方客户端;参见清单4-1.
清单4。点对点发送方客户端代码
//Step 1. Create the Connection
InitialContext ctx = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory)ctx.
lookup("connectionFactory");
QueueConnection connection = factory.createQueueConnection();
connection.start();
//Step 2. Create a Queue Session
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
//Step 3. Get the Queue object
Queue queue =( Queue)ctx.lookup("myQueue");
//Step 4. Create the Sender
QueueSender sender = session.createSender(queue);
//Step 5. Create the Message
TextMessage msg = session.createTextMessage();
msg.setText("Hello World");
//Step 6. Send the Message
sender.send(msg);
如您在清单4-1中所看到的,这个过程非常简单。发送一条短信只有6个步骤。在步骤1中,您需要知道要使用哪个连接过程。通常您需要包含jndi。您的代码中的属性文件与您的JMS提供者的一些信息;例如,如果你使用Apache ActiveMQ,你需要指定它的属性,如清单4-2(jndi.properties)所示。
清单4 - 2。jndi。Apache ActiveMQ的属性
# Initial Context for the Apache ActiveMQ
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory# This property must be the same as the one declared in the ctx.lookup statement.
# by default is: connectionFactory or ConnectionFactory
# connectionFactoryNames = connectionFactory, queueConnectionFactory,
queueConnectionFactory# Memory Broker = vm://localhost
# External Broker = tcp://hostname:61616
java.naming.provider.url=vm://localhost# Queue naming rules:
# queue.[jndiName] = [physicalName]
queue.myQueue = apress.MyQueue# Topic naming rules:
# topic.[jndiName] = [physicalName]
topic.myTopic = apress.MyTopic
清单4-2显示了jndi。您需要在每个JMS应用程序中包含的属性文件。在这种情况下,它使用Apache ActiveMQ设置和队列和主题的命名约定。现在,让我们看一下接收者,如清单4-3所示。
清单4 - 3。点对点接收者客户端代码段代码
// Step 1. Create Connection
InitialContext ctx = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("connectionFactory");
QueueConnection connection = factory.createQueueConnection();
connection.start();
// Step 2. Create Session
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 3. Get the Queue
Queue queue=(Queue)ctx.lookup("myQueue");
// Step 4. Create the Receiver
QueueReceiver receiver = session.createReceiver(queue);
// Step 5. Create the Listener
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
//Process the message here
}
};
// Step 6. Register the Listener
receiver.setMessageListener(listener);
清单4-3展示了为点到点消息创建消费者所需的六个步骤。当然,如果您在一个单独的项目中有这段代码,您需要包含jndi。属性也一样(参见清单4-2)。
如果您想使用发布-订阅者消息传递模型,您可以创建您的发布者,如清单4-4所示。
清单4-4.Publisher-Subscriber Publisher Client
//Step 1. Create the Connection
InitialContext ctx = new InitialContext();
TopicConnectionFactory factory =(TopicConnectionFactory)ctx.lookup("connectionFactory");
TopicConnection connection=f.createTopicConnection();
connection.start();
//Step 2. Create a Topic Session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
//Step 3. Get the Topic object
Topic topic = (Topic)ctx.lookup("myTopic");
//Step 4. Create the Sender
TopicPublisher publisher = session.createPublisher(topic);
//Step 5. Create the Message
TextMessage msg = session.createTextMessage();
msg.setText("Hello World");
//Step 6. Send the Message
publisher.publish(msg);
清单4-4向您展示了发布者代码的发布者-订阅者消息模型,该模型将发布一条简单的文本消息到一个名为myTopic的主题。与点对点模型没有太大区别。用户如何?参见清单4 - 5。
清单4 - 5。Publisher-Subscriber Subscriber Client
// Step 1. Create Connection
InitialContext ctx = new InitialContext();
TopicConnectionFactory factory = (TopicConnectionFactory)ctx.lookup("connectionFactory");
TopicConnection connection = factory.createTopicConnection();
connection.start();
// Step 2. Create Session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 3. Get the Topic
Topic topic = (Topic)ctx.lookup("myTopic");
// Step 4. Create the Receiver
TopicSubscriber subscriber = session.createSubscriber(topic);
// Step 5. Create the Listener
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
//Process the message here
}
};
// Step 6. Register the Listener
subscriber.setMessageListener(listener);
清单4-5显示了发布-订阅者消息传递客户端;这个客户订阅了名为myTopic的主题。您可以使用这段代码运行多个客户端,每个客户端将接收来自发布者的消息;与点对点模型没有太大区别。当然,您仍然需要jndi。这些客户的属性。
正如您所看到的,消息传递模型的实现都非常简单。如果您仔细查看任何代码,您会注意到,在每种情况下,发送的消息都只是一条文本消息,但是如果您需要发送其他信息,会发生什么呢?JMS支持不同的消息类型:
• StreamMessage是一个序列化的stream对象。
• MapMessage由名称/值对组成,比如哈希表。
• TextMessage是一个字符串。
• ObjectMessage是一个可序列化的对象。
• ByteMessage是一个原始的字节流。
作为作业,尝试使用这些代码片段创建一些客户端(点对点或发布-订阅模型)。这样做的目的是让你自己熟悉如何使用这种消息传递。
我刚刚给您介绍了您在使用JMS时通常不使用外部框架的情况。我认为有很多步骤可以做一些简单的事情。
二:JMS with Spring Boot
第2章向您展示了Spring Boot如何知道您想要运行的应用程序,因为它是一种固执己见的技术。只要添加一个Spring boot-starter pom,你就告诉Spring Boot如何配置所有东西,对吗?
在这个例子中,我们将使用Apache ActiveMQ(欢迎您使用任何其他消息中间件;代码将是相同的),这意味着我们可以包括spring boot-starter-activemq依赖性。通过增加这种依赖关系,Spring Boot将带来我们需要的所有JMS和ActiveMQ(JAR文件),它将自动配置JMS客户端所需的所有必要属性和额外配置。
我们将使用jms-sender项目,您可以在这本书的源代码中找到它。这个项目有很多类,有些代码注释掉了,而仅仅通过取消注释,它就可以工作了。不过,请不要担心,因为我将通过解释每一个代码片段来指导您完成以下部分。
2.1.Producer(生产者)
让我们从使用Spring Boot检查生产者开始。打开com.micai.spring.messaging.jms。SimpleSender。java类。参见清单4 - 6。
Listing 4-6. com.micai.spring.messaging.jms.SimpleSender.java
@Component
public class SimpleSender {private JmsTemplate jmsTemplate;@Autowiredpublic SimpleSender(JmsTemplate jmsTemplate){this.jmsTemplate = jmsTemplate;}public void sendMessage(String destination, String message){this.jmsTemplate.convertAndSend(destination, message);}
}
清单4-6向您展示了创建生产者的最简单和最简单的方法。让我们分析代码的每个部分:
• @Component:这个注释,您可能已经知道了,标记了类作为一个Spring bean,在运行时提供它。
• JmsTemplate:这是客户端最重要的部分,因为该类将向JMS提供者发送一条消息(包括其他内容)。
• @ Autowired:该注释用于类构造函数注入JmsTemplate bean(描述)。你甚至可以省略这个注释,Spring会找出你需要这里的依赖。
• convertAndSend: JmsTemplate实例有这个方法,它可以转换消息。因为这是一个字符串,它会自动转换成javax.jms.TextMessage,并且发送到目的地,通常是一个队列。
在运行这段代码之前,让我们检查一下主应用程序。打开com.micai.spring.messaging.JmsSenderApplication。java类,如清单4-7所示。
Listing 4-7. com.micai.spring.messaging.JmsSenderApplication.java
@SpringBootApplication
public class JmsSenderApplication {public static void main(String[] args) {SpringApplication.run(JmsSenderApplication.class, args);}@BeanCommandLineRunner simple(JMSProperties props, SimpleSender sender){return args -> {sender.sendMessage(props.getQueue(), "Hello World");};}
}
清单4-7显示了主应用程序。让我们考虑每一部分:
• @SpringBootApplication: 您已经知道这个注释了。这是Spring Boot确定您要运行的应用程序类型的方式。
• simple(JMSProperties,SimpleSender): 这个方法是在Spring容器准备好使用的时候执行的,并且会注入一个JMSProperties和SimpleSender bean来使用它。
• JMSProperties: 该类用作读取应用程序的属性持有者。application.properties文件,并寻找micai.jms队列属性(在本例中为jms-sender)。如果您想了解更多信息,您可以在Spring Boot参考的外部化配置部分中了解更多信息。
现在你已经准备好运行这个了。(你可以使用Maven的命令行来运行它:$mvn spring Boot:运行,或者如果你把它导入STS,你可以在引导控制面板中运行它。)一旦您运行它,您将看到类似于图4-3的内容。
Figure 4-3. JmsSenderApplication logs
图4-3显示了应用程序运行后的日志。代码实际上是向jms-demo队列发送“Hello World”消息。这意味着您将看到带有一些额外信息的JMSAudit文本。我在哪里打印的?回顾com.micai.spring.messaging.aop.JMSAudit.java类。这个类是一个Around通知,通常用于日志记录目的。我知道对于这个例子来说,它太多了,但是它给了我更多的探索AOP的方法。
您可以看到这个客户机正在发送一条消息,但是它在哪里呢?您知道我们使用的是spring-boot-start-activemq依赖关系,但是似乎没有任何代理运行。它在哪里?
请记住,Spring Boot基于类路径依赖性提出意见,因此,通过知道您有spring-boot-start-activemq依赖性,它将会看到您是否已经声明了类型connectionFactory、会话、发件人等的bean,因此它可以使用它们。Spring Boot没有找到任何东西,它会在默认情况下创建所有这些,它将使用内存中的提供者(URL是vm://localhost)。这就是为什么在运行时没有错误的原因,您可以看到消息被发送了。
2.2.Consumer(消费者)
接下来,让我们看看消费者。首先,我将向您展示使用javax.jms.MessageListener接口(它和以前一样,在JMS和Java部分中),看看配置它需要什么。参见清单4 - 8。
Listing 4-8. com.micai.spring.messaging.jms.QueueListener.java
@Component
public class QueueListener implements MessageListener {public void onMessage(Message message) {// TODO}
}
清单4-8向您展示了接收队列中任何消息的接收器(jms-demo)。让我们回顾一下代码:
• @Component: 如果注释掉了,请删除//并添加正确的导入。我建议你使用STS,在Mac上按Cmd+Shift+O,或者在Windows上按Ctrl+Shift+O键。这个注释将把该类标记为Spring bean,因此它可以在配置中使用。
• MessageListener:这个接口对于接收JMS消息是必需的,并且有必要实现onMessage方法。
• onMessage(Message):这是实现该方法所必需的,并且它具有从队列中消耗的实际消息。
正如您所看到的,它非常简单,但是如果您尝试运行它,您将看到与以前相同的结果。您将看到关于发送消息的日志。这是为什么呢?好吧,你必须告诉Spring Boot如何使用这个监听器,所以看一下清单4-9。
Listing 4-9. com.micai.spring.messaging.config.JMSConfig.java
package com.micai.spring.messaging.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;/*** @Auther: zhaoxinguo* @Date: 2018/8/7 15:56* @Description:*/
@Configuration
@EnableConfigurationProperties(JMSProperties.class)
public class JMSConfig {// This is for the QueueListener@Beanpublic DefaultMessageListenerContainer customMessageListenerContainer(ConnectionFactory connectionFactory,MessageListener queueListener,@Value("${micai.jms.queue}") final String destinationName){DefaultMessageListenerContainer listener = new DefaultMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setDestinationName(destinationName);listener.setMessageListener(queueListener);return listener;}}
清单4-9展示了启用QueueListener类所需的配置。让我们复习一下这门课:
• @Configuration: 这是将该类作为Spring容器的Java配置的标记,因此这里的一切都将用于设置Spring。
• @EnableConfigurationProperties: 请记住,我们使用的是应用程序的application.properties来设置队列的名称?这个特殊的注释将使用所提供的类(JMSProperties.class)作为属性持有者,因此您可以在应用程序中设置一些属性。属性文件。稍后,您可以通过使用@Value或使用JMSProperties实例bean和getter来获得它的值。
• @Bean: 这是在Spring容器中创建一个类型DefaultMessageListenerContainer bean的标记。
• DefaultMessageListenerContainer:在这种情况下,这是一个返回类型,它将被当作一个Spring bean来使用。它拥有所有必需的信息来确定QueueListener类和从(destinationName)消费的队列。
• ConnectionFactory: Spring将注入这个实例,它将使用默认值自动配置它(除非您提供了一个自定义ConnectionFactory)。在这种情况下,它将使用内存中的provider/broker.
• MessageListener: Spring将注入QueueListener类(来自清单4-8的接收者),因此它可以用来设置DefaultMessageListenerContainer实例。
• @Value("${micai.jms.queue}"): 这个标注将从“micai.jms.queue”属性中注入“jms-demo”的值。在应用程序中排队的属性。application.properties到目标名称参数。
然后,实际的方法将创建DefaultMessageListenerContainer并设置它的所有属性(connectionFactory、queueListener和destinationName)。
现在,如果你运行这个应用程序,你应该得到类似于图4-4的输出。
Figure 4-4. Logs
图4-4显示了通过从队列中消费消息来调用onMessage方法(请记住,这些日志是由AOP方面产生的)。如果你仔细看一下实际的信息,你应该会看到类似的东西:
ActiveMQTextMessage {
commandId = 5,
responseRequired = true,
messageId = ID:LAPTOP-38A3KHTL-56564-1533630829329-4:2:1:1:1,
originalDestination = null,
originalTransactionId = null,
producerId = ID:LAPTOP-38A3KHTL-56564-1533630829329-4:2:1:1,
destination = queue://jms-demo,
transactionId = null,
expiration = 0,
timestamp = 1533630829822,
arrival = 0,
brokerInTime = 1533630829823,
brokerOutTime = 1533630829834,
correlationId = null,
replyTo = null,
persistent = true,
type = null,
priority = 4,
groupID = null,
groupSequence = 0,
targetConsumerId = null,
compressed = false,
userID = null,
content = null,
marshalledProperties = null,
dataStructure = null,
redeliveryCounter = 0,
size = 1056,
properties = null,
readOnlyProperties = true,
readOnlyBody = true,
droppable = false,
jmsXGroupFirstForConsumer = false,
text = 消息内容 Hello World
}
正如您所看到的,我们正在接收ActiveMQTextMessage,它是围绕javax.jms.TextMessage的一个实现。重要的是要知道指向jms-demo队列、有效负载和带有Hello World值的文本属性的目的地属性。
您可能想知道是否有一种更简单的方式来配置侦听器。如何确定要配置什么?好吧,Spring Boot 让它变得更简单。您将在下一节中了解这个过程。
2.3.Consumer with Annotations
Spring框架为消费消息提供了有用的注释,非常类似于ApplicationEvents和Streams。Spring Boot帮助自动配置这些注释,从而使开发人员更容易。
让我们先来回顾一下com.micai.spring.messaging.jms.AnnotatedReceiver.java类。参见清单4-10。
Listing 4-10. com.micai.spring.messaging.jms.AnnotatedReceiver.java
@Component
public class AnnotatedReceiver {private static final Logger LOGGER = LoggerFactory.getLogger(AnnotatedReceiver.class);@JmsListener(destination = "${micai.jms.queue}")public void processMessage(String content) {LOGGER.info("接收到的消息内容为:{}", content);}}
清单4-10向您展示了包含@Jmslistener注释的注释器类:
• @Component: 记住,这是Spring的一个标记,使这个bean在Spring容器。
• @JmsListener:这个标注被配置为使用SpEL(Spring表达式语言)表达式指定的目的地创建消息侦听器。在本例中,它是一个名为micai.jms.queue的属性具有jms-demo值的队列属性。
Spring Boot将为您配置一切,因此不再需要bean来声明消息侦听器容器
现在是运行这个监听器的时候了(记住要注释掉bean和侦听器,因为您不再需要它们了)。一旦您运行了这个应用程序,您应该有类似于图4-5的内容。
Figure 4-5. @JmsListener logs
图4-5显示运行应用程序后的日志。你可以看到这个方法叫做processMessage;这是由@Jmslistener注释注释的同一条消息。
2.4.Currency Project(货币方案)
让我们再来讨论一下货币项目。假设您有一个客户想要发送一个更准确的利率消息,但是只能使用JMS发送利率消息。这意味着您的货币项目需要成为一个接收者,但是客户需要某种确认您收到了利率消息的机制。
让我们从使用相同的jms-sender项目创建发送方客户端开始。回顾com.micai.spring.messaging.jms.RateSender。如清单4-11所示的java类。
Listing 4-11. com.micai.spring.messaging.jms.RateSender.java
@Component
public class RateSender {@Autowiredprivate JmsTemplate jmsTemplate;public void sendCurrency(String destination, Rate rate){this.jmsTemplate.convertAndSend(destination, rate);}
}
清单4-11显示了您将要用来发送新利率对象的类。正如您所看到的,它与清单4-6中的SimpleSender类非常相似。唯一的区别是,它不是发送文本(字符串)消息,而是发送一个速率对象。看一看com.micai.spring.messaging.domain.Rate。清单4-12中的java类。
Listing 4-12. com.micai.spring.messaging.domain.Rate.java
public class Rate {private String code;private Float rate;private Date date;public Rate() { }public Rate(String base, Float rate, Date date) {super();this.code = base;this.rate = rate;this.date = date;
}//Setters and Getter omitted.
}
清单4-12展示了您在货币项目中已经看到的利率域类,但是请记住,在前一章中,我们将@entity和@id作为JPA持久性的一部分进行注释。这一次将会很简单,因为没有必要持久化这个汇率。如果你想在这一点上运行这个应用程序,你会得到一个类似的错误。
Cannot convert object of type [com.micai.spring.messaging.domain.Rate] to JMS message.
Supported message payloads are: String, byte array, Map<String,?>, Serializable object.
我们可以实现Serializable到Rate类中,但是货币项目没有这样的功能。现在,如果您还记得,我们的想法是创建一个接受JSON格式的REST API,那么让我们看看如何在这里使用JSON。
打开com.micai.spring.messaging.config.JMSConfig.java类。请参见清单4-13。
Listing 4-13. com.micai.spring.messaging.config.JMSConfig.java
@Configuration
@EnableConfigurationProperties(JMSProperties.class)
public class JMSConfig {@Beanpublic MessageConverter jacksonJmsMessageConverter() {MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();converter.setTargetType(MessageType.TEXT);converter.setTypeIdPropertyName("_class_");return converter;}
}
清单4-13向您展示了以JSON格式公开消息所需的配置。让我们回顾一下它:
•MessageConverter: 这是一个接口,指定了Java对象和JMS消息之间的转换器。它暴露了toMessage和fromMessage方法。Spring Boot自动配置将把所有东西连接起来,以使用这个特定的消息转换器。
•MappingJackson2MessageConverter: 这个类实现MessageConverter接口,并添加更多的方法来帮助转换从/到json/object。
•setTargetType: 这种方法对于帮助转换器识别需要转换的类型是必要的。在这种情况下,我们以字符串格式发送JSON,这意味着它将在幕后创建一个TextMessage对象。
•setTypeIdPropertyName: 这是一个很重要的设置,因为它将驱动映射在幕后进行的方式。这可以是任何你想要的值。它只是一个简单的标识符,它将持有被映射的合格类名。
这已经足够运行应用程序了,但是首先让我们看看主应用程序是如何运行的, 请看第4 - 14清单。
Listing 4-14. com.micai.spring.messaging.JmsSenderApplication.java
package com.micai.spring.messaging;import com.micai.spring.messaging.config.JMSProperties;
import com.micai.spring.messaging.domain.Rate;
import com.micai.spring.messaging.jms.RateSender;
import com.micai.spring.messaging.jms.SimpleSender;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.util.Date;/*** @Auther: zhaoxinguo* @Date: 2018/8/6 14:28* @Description:*/
@SpringBootApplication
public class JmsSenderApplication {public static void main(String [] args) {SpringApplication.run(JmsSenderApplication.class, args);}// This code is for the Rates@BeanCommandLineRunner process(JMSProperties properties, RateSender sender) {return args -> {sender.sendCurrency(properties.getRateQueue(), new Rate("EUR",0.88857F,new Date()));sender.sendCurrency(properties.getRateQueue(), new Rate("JPY",102.17F,new Date()));sender.sendCurrency(properties.getRateQueue(), new Rate("MXN",19.232F,new Date()));sender.sendCurrency(properties.getRateQueue(), new Rate("GBP",0.75705F,new Date()));};}}
清单4-14显示了主应用程序(只需要记住注释掉前面的代码)。正如你所看到的,它非常简单。我们只是发送新的利率对象。如果您运行这个应用程序,您应该得到如图4-6所示的输出。
Figure 4-6. Logs when JMS sends a rate object
图4-6显示了没有任何错误的日志。说明一切都很顺利,一些利率被成功地发送出去了。但这还不够,因为我们如何保证它实际上是JSON格式呢?
2.4.1.使用远程Apache ActiveMQ中间件
让我们使用ActiveMQ作为远程代理。它将帮助我们确定消息是否真正地移交给代理,而不是使用内存提供者。
确保您已经启动并运行了ActiveMQ(您需要从http://activemq apache.org/下载它,并遵循您的系统的安装说明)。我使用了ActiveMQ版本5.14.0,但是您可以使用任何版本。在您再次运行应用程序之前,您需要确保它将使用正在运行的ActiveMQ中间件。打开src/main/resources/应用程序的application.properties文件,并取消所有的spring.activemq.*和 micai.jms.*属性。结果应该如清单4-15所示。
Listing 4-15. src/main/resources/application.properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin#Micai Configuration
micai.jms.queue=jms-demo
micai.jms.rate-queue=rates
清单4-15展示了应用程序。在声明远程ActiveMQ的属性文件(在本例中是本地系统),默认端口是61616,用户名和密码被设置为admin。有了这些属性,Spring引导将配置连接到远程代理的connectionFactory。
因此,如果你有ActiveMQ中间件正在运行,你应该能够在浏览器中访问http://localhost:8161/admin URL,并查看如图4-7所示的web页面。
Figure 4-7. ActiveMQ web console: http://localhost:8161/admin
然后,您可以执行jms-demo应用程序,并从web控制台选择队列,以查看已创建名为利率的队列(这个名称是基于micai.jms.rate的。队列属性值)。参见图4 - 8。
Figure 4-8. ActiveMQ Queues tab
如果您点击名为利率的队列,您应该会看到发送给消息中间件的四条消息。见图4 - 9。
Figure 4-9. ActiveMQ queue rate messages
点击任何消息以查看内容。参见图4-10。
Figure 4-10. ActiveMQ Queue --> Rates --> Message
图4-10显示了中间件收到的实际消息。看一看消息的详细信息;你会看到类似的东西
{"code":"EUR","rate":0.88857,"date":1479442794599}
这个利率是由jms-sender应用程序发送的,在右边,您可以看到一个属性图例,其中类属性ID具有com.micai.spring.messaging.domain的值。汇率类(请记住,这对于接收方也是很重要的,因此它可以从JSON转换为object)。现在您知道了如何发送可以转换成JSON的对象。接下来,您需要接收这个消息,对吗?发送方需要接收来自接收者的一些确认(它也是需求的一部分)。
2.5.Reply-To(回复)
Spring JMS提供了一种回复另一个队列的方法,有点像响应请求/rpc模型。您可以将它与侦听器、@Sendto注释一起使用。
为了在实际操作中看到这一点,我们将使用相同的jms-sender项目。记住要禁用某些组件。您可以通过在我们正在进行的所有侦听器中注释掉@Component注解来做到这一点。
打开com.micai.spring.messaging.jms.RateReplyReceiver。java类。请参见清单4-16。
Listing 4-16. com.micai.spring.messaging.jms.RateReplyReceiver.java
package com.micai.spring.messaging.jms;import com.micai.spring.messaging.domain.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;/*** @Auther: zhaoxinguo* @Date: 2018/8/7 18:44* @Description:*/
@Component
public class RateReplyReceiver {private static final Logger LOGGER = LoggerFactory.getLogger(RateReplyReceiver.class);@JmsListener(destination = "${micai.jms.rate-queue}")@SendTo("${micai.jms.rate-reply-queue}")public Message<String> processRate(Rate rate) {// Process the Rate and return any significant valueLOGGER.info("接收ActiveMQ消息队列的内容为:{}", rate);return MessageBuilder.withPayload("PROCCESSED").setHeader("CODE", rate.getCode()).setHeader("RATE", rate.getRate()).setHeader("ID", UUID.randomUUID().toString()).setHeader("DATE", new SimpleDateFormat("yyyy-MM-dd").format(new Date())).build();}}
清单4-16显示了RateReplyReceiver.java类和@Sendto注释,它需要一个值,它对应于将要发送的结果消息的reply-queue。让我们回顾一下它:
• @SendTo: 这个注解将是回复的关键.确保你仍然有@JmsListener注解,也就是说这个方法可以作为一个接收方和发送方。该方法必须具有返回类型。在这种情况下,它将使用micai.jms.rate-queue的值设置回复队列值。如果主消息有JMSReplyTo头集,则可以忽略注释值。
• Message<T>: 这是一个建立在泛型类型上的接口,并提供了有用的getter方法,比如getPayload和getHeaders.这是发送消息的首选方式。
• MessageBuilder: 这是一个助手类,它允许您通过允许添加更多的头来增强消息。
接下来让我们再看一下RateSender类。它不仅会发送汇率,而且还会监听reply-queue(应答队列)。记住,要求是接收来自接收者的某种确认。请参见清单4-17。
Listing 4-17. com.micai.spring.messaging.jms.RateSender.java
package com.micai.spring.messaging.jms;import com.micai.spring.messaging.domain.Rate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @Auther: zhaoxinguo* @Date: 2018/8/7 17:50* @Description:*/
@Component
public class RateSender {@Autowiredprivate JmsTemplate jmsTemplate;public void sendCurrency(String destination, Rate rate){this.jmsTemplate.convertAndSend(destination, rate);}@JmsListener(destination = "${micai.jms.rate-reply-queue}")public void process(String body, @Header("CODE") String code) {}
}
清单4-17显示了新的RateSender类。正如您所看到的,我们正在重用@JmsListener和新的@Header注释。让我们回顾一下它:
• @JmsListener: 您已经知道这个注解了。唯一的区别是包含应答队列的正确名称。在这样的情况下,它是micai.jms.rate-reply-queue属性的值
• @Header: 这个注解将为您提供对消息头部的直接访问,在本例中,它将是被处理的利率代码。Spring JMS有更多的选项:@Header带来一个java.util.Map对象,@Payload带来实际有效载荷,@Valid开启有效载荷验证。
在运行这个程序之前,请确保启动并运行ActiveMQ消息中间件。
如果您运行这个应用程序,您不仅应该有发送方,还应该有接收方和应答日志。参见图4-11。
Figure 4-11. Logs with the reply-to queue
您可以查看ActiveMQ web控制台,并看到reply-rate队列被创建。如图4 - 12所示。
Figure 4-12. Queues showing the reply-rate queue
现在您知道了如何创建一个reply-to,并为您的应用程序拥有一种RPC机制。
2.6.Topics
本节讨论下一个JMS消息传递模型、发布者或主题。这个模型有发布者向一个主题发送消息,这些主题可以从零到多个订阅者,这些订阅者将获得该消息的副本。你需要把它想象成报纸或杂志订阅。你订阅(对某一主题感兴趣的话题)从出版商那里收到一份报纸或杂志。
本节将继续介绍jms-sender项目,该项目将是发布者。它还在本章中打开了一个新项目,即jms-topic-subscriber项目。它的结构与jms-sender相似。
在jms-sender中,我们将使用相同的RateSender。java和我们发送利率的主要入口点,但是有一个小的变化。打开src/main/resources/application.properties文件。它应该类似于清单4-18。
Listing 4-18. src/main/resources/application.properties
# Spring Web
spring.main.web-environment=false#Default ActiveMQ properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin#Apress Configuration
apress.jms.queue=jms-demo
apress.jms.rate-queue=rates
apress.jms.rate-reply-queue=reply-rate#Enable Topic Messaging
spring.jms.pub-sub-domain=true#Apress Topic Configuration
apress.jms.topic=rate-topic
清单4-18展示了application.properties文件,它有一个特定的属性,spring.jms.pub-sub-domain。默认情况下,该属性是false,从而使您的生产者向队列发送消息。当它被设置为true时,生产者(发布者)将把消息发送到一个主题。这也适用于监听器。如果你把这个属性设置为true,那么监听器将成为这个主题的订阅者。
看看最后一个属性。我们只是简单地定义了所有订阅者都在监听的主题的名称。
你现在可以打开和查看 com.micai.spring.messaging.jms.RateTopicReceiver.java类,来自于jms-topic-subscriber项目,看到它是相同的代码(除了名字目的地).在这个项目中,您需要拥有相同的application.properties文件。带有spring.jms.pub-sub-domain属性设置为true(只要确保它有这个属性)。
现在是时候运行jms-topic-subscriber项目了。在运行jms-demo项目之前,请查看ActiveMQ web控制台的主题部分,如图4-13所示。
Figure 4-13. The Topics section of the ActiveMQ web console
图4-13显示了一个名为利率的主题,并且有一个消费者。接下来,运行jms-sender项目,以查看利率是否被发送到主题。看一看jms-topic-subscriber项目的日志;您应该看到,您正在从利率主题中消费消息。
作为一个实验,您可以运行jms-topic-subscriber项目的多个实例(如果您使用STS和引导指示板,这很容易),并验证每个实例都获得了汇率的副本。学报》第4 - 14见图。
Figure 4-14. STS Boot Dashboard running two instances of the jms-topic-subscriber project
三:Currency Project(货币方案)
我们需要做些什么才能使用货币项目并开始监听其他客户的新利率?解决方案已经在rest-api-jms项目中了。这是你需要做的:
• 确保你已经在application.properties把spring.activemq.*, and rate.jms.* 属性启用.
• 查看RateJmsReceiver类并注释/取消注释您想要使用的侦听器(我们有简单的侦听器和侦听器应答)。
• 查看具有JSON转换器的RateJmsConfiguration类。
作为家庭作业,试着让它运行。请记住,您将需要启动并运行ActiveMQ中间件。另外,作为一个额外的步骤,尝试使这个项目具有主题意识。
四:总结
本章向您展示了如何使用Spring Boot来使用JMS技术发送和接收消息。
您看到了不同的JMS消息传递模型,以及开发人员需要做什么来发送或接收消息。
有了Spring Boot,您就看到了设置JMS客户端是多么容易,并且通过简单的注释,您可以拥有一个使用JMS作为消息传递系统的功能应用程序。即使您刚刚看到如何使用Apache ActiveMQ作为代理,同样的编程也可以应用于HornetQ、IBM MQ等,只要在应用程序中提供正确的属性(连接工厂和消息侦听器)在application.properties文件。
下一章讨论了一种不同的消息传递方式,使用了先进的消息队列协议AMQP。
五:源代码下载地址
https://gitee.com/micai/micai-spring-message/tree/master/jms-sender
https://gitee.com/micai/micai-spring-message/tree/master/jms-topic-subscriber
https://gitee.com/micai/micai-spring-message/tree/master/rest-api-jms
这篇关于Spring Boot Messaging Chapter 4 JMS with Spring Boot的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!