本文主要是介绍循序渐进ActiveMQ(1)----HelloWorld,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
mq部署方式:单节点、集群、主从
介绍
当前,corba、docm、rmi等rpc中间件技术已经广泛应用于各个领域。但是面对
规模和复杂度都越来搞的分布式系统,这些技术也显示出其局限性:
(1)同步通信,客户发出调用后,必须等待服务对象完成处理并返回结果才能继续执行;
(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进行都必须正常运行;如果由于
服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常;
(3)点对点通信:客户的一次调用只发送给某个单独的目标对象。
面向消息的中间件(Message Oriented middleware,MOM)较好的解决了以上问题。
发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。
1 这种模式下,发送和接收是异步的,发送者无需等待;
2 二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行:
3 一对多通信:对于一个消息可以有多个接收者。
JAVA消息服务(JMS)定义了java中访问消息中间件的接口。jms只是接口,并没有给予实现,实现了jms接口的
消息中间件称为jms provider,已有的MOM系统包括Apache的ActiveMQ、阿里的RocketMQ、IBM的MQSeries、
Microsoft的MSMQ和bEA的MessageQ、RabbitMQ等等,它们基本都遵循JMS规范
ActiveMq是Apache出品,最流行的,能力强劲的开源消息总线。
ActiveMq是一个完全支持jms1.1,和j2ee1.4规范的jms provider的实现。
尽管jms规范出台很久了,但是jms依然在当今的j2ee应用中扮演着特殊的角色。
可以说ActiveMq在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,
ActiveMq还需不断的升级版本,80%以上的业务,我们使用ActiveMq是可以满足需求的,
当然后续如淘宝、天猫及双11这种特殊时间,ActiveMq需要进行很复杂的优化源码及架构设计才能完成。
RocketMq是阿里巴巴的一个更强大的分布式消息中间件。而ActiveMq是核心和基础,所以需要掌握好。
JMS规范
消息中间件需要实现JMS接口:
- Provider(MessageProvider):生产者
- Consumer(MessageConsumer):消费者
- PTP:Point To Point,即点对点的消息模型
- Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型
- Queue:队列目标
- Topic:主题目标
- ConnectionFactory:连接工厂。JMS用它创建连接。
- Connection:JMS客户端到JMS Provider的;连接。
- Destination:消息的目的地。
- Session:会话,一个发送或者接收消息的线程
ConnectionFactory接口(连接工厂)
用户用来创建到JMS提供者的连接的【被管对象】。jms客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或主题连接工厂。
Connection接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂以后,就可以创建一个与jms提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination接口(目标)
目标是一个包装了消息目标标识符的【被管对象】,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。
JMS管理员创建这些对象,然后用户通过jndi发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的queue,以及发布者/订阅者模型的Topic
MessageConsumer接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用发送者,在发送消息时指定目标。
Message接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和消息寻找路由的操作设置。
一组消息属性(可选):包括额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)
一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息、对象消息)
消息接口非常灵活,并提供了许多方式来定制消息的内容。
JMS顶一个五种不过不通过的消息正文格式,一级调用的消息类型,允许你发送并接收一些不通过形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage java原始的数据流
- MapMessage 一套名称-值对
- TextMessage 一个字符串对象
- ObjectMessage 一个序列化的java对象
- ByteMessage 一个未解释字节的数据流
Session接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,也就是说消息是按照发送的顺序一个一个接收的。
会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。
一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。
使用方式
去官网下载:http://activemq.apache.org,可以下载最新的,这里使用apache-activemq-5.11.1-bin.zip
使用windows系统改来操作下。现在已经更新到了ActiveMQ 5.15.12 (March 9, 2020)
目录结构
加压缩,查看目录结构:
activemq内置jetty容器,activemq可以由不同的存储方式持久化,比如jdbc入库,webapps下是管控台代码,直接运行bin下bat文件就可以把管控台部署运行起来。
http://localhost:8161/ 默认端口8161,在jetty.xml文件中可以改端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8161"/></bean>
在jetty-realm.properties可以改用户密码
开机启动
在D:\apache-activemq-5.15.12\bin下创建start.bat,如下:
@echo offstart /d "D:\apache-activemq-5.15.12\bin\win64" activemq.bat
@echo
打开Win+R-->gpedit.msc,按图操作:
确定。
或者直接点击InstallService.bat:
开始编码
我们实现一个消息发送者Sender和消息接收者Receiver,发送者生产消息发送到activemq,启动接收者去消费消息
Sender:
package jeff.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Sender {public static void main(String[] args) throws Exception {/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("queue1");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageProducer messageProducer = session.createProducer(destination);/*** 第六步:* 我们可以使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode)*///如果设置为持久话方式,我们需要指定具体持久话策略,比如jdbc持久化到数据库messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);/*** 第七步:* 最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,* 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection*/for (int i = 0; i <= 5; i++) {TextMessage textMessage = session.createTextMessage();textMessage.setText("我是消息,Id:"+i);messageProducer.send(textMessage);System.out.println("生产者:"+textMessage.getText());}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}}
启动打印:
生产者:我是消息,Id:0
生产者:我是消息,Id:1
生产者:我是消息,Id:2
生产者:我是消息,Id:3
生产者:我是消息,Id:4
生产者:我是消息,Id:5
消息已经发送到activemq消息服务器了,我们看下:
接下来我们启动消费者Receiver:
package jeff.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Receiver {public static void main(String[] args) throws Exception {/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("queue1");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageConsumer messageConsumer = session.createConsumer(destination);while(true){TextMessage msg = (TextMessage)messageConsumer.receive();if(msg==null)break;System.out.println("收到内容: "+msg.getText());}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}}
启动并打印:
收到内容: 我是消息,Id:0
收到内容: 我是消息,Id:1
收到内容: 我是消息,Id:2
收到内容: 我是消息,Id:3
收到内容: 我是消息,Id:4
收到内容: 我是消息,Id:5
再观察下消息服务器:
消费者receive方法是同步PULL拉取模式,有三种重载方法:
receive():拉取时阻塞的直到获得消息的
receive(long arg):拉取时最多阻塞一段时间的
receiveNoWait():拉取时不阻塞的,有则返回,无则返回null
这篇关于循序渐进ActiveMQ(1)----HelloWorld的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!