循序渐进ActiveMQ(1)----HelloWorld

2023-11-29 19:18

本文主要是介绍循序渐进ActiveMQ(1)----HelloWorld,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

mq部署方式:单节点、集群、主从

介绍

当前,corba、docm、rmi等rpc中间件技术已经广泛应用于各个领域。但是面对
规模和复杂度都越来搞的分布式系统,这些技术也显示出其局限性:
(1)同步通信,客户发出调用后,必须等待服务对象完成处理并返回结果才能继续执行;
(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进行都必须正常运行;如果由于
服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常;
(3)点对点通信:客户的一次调用只发送给某个单独的目标对象。


面向消息的中间件(Message Oriented middleware,MOM)较好的解决了以上问题。
发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。
1 这种模式下,发送和接收是异步的,发送者无需等待;
2 二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行:
3 一对多通信:对于一个消息可以有多个接收者。


JAVA消息服务(JMS)定义了java中访问消息中间件的接口。jms只是接口,并没有给予实现,实现了jms接口的
消息中间件称为jms provider,已有的MOM系统包括Apache的ActiveMQ、阿里的RocketMQ、IBM的MQSeries、
Microsoft的MSMQ和bEA的MessageQ、RabbitMQ等等,它们基本都遵循JMS规范

 

ActiveMq是Apache出品,最流行的,能力强劲的开源消息总线。
ActiveMq是一个完全支持jms1.1,和j2ee1.4规范的jms provider的实现。
尽管jms规范出台很久了,但是jms依然在当今的j2ee应用中扮演着特殊的角色。
可以说ActiveMq在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,
ActiveMq还需不断的升级版本,80%以上的业务,我们使用ActiveMq是可以满足需求的,
当然后续如淘宝、天猫及双11这种特殊时间,ActiveMq需要进行很复杂的优化源码及架构设计才能完成。
RocketMq是阿里巴巴的一个更强大的分布式消息中间件。而ActiveMq是核心和基础,所以需要掌握好。

 

 

JMS规范

消息中间件需要实现JMS接口:

  • Provider(MessageProvider):生产者
  • Consumer(MessageConsumer):消费者

 

  • PTP:Point To Point,即点对点的消息模型
  • Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型

 

  • Queue:队列目标
  • Topic:主题目标

 

  • ConnectionFactory:连接工厂。JMS用它创建连接。
  • Connection:JMS客户端到JMS Provider的;连接。
  • Destination:消息的目的地。
  • Session:会话,一个发送或者接收消息的线程




 

ConnectionFactory接口(连接工厂)

用户用来创建到JMS提供者的连接的【被管对象】。jms客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或主题连接工厂。

 

Connection接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂以后,就可以创建一个与jms提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
 

Destination接口(目标)

目标是一个包装了消息目标标识符的【被管对象】,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。
JMS管理员创建这些对象,然后用户通过jndi发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的queue,以及发布者/订阅者模型的Topic


 

MessageConsumer接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
 

MessageProducer接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用发送者,在发送消息时指定目标。
 

Message接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和消息寻找路由的操作设置。
一组消息属性(可选):包括额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)
一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息、对象消息)
消息接口非常灵活,并提供了许多方式来定制消息的内容。

JMS顶一个五种不过不通过的消息正文格式,一级调用的消息类型,允许你发送并接收一些不通过形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage java原始的数据流
  • MapMessage 一套名称-值对
  • TextMessage 一个字符串对象
  • ObjectMessage 一个序列化的java对象
  • ByteMessage 一个未解释字节的数据流


 

Session接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,也就是说消息是按照发送的顺序一个一个接收的。

会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。

一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。




 

使用方式

去官网下载:http://activemq.apache.org,可以下载最新的,这里使用apache-activemq-5.11.1-bin.zip
使用windows系统改来操作下。现在已经更新到了ActiveMQ 5.15.12 (March 9, 2020)


目录结构

加压缩,查看目录结构:
 
 
        activemq内置jetty容器,activemq可以由不同的存储方式持久化,比如jdbc入库,webapps下是管控台代码,直接运行bin下bat文件就可以把管控台部署运行起来。
 
http://localhost:8161/  默认端口8161,在jetty.xml文件中可以改端口

  <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8161"/></bean>

在jetty-realm.properties可以改用户密码

 

开机启动

在D:\apache-activemq-5.15.12\bin下创建start.bat,如下:

@echo offstart /d "D:\apache-activemq-5.15.12\bin\win64"  activemq.bat
@echo 

打开Win+R-->gpedit.msc,按图操作:

确定。

或者直接点击InstallService.bat:

开始编码

我们实现一个消息发送者Sender和消息接收者Receiver,发送者生产消息发送到activemq,启动接收者去消费消息

Sender:

package jeff.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Sender {public static void main(String[] args) throws Exception {/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("queue1");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageProducer messageProducer = session.createProducer(destination);/*** 第六步:* 我们可以使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode)*///如果设置为持久话方式,我们需要指定具体持久话策略,比如jdbc持久化到数据库messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);/*** 第七步:* 最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,* 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection*/for (int i = 0; i <= 5; i++) {TextMessage textMessage = session.createTextMessage();textMessage.setText("我是消息,Id:"+i);messageProducer.send(textMessage);System.out.println("生产者:"+textMessage.getText());}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}}

启动打印:

生产者:我是消息,Id:0
生产者:我是消息,Id:1
生产者:我是消息,Id:2
生产者:我是消息,Id:3
生产者:我是消息,Id:4
生产者:我是消息,Id:5

消息已经发送到activemq消息服务器了,我们看下:

 接下来我们启动消费者Receiver:

package jeff.mq.helloworld;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Receiver {public static void main(String[] args) throws Exception {/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("queue1");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageConsumer messageConsumer = session.createConsumer(destination);while(true){TextMessage msg = (TextMessage)messageConsumer.receive();if(msg==null)break;System.out.println("收到内容: "+msg.getText());}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}}

 

 启动并打印:

 

收到内容: 我是消息,Id:0
收到内容: 我是消息,Id:1
收到内容: 我是消息,Id:2
收到内容: 我是消息,Id:3
收到内容: 我是消息,Id:4
收到内容: 我是消息,Id:5

再观察下消息服务器:

 

消费者receive方法是同步PULL拉取模式,有三种重载方法:

 

receive():拉取时阻塞的直到获得消息的

receive(long arg):拉取时最多阻塞一段时间的

receiveNoWait():拉取时不阻塞的,有则返回,无则返回null






















 

这篇关于循序渐进ActiveMQ(1)----HelloWorld的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/434015

相关文章

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

ActiveMQ—安装配置及使用

安装配置及使用 转自:http://blog.csdn.net/qq_21033663/article/details/52461543 (一)ActiveMQ介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了

ActiveMQ—Queue与Topic区别

Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:         1、点对点(point-to-point,简称PTP)Queue消息传递模型:         通过该消息传递模型,一个应用程序(即消息生产者)可以

Python简单入门教程helloworld

Python 学习资源 推荐书籍: Python核心编程(第二版) (强烈推荐,建议有一定基础的看,或者看完简明Python教程再看) Python 基础教程 第二版 (入门,没有核心编程好,但也不错) 编写高质量代码:改善Python程序的91个建议 (进阶,有一定基础再看) 书籍下载: Python 教程(部分内容来源于网络, 历时一年多总结整理的,给刚刚入门的

jms与ActiveMQ实践与应用

前言 这是我自己从不知道JMS为何物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程中可能考虑不全。另外,如果想尽快使用JMS,建议直接看实例那一节就可以了。有问题多交流。 词语解释 (有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下): 1、  跨服务器:专业术语好像叫“跨实例”。意思是,可以在多个服务器(可以是不同的服务器,如resin与tomcat)之间相

HelloWorld 模块

helloworld.c 代码 #include <linux/init.h>#include <linux/module.h>MODULE_LICENSE("Dual BSD/GPL");static int hello_init(void){printk(KERN_ALERT "Hello world\n");return 0;}static void hello_exit(v

Linux apache-activemq安装及配置

1.  apache-activemq安装   备注:apache-activemq安装时JDK必须在1.5以,否则不能访问。 (1)从官网下载Activemq Linux包http://activemq.apache.org/download.html.这儿我下载的是 apache-activemq-5.4.3-bin.tar.gz (2)解压包 tar zxvf apache

Spring整合JMS基于ActiveMQ实现

转载地址:http://haohaoxuexi.iteye.com/blog/1893038?page=2#comments 1.1     JMS简介        JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一

ROS程序设计系列 - 5.实例helloworld

ROS程序设计系列 - 5.实例helloworld 1. 源由2. 步骤Step 1:安装ROS系统Step 2:创建框架工程Step 3:检查工程结构Step 4:创建CPP文件Step 5:修改配置文件Step 6:编译工程 3. 测试Step 1: 启动ROS CoreStep 2: 启动Hello程序 4. 总结5. 参考资料6. 补充 1. 源由 在做《Ardupil