ActiveMQ简述

2024-09-02 11:18
文章标签 简述 activemq

本文主要是介绍ActiveMQ简述,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


欢迎跳转到本文的原文链接:https://honeypps.com/mq/activemq-quick-start/

##概述
ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。

这里写图片描述

另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。

这里写图片描述


##ActiveMQ的安装
下载最新的安装包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是针对linux系统进行阐述,当然ActiveMQ也有win版的,这里就不赘述了),可以去官网下载,也可以在下方留言区留下你的邮箱,博主会发给你的~

下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz

ActiveMQ目录内容有:

  • bin目录包含ActiveMQ的启动脚本
  • conf目录包含ActiveMQ的所有配置文件
  • data目录包含日志文件和持久性消息数据
  • example: ActiveMQ的示例
  • lib: ActiveMQ运行所需要的lib
  • webapps: ActiveMQ的web控制台和一些相关的demo

运行命令:activemq start(在activemq/bin下运行)

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')

查看activemq是否运行命令:ps -aux | grep activemq

shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq

关闭命令: activemq stop

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jreHeap sizes: current=63232k  free=62218k  max=932096kJVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:[/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED

ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:

<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

##案例
在下载的apache-activemq-5.13.2-bin.tar.gz包中解压有一个jar包:activemq-all-5.13.2.jar,引入这个jar到你的项目中即可开始编写案例代码。

博主的activemq服务器地址为10.10.195.187,这个在下面代码中会有体现。

按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:

  1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
  2. 利用factory构造JMS connection
  3. 启动connection
  4. 通过connection创建JMS session.
  5. 指定JMS destination.
  6. 创建JMS producer或者创建JMS message并提供destination.
  7. 创建JMS consumer或注册JMS message listener.
  8. 发送和接收JMS message.
  9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

下面来看代码举例(P2P式)。
通过Java实现的基于ActiveMQ的请求提交:

package com.zzh.activemq;import java.io.Serializable;
import java.util.HashMap;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class RequestSubmit
{//消息发送者private MessageProducer producer;//一个发送或者接受消息的线程private Session session;public void init() throws Exception{//ConnectionFactory连接工厂,JMS用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");//Connection:JMS客户端到JMS Provider的连接,从构造工厂中得到连接对象Connection connection = connectionFactory.createConnection();//启动connection.start();//获取连接操作session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destinatin = session.createQueue("RequestQueue");//得到消息生成(发送)者producer = session.createProducer(destinatin);//设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);}public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception{ObjectMessage message = session.createObjectMessage(requestParam);producer.send(message);session.commit();}public static void main(String[] args) throws Exception{RequestSubmit submit = new RequestSubmit();submit.init();HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();requestParam.put("朱小厮", "zzh");submit.submit(requestParam);}
}

创建Session时有两个非常重要的参数,第一个boolean类型的参数用来表示是否采用事务消息。如果是事务消息,对于的参数设置为true,此时消息的提交自动有comit处理,消息的回滚则自动由rollback处理。加入消息不是事务的,则对应的该参数设置为false,此时分为三种情况:

  • Session.AUTO_ACKNOWLEDGE表示Session会自动确认所接收到的消息。
  • Session.CLIENT_ACKNOWLEDGE表示由客户端程序通过调用消息的确认方法来确认所接收到的消息。
  • Session.DUPS_OK_ACKNOWLEDGE使得Session将“懒惰”地确认消息,即不会立即确认消息,这样有可能导致消息重复投递。

提供Java实现的基于ActiveMQ的请求处理:

package com.zzh.activemq;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class RequestProcessor
{public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception{System.out.println("requestHandler....."+requestParam.toString());for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet()){System.out.println(entry.getKey()+":"+entry.getValue());}}public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("RequestQueue");//消息消费(接收)者MessageConsumer consumer = session.createConsumer(destination);RequestProcessor processor = new RequestProcessor();while(true){ObjectMessage message = (ObjectMessage) consumer.receive(1000);if(null != message){System.out.println(message);HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();processor.requestHandler(requestParam);}else{break;}}}
}

输出结果:

ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{朱小厮=zzh}
朱小厮:zzh

可以通过页面查看队列的使用情况,在浏览器中输入http://10.10.195.187:8161/admin/queues.jsp,用户名和密码都是:admin,看到以下页面:
这里写图片描述
这个是在jetty服务器下跑的,可以修改conf/jetty.xml来修改相关jetty配置。

上面的例子是关于P2P模式的,不过有个不妥之处,就是没有资源的释放。下面举一个Pub/Sub模式的。
通过JMS创建ActiveMQ的topic,并给topic发送消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;public class TopicRequest
{//消息发送者private MessageProducer producer;//一个发送或者接受消息的线程private Session session;//Connection:JMS客户端到JMS Provider的连接private Connection connection;public void init() throws Exception{//ConnectionFactory连接工厂,JMS用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");//从构造工厂中得到连接对象connection = connectionFactory.createConnection();//启动connection.start();//获取连接操作session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("MessageTopic");producer = session.createProducer(topic);//设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);}public void submit(String mess) throws Exception{TextMessage message = session.createTextMessage();message.setText(mess);producer.send(message);}public void close(){try{if(session != null)session.close();if(producer != null)producer.close();if(connection !=null )connection.close();}catch (JMSException e){e.printStackTrace();}}public static void main(String[] args) throws Exception{TopicRequest topicRequest = new TopicRequest();topicRequest.init();topicRequest.submit("I'm first");topicRequest.close();}
}

消息发送到对应的topic后,需要将listener注册到需要订阅的topic上,以便能够接收该topic的消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class TopicReceive
{private MessageConsumer consumer;private Session session;public void init() throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("MessageTopic");consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener(){@Overridepublic void onMessage(Message message){TextMessage tm = (TextMessage) message;System.out.println(tm);try{System.out.println(tm.getText());}catch (JMSException e){e.printStackTrace();}}});}public static void main(String[] args) throws Exception{TopicReceive receive = new TopicReceive();receive.init();}
}

输出结果:

ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first

参考文献

  1. 《大型分布式网站架构——设计与实践》陈康贤著。
  2. http://activemq.apache.org/

欢迎跳转到本文的原文链接:https://honeypps.com/mq/activemq-quick-start/

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


这篇关于ActiveMQ简述的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129798

相关文章

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

ActiveMQ—安装配置及使用

安装配置及使用 转自:http://blog.csdn.net/qq_21033663/article/details/52461543 (一)ActiveMQ介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了

ActiveMQ—Queue与Topic区别

Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:         1、点对点(point-to-point,简称PTP)Queue消息传递模型:         通过该消息传递模型,一个应用程序(即消息生产者)可以

1、简述linux操作系统启动流程

1、简述linux操作系统启动流程 启动第一步--加载BIOS 当你打开计算机电源,计算机会首先加载BIOS信息,BIOS信息是如此的重要,以至于计算机必须在最开始就找到它。这是因为BIOS中包含了CPU的相关信息、设备启动顺序信息、硬盘信息、内存信息、时钟信息、PnP特性等等。开机时将ROM中的指令映射到RAM的低地址空间,CPU读取到这些指令,硬件的健康状况进行检查,按照BIOS中设置的启

jms与ActiveMQ实践与应用

前言 这是我自己从不知道JMS为何物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程中可能考虑不全。另外,如果想尽快使用JMS,建议直接看实例那一节就可以了。有问题多交流。 词语解释 (有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下): 1、  跨服务器:专业术语好像叫“跨实例”。意思是,可以在多个服务器(可以是不同的服务器,如resin与tomcat)之间相

【Java String】简述String类比较和常量池内存分析

一、引出正题 String 类型对象进行比较时,我们一般使用 equals() 的方式进行值比较,但是有时候可能会出现 == 对象比较的方式。 在使用 == 比较的时候,往往是和String在JVM内存存储结构有关,这也引起了部分同学使用时的错误,那么接下来我们来详细分析一下此问题。 二、举例说明 1、new String("xx")都是在堆上创建字符串对象。当调用 intern() 方

Linux apache-activemq安装及配置

1.  apache-activemq安装   备注:apache-activemq安装时JDK必须在1.5以,否则不能访问。 (1)从官网下载Activemq Linux包http://activemq.apache.org/download.html.这儿我下载的是 apache-activemq-5.4.3-bin.tar.gz (2)解压包 tar zxvf apache

Spring整合JMS基于ActiveMQ实现

转载地址:http://haohaoxuexi.iteye.com/blog/1893038?page=2#comments 1.1     JMS简介        JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一

手动依赖注入;AutowireCapableBeanFactory简述

概述 spring boot虽然帮我们把bean的依赖注入基本都默认配置好了;但有些时候,业务场景中一些对象实例不在spring生命周期中,但是对象中的属性又需要被spring依赖注入(属性在spring容器管理中); 比如一些servle的filter类,其中的一些属性又需要依赖一些spring的bean 我遇到的情况是这样的,实现FactoryBean接口获取到的自定义bean因为没有参