本文主要介绍 ActiveMQ(2):主题(topic)的实现,希望能为大家解决相关编程问题提供一定的参考,需要的开发者可以跟随本文一起学习。
一、 实现功能
主题(topic)采用发布/订阅模式:发送到主题的每条消息都会投递给该主题的所有订阅者,因此可以实现一对多的消息通知。
二、topic(主题)代码实现
1. 生产者
package com.example.activeMQ;import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;
//import org.junit.Test;/*** @className TopicMsgProducer.java* @useFor*/
public class TopicMsgProducer {public void send() {//创建连接工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.205.10:61616");Connection conn = null;try {//创建连接conn = factory.createConnection();conn.start();//创建会话Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);//创建地点Topic topic = session.createTopic("topic.textMsg");//创建生产者MessageProducer producer = session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);for(int i=0 ; i<10 ;i++) {TextMessage tmsg = session.createTextMessage();tmsg.setText("早上你好 "+i);producer.send(tmsg);System.out.println("发送的消息:"+tmsg.getText());}} catch (JMSException e) {e.printStackTrace();} finally {try {if (conn !=null )conn.close();} catch (Throwable ignore) {}}}public static void main(String[] args) {new TopicMsgProducer().send();}
}
2. 消费者1
package com.example.activeMQ;import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;import org.apache.activemq.ActiveMQConnectionFactory;/*** @className TopicMsgConsumer.java* @useFor*/
public class TopicMsgConsumer {public void receive() {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.205.10:61616");Connection conn = null;try {conn = factory.createConnection();conn.setClientID("T1");conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);//订阅发布模式的 Topic对象 不是DestinationTopic topic = session.createTopic("topic.textMsg");TopicSubscriber subsriber = session.createDurableSubscriber(topic , "T1" );while (true) {TextMessage tm = (TextMessage)subsriber.receive() ;if (tm == null) {break;}System.out.println("Msg1 Received message: " + tm.getText());}} catch (JMSException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}public static void main(String[] args) {new TopicMsgConsumer().receive();}
}
3. 消费者2
package com.example.activeMQ;import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;import org.apache.activemq.ActiveMQConnectionFactory;
//import org.junit.Test;/*** @className TopicMsgConsumer2.java* @useFor*/
public class TopicMsgConsumer2 {public void receive() {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.205.10:61616");Connection conn = null;try {conn = factory.createConnection();conn.setClientID("T2");conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);// 订阅发布模式的 Topic对象 不是DestinationTopic topic = session.createTopic("topic.textMsg");TopicSubscriber subsriber = session.createDurableSubscriber(topic , "T2" );while (true) {TextMessage tm = (TextMessage)subsriber.receive() ;if (tm == null) {break;}System.out.println("Msg2 Received message: " + tm.getText());}} catch (JMSException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}public static void main(String[] args) {new TopicMsgConsumer2().receive();}
}
三、测试
1.先开启消费者1和消费者2(持久订阅需要消费者先连接并完成订阅注册,之后发布的消息才会投递给该订阅者)
2.运行生产者产生消息,查看消费者消费消息
(1)消费者1
Msg1 Received message: 早上你好 0
Msg1 Received message: 早上你好 1
Msg1 Received message: 早上你好 2
Msg1 Received message: 早上你好 3
Msg1 Received message: 早上你好 4
Msg1 Received message: 早上你好 5
Msg1 Received message: 早上你好 6
Msg1 Received message: 早上你好 7
Msg1 Received message: 早上你好 8
Msg1 Received message: 早上你好 9
(2)消费者2
Msg2 Received message: 早上你好 0
Msg2 Received message: 早上你好 1
Msg2 Received message: 早上你好 2
Msg2 Received message: 早上你好 3
Msg2 Received message: 早上你好 4
Msg2 Received message: 早上你好 5
Msg2 Received message: 早上你好 6
Msg2 Received message: 早上你好 7
Msg2 Received message: 早上你好 8
Msg2 Received message: 早上你好 9
3.查看监控
四、参考
1.https://www.cnblogs.com/winner-0715/p/6697102.html
这篇关于 ActiveMQ(2):主题(topic)的实现的文章就介绍到这儿,希望对各位开发者有所帮助!