为什么80%的码农都做不了架构师?>>>
在上一篇中,我们说了P2P版的HelloWorld,在这一篇,我们简要说一下,基于发布,订阅模式的HelloWorld。
基础知识就不在介绍了,需要的会一点一点讲。
1. pom.xml
这个和上一篇是一样的:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.ygy</groupId><artifactId>activemq</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>activemq</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><!-- activemq,学习中 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.5.6</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.5.6</version></dependency></dependencies>
</project>
2. Pub/Sub版的HelloWorld
生产者:
package org.ygy.mq.lesson01;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;public class HelloTopicProducer {public void send(String msg) {// 生产者的主要流程Connection connection = null;try {// 1.初始化connection工厂,使用默认的URL// failover://tcp://localhost:61616ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();// 2.创建Connectionconnection = connectionFactory.createConnection();// 3.打开连接connection.start();// 4.创建Session,(是否支持事务)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建消息目标Destination destination_send = session.createTopic(MQConstants.DESTINATION_SEND);// 6.创建生产者MessageProducer producer = session.createProducer(destination_send);// 7.配置消息是否持久化/** DeliverMode有2种方式:* * public interface DeliveryMode { static final int NON_PERSISTENT =* 1;//不持久化:服务器重启之后,消息销毁* * static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在 }*/producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 8.初始化要发送的消息TextMessage message = session.createTextMessage(msg);// 9.发送消息producer.send(message);connection.close();} catch (JMSException e) {e.printStackTrace();}}public static void main(String[] args) {new HelloTopicProducer().send("我来试一试发布/订阅...");}}
消费者:
package org.ygy.mq.lesson01;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;public class HelloTopicConsumer implements MessageListener {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;try {System.out.println("哈,我接收到了消息:" + txtMsg.getText());} catch (JMSException e) {e.printStackTrace();}}}public void receive() {// 消费者的主要流程Connection connection = null;try {// 1.初始化connection工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();// 2.创建Connectionconnection = connectionFactory.createConnection();// 3.打开连接connection.start();// 4.创建sessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建消息目标Destination destination = session.createTopic(MQConstants.DESTINATION_SEND);// 6.创建消费者MessageConsumer consumer = session.createConsumer(destination);// 7.配置监听consumer.setMessageListener(new HelloTopicConsumer());} catch (JMSException e) {e.printStackTrace();}}public static void main(String[] args) {new HelloTopicConsumer().receive();}}
3.测试
访问网页:http://localhost:8161/admin/topics.jsp
单击那个Topics连接。
这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。
Name:主题的名称
Number Of Consumers:正在运行的消费者
Message Enqueued:进入消息队列的
Message Dequeued:出消息队列的
Operations:操作
下面就可以开始运行程序了,
注意顺序:先运行消费者:
这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)
我们的消费者一直在运行
接下来,运行生产者:
控制台会输出:
再一次,刷新界面:
消费者还在运行,只生产了一条消息,而且已经被消费了。
在这里遇到了一个问题,就是运行顺序的问题,
我们如果先运行生产者,再运行消费者,消费者是接收不到消息的,郁闷了好久
猜想,应该是对概念,规范的理解出了问题,就找了一下,发现了原因:
这是上一篇介绍的JMS消息模型,哎,,对概念的理解不清晰。
至于,持久的订阅,会在以后的博客中分享,HelloWorld,就到此结束了。