四、ActiveMq---JMS的实现

2024-05-10 21:58
文章标签 实现 activemq jms

本文主要是介绍四、ActiveMq---JMS的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

目录: 

复制代码

一:JMQ的两种消息模式1.1:点对点的消息模式1.2:订阅模式
二:点对点的实现代码2.1:点对点的发送端2.2:点对点的接收端
三:订阅/发布模式的实现代码3.1:订阅模式的发送端3.2:订阅模式的接收端
四:发送消息的数据类型4.1:传递javabean对象4.2:发送文件
五:ActiveMQ的应用5.1:保证消息的成功处理5.2:避免消息队列的并发5.2.1:主动接收队列消息5.2.2:使用多个接收端5.3:消息有效期的管理5.4:过期消息,处理失败的消息如何处理

六:ActiveMQ的安全配置

  6.1:管理后台的密码设置

   6.2:生产消费者的连接密码

 

复制代码

 

 

 

一:JMQ的两种消息模式

消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.

 

1.1:点对点的消息模式

 

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息

 

1.2:订阅模式

 

订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。

 

 

 

 

二:点对点的实现代码

这里使用java来实现一下ActiveMQ的点对点模式。

ActiveMQ版本为 5.13.3

项目使用MAVEN来构建

复制代码

    <dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency></dependencies>

复制代码

都是当前最新的版本

 

2.1:点对点的发送端

复制代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class PTPSend {//连接账号private String userName = "";//连接密码private String password = "";//连接地址private String brokerURL = "tcp://192.168.0.130:61616";//connection的工厂private ConnectionFactory factory;//连接对象private Connection connection;//一个操作会话private Session session;//目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topicprivate Destination destination;//生产者,就是产生数据的对象private MessageProducer producer;public static void main(String[] args) {PTPSend send = new PTPSend();send.start();}public void start(){try {//根据用户名,密码,url创建一个连接工厂factory = new ActiveMQConnectionFactory(userName, password, brokerURL);//从工厂中获取一个连接connection = factory.createConnection();//测试过这个步骤不写也是可以的,但是网上的各个文档都写了connection.start();//创建一个session//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建destination = session.createQueue("text-msg");//从session中,获取一个消息生产者producer = session.createProducer(destination);//设置生产者的模式,有两种可选//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空producer.setDeliveryMode(DeliveryMode.PERSISTENT);//创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来TextMessage textMsg = session.createTextMessage("呵呵");for(int i = 0 ; i < 100 ; i ++){//发送一条消息producer.send(textMsg);}System.out.println("发送消息成功");//即便生产者的对象关闭了,程序还在运行哦producer.close();} catch (JMSException e) {e.printStackTrace();}}
}

复制代码

 

 

2.2:点对点的接收端

 

复制代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class PTPReceive {//连接账号private String userName = "";//连接密码private String password = "";//连接地址private String brokerURL = "tcp://192.168.0.130:61616";//connection的工厂private ConnectionFactory factory;//连接对象private Connection connection;//一个操作会话private Session session;//目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topicprivate Destination destination;//消费者,就是接收数据的对象private MessageConsumer consumer;public static void main(String[] args) {PTPReceive receive = new PTPReceive();receive.start();}public void start(){try {//根据用户名,密码,url创建一个连接工厂factory = new ActiveMQConnectionFactory(userName, password, brokerURL);//从工厂中获取一个连接connection = factory.createConnection();//测试过这个步骤不写也是可以的,但是网上的各个文档都写了connection.start();//创建一个session//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建destination = session.createQueue("text-msg");//根据session,创建一个接收者对象consumer = session.createConsumer(destination);//实现一个消息的监听器//实现这个监听器后,以后只要有消息,就会通过这个监听器接收到consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//获取到接收的数据String text = ((TextMessage)message).getText();System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//关闭接收端,也不会终止程序哦
//            consumer.close();} catch (JMSException e) {e.printStackTrace();}}
}

复制代码

 

 

 

 

 

三:订阅/发布模式的实现代码

3.1:订阅模式的发送端

复制代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class TOPSend {//连接账号private String userName = "";//连接密码private String password = "";//连接地址private String brokerURL = "tcp://192.168.0.130:61616";//connection的工厂private ConnectionFactory factory;//连接对象private Connection connection;//一个操作会话private Session session;//目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topicprivate Destination destination;//生产者,就是产生数据的对象private MessageProducer producer;public static void main(String[] args) {TOPSend send = new TOPSend();send.start();}public void start(){try {//根据用户名,密码,url创建一个连接工厂factory = new ActiveMQConnectionFactory(userName, password, brokerURL);//从工厂中获取一个连接connection = factory.createConnection();//测试过这个步骤不写也是可以的,但是网上的各个文档都写了connection.start();//创建一个session//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建//=======================================================//点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topicdestination = session.createTopic("topic-text");//=======================================================//从session中,获取一个消息生产者producer = session.createProducer(destination);//设置生产者的模式,有两种可选//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空producer.setDeliveryMode(DeliveryMode.PERSISTENT);//创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来TextMessage textMsg = session.createTextMessage("哈哈");long s = System.currentTimeMillis();for(int i = 0 ; i < 100 ; i ++){//发送一条消息producer.send(textMsg);}long e = System.currentTimeMillis();System.out.println("发送消息成功");System.out.println(e - s);//即便生产者的对象关闭了,程序还在运行哦producer.close();} catch (JMSException e) {e.printStackTrace();}}
}

复制代码

 

 

3.2:订阅模式的接收端

复制代码

package com.test.mq.pubsub;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class Sub {//连接账号private String userName = "";//连接密码private String password = "";//连接地址private String brokerURL = "tcp://127.0.0.1:61616";//connection的工厂private ConnectionFactory factory;//连接对象private Connection connection;//一个操作会话private Session session;//目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topicprivate Destination destination;//生产者,就是产生数据的对象private MessageProducer producer;//消费者,消费队列中数据的地方private MessageConsumer consumer;public static void main(String[] args) {Sub send = new Sub();send.start();}public void start(){try {//根据用户名,密码,url创建一个连接工厂factory = new ActiveMQConnectionFactory(userName, password, brokerURL);//从工厂中获取一个连接connection = factory.createConnection();//测试过这个步骤不写也是可以的,但是网上的各个文档都写了connection.start();//创建一个session//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建//=======================================================//点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topicdestination = session.createTopic("topic-text");//=======================================================//从session中,获取一个消息生产者consumer = session.createConsumer(destination);//设置生产者的模式,有两种可选//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// TODO Auto-generated method stubSystem.out.println("收到message");try {System.out.println("订阅者一收到的消息:" + ((TextMessage)message).getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}});//consumer.close();} catch (JMSException e) {e.printStackTrace();}}
}

复制代码

 

 

 

 

 

四:发送消息的数据类型

上面的代码演示,全部都是发送字符串,但是ActiveMQ支持哪些数据呢?

大家可以看一下  javax.jms.Message 这个接口,只要是这个接口的数据,都可以被发送。

 

或者这样看起来有点麻烦,那么看到上面的代码,创建消息,是通过session这个对象来创建的,那我们来看一下这里有哪些可以被创建的呢?

复制代码

            //纯字符串的数据session.createTextMessage();//序列化的对象session.createObjectMessage();//流,可以用来传递文件等session.createStreamMessage();//用来传递字节session.createBytesMessage();//这个方法创建出来的就是一个map,可以把它当作map来用,当你看了它的一些方法,你就懂了session.createMapMessage();//这个方法,拿到的是javax.jms.Message,是所有message的接口session.createMessage();

复制代码

 

 

4.1:传递javabean对象

传递一个java对象,可能是最多的使用方式了,而且这种数据接收与使用都方便,那么,下面的代码就来演示下如何发送一个java对象

当然了,这个对象必须序列化,也就是实现Serializable接口

 

复制代码

            //通过这个方法,可以把一个对象发送出去,当然,这个对象需要序列化,因为一切在网络在传输的,都是字节ObjectMessage obj = session.createObjectMessage();for(int i = 0 ; i < 100 ; i ++){Person p = new Person(i,"名字");obj.setObject(p);producer.send(obj);}

复制代码

 

那么在接收端要怎么接收这个对象呢?

 

复制代码

            //实现一个消息的监听器//实现这个监听器后,以后只要有消息,就会通过这个监听器接收到consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//同样的,强转为ObjectMessage,然后拿到对象,强转为PersonPerson p = (Person) ((ObjectMessage)message).getObject();System.out.println(p);} catch (JMSException e) {e.printStackTrace();}}});

复制代码

 

 

 

 

4.2:发送文件

 

发送文件,这里用BytesMessage

            BytesMessage bb = session.createBytesMessage();bb.writeBytes(new byte[]{2});

至于这里的new Byte[]{2},肯定不是这样写的,从文件里面拿流出来即可

 

 

接收的话

复制代码

            consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {BytesMessage bm = (BytesMessage)message;FileOutputStream out = null;try {out = new FileOutputStream("d:/1.ext");} catch (FileNotFoundException e2) {e2.printStackTrace();}byte[] by = new byte[1024];int len = 0 ;try {while((len = bm.readBytes(by))!= -1){out.write(by,0,len);}} catch (JMSException | IOException e1) {e1.printStackTrace();}}});

复制代码

 

 

 

 

 

 

五:ActiveMQ的应用

5.1:保证消息的成功处理

消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?

 

我们可以使用  CLIENT_ACKNOWLEDGE  模式

 

之前其实就有提到当创建一个session的时候,需要指定其事务,及消息的处理模式,当时使用的是

 

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

AUTO_ACKNOWLEDGE 

这一个代码的是,当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。

那么,它还有另外一个模式,那就是 CLIENT_ACKNOWLEDGE

这行要写在接收端里面,不是写在发送端的

 

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

 

这行代码以后,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。

那么要怎么确认消息呢?

在接收端接收到消息的时候,调用javax.jms.Message的acknowledge方法

复制代码

@Overridepublic void onMessage(Message message) {try {//获取到接收的数据String text = ((TextMessage)message).getText();System.out.println(text);//确认接收,并成功处理了消息message.acknowledge();} catch (JMSException e) {e.printStackTrace();}}

复制代码

这样,当消息处理成功之后,确认消息,如果不确定,activemq将会发给下一个接收端处理

 注意:只在点对点中有效,订阅模式,即使不确认,也不会保存消息

 

 

5.2:避免消息队列的并发

JMQ设计出来的原因,就是用来避免并发的,和沟通两个系统之间的交互。

 

5.2.1:主动接收队列消息

 

先看一下之前的代码:

复制代码

            //实现一个消息的监听器//实现这个监听器后,以后只要有消息,就会通过这个监听器接收到consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//获取到接收的数据String text = ((TextMessage)message).getText();System.out.println(text);//确认接收,并成功处理了消息message.acknowledge();} catch (JMSException e) {e.printStackTrace();}}});

复制代码

之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。

但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?

 

答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理

 

实现的代码如下:

复制代码

      if(当程序有能力处理){//当程序有能力处理时接收Message receive = consumer.receive();//这个可以设置超时时间,超过则不等待消息 recieve.receive(10000);//其实receive是一个阻塞式方法,一定会拿到值的if(null != receive){String text = ((TextMessage)receive).getText();receive.acknowledge();System.out.println(text);}else{//没有值嘛//}}

复制代码

 

通过上面的代码,就可以让程序自已判断,自己是否有能力接收这条消息,如果不能接收,那就给别的接收端接收,或者等自己有能力处理的时候接收

 

 

5.2.2:使用多个接收端

ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。

 

 

 

5.3:消息有效期的管理

这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现

 

代码如下:

复制代码

            TextMessage msg = session.createTextMessage("哈哈");for(int i = 0 ; i < 100 ; i ++){//设置该消息的超时时间producer.setTimeToLive(i * 1000);producer.send(msg);}

复制代码

 

这里每一条消息的有效期都是不同的,打开ip:8161/admin/就可以查看到,里面的消息越来越少了。

 

过期的消息是不会被接收到的。

 

过期的消息会从队列中清除,并存储到ActiveMQ.DLQ这个队列里面,这个稍后会解释。

 

 

5.4:过期消息,处理失败的消息如何处理

 过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。

这个队列是ActiveMQ自动创建的。

如果需要查看这些未被处理的消息,可以进入这个队列中查看

//指定一个目的地,也就是一个队列的位置
destination = session.createQueue("ActiveMQ.DLQ");

这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理

 

 

六:ActiveMQ的安全配置

6.1:管理后台的密码设置

我们都知道,打开ip:8161/admin/ 就是activemq的管理控制台,它的默认账号和密码都是admin,在生产环境肯定需要更改密码的,这要怎么做呢?

在activemq/conf/jetty.xml中找到

复制代码

   <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"><property name="name" value="BASIC" /><property name="roles" value="admin" /><!-- 把这个改为true,当然,高版本的已经改为了true --><property name="authenticate" value="true" /></bean>

复制代码

高版本的已经默认成为了true。所以我们直接进行下一步即可

在activemq/conf/jetty-realm.properties文件中配置,打开如下

复制代码

## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
#用户名,密码,角色
admin: admin, admin
user: user, user

复制代码

注意:大家重点看倒数第二行,那里三个分别是用户名,密码,角色,其中admin角色是固定的

 

 

6.2:生产消费者的连接密码

注意:activemq默认是不需要密码,生产消费者就可以连接的

我们需要经过配置,才能设置密码,这一步在生产环境中一定要配置

找到activemq/conf/activemq.xml,并打开
在<broker>节点中,在<systemUsage>节点上面,增加如下的一个插件

复制代码

<plugins><simpleAuthenticationPlugin><users><authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/></users></simpleAuthenticationPlugin></plugins>

复制代码

这样就开启了密码认证
然后账号密码的配置在activemq/conf/credentials.properties文件中

打开这个文件如下

复制代码

## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------# Defines credentials that will be used by components (like web console) to access the broker#账号
activemq.username=admin
#密码
activemq.password=123456
guest.password=password

复制代码

这样就配置完毕了。

 

七、ActiveMq消息类型

Activemq消息类型
JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种。ActiveMQ也有对应的实现,下面我们结合Spring JMS分别来看一下五种消息类型的收发代码。
1、TextMessage

复制代码

/*** 向指定Destination发送text消息* @param destination* @param message*/public void sendTxtMessage(Destination destination, final String message){if(null == destination){destination = jmsTemplate.getDefaultDestination();}jmsTemplate.send(destination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message);}});System.out.println("springJMS send text message...");}

复制代码

2、MapMessage

复制代码

/*** 向指定Destination发送map消息* @param destination* @param message*/public void sendMapMessage(Destination destination, final String message){if(null == destination){destination = jmsTemplate.getDefaultDestination();}jmsTemplate.send(destination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {MapMessage mapMessage = session.createMapMessage();mapMessage.setString("msgId",message);return mapMessage;}});System.out.println("springJMS send map message...");}

复制代码

3、ObjectMessage

复制代码

 /*** 向指定Destination发送序列化的对象* @param destination* @param object object 必须序列化*/public void sendObjectMessage(Destination destination, final Serializable object){if(null == destination){destination = jmsTemplate.getDefaultDestination();}jmsTemplate.send(destination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createObjectMessage(object);}});System.out.println("springJMS send object message...");}

复制代码

4、BytesMessage

复制代码

/*** 向指定Destination发送字节消息* @param destination* @param bytes*/public void sendBytesMessage(Destination destination, final byte[] bytes){if(null == destination){destination = jmsTemplate.getDefaultDestination();}jmsTemplate.send(destination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {BytesMessage bytesMessage = session.createBytesMessage();bytesMessage.writeBytes(bytes);return bytesMessage;}});System.out.println("springJMS send bytes message...");}

复制代码

5、streamMessage

复制代码

/*** 向默认队列发送Stream消息*/public void sendStreamMessage(Destination destination) {jmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {StreamMessage message = session.createStreamMessage();message.writeString("stream string");message.writeInt(11111);return message;}});System.out.println("springJMS send Strem message...");}

复制代码

消息接收处理

复制代码

 /*** 根据消息类型进行对应的处理* @param destination 消息发送/接收共同的Destination* @throws JMSException*/public void receive(Destination destination) throws JMSException {Message message = jmsTemplate.receive(destination);// 如果是文本消息if (message instanceof TextMessage) {TextMessage tm = (TextMessage) message;System.out.println("from" + destination.toString() + " get textMessage:\t" + tm.getText());}// 如果是Map消息if (message instanceof MapMessage) {MapMessage mm = (MapMessage) message;System.out.println("from" + destination.toString() + " get textMessage:\t" + mm.getString("msgId"));}// 如果是Object消息if (message instanceof ObjectMessage) {ObjectMessage om = (ObjectMessage) message;ExampleUser exampleUser = (ExampleUser) om.getObject();System.out.println("from" + destination.toString() + " get ObjectMessage:\t"+ ToStringBuilder.reflectionToString(exampleUser));}// 如果是bytes消息if (message instanceof BytesMessage) {byte[] b = new byte[1024];int len = -1;BytesMessage bm = (BytesMessage) message;while ((len = bm.readBytes(b)) != -1) {System.out.println(new String(b, 0, len));}}// 如果是Stream消息if (message instanceof StreamMessage) {StreamMessage sm = (StreamMessage) message;System.out.println(sm.readString());System.out.println(sm.readInt());}}

复制代码

 

 

这篇关于四、ActiveMq---JMS的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/977672

相关文章

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略 1. 特权模式限制2. 宿主机资源隔离3. 用户和组管理4. 权限提升控制5. SELinux配置 💖The Begin💖点点关注,收藏不迷路💖 Kubernetes的PodSecurityPolicy(PSP)是一个关键的安全特性,它在Pod创建之前实施安全策略,确保P

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

C++——stack、queue的实现及deque的介绍

目录 1.stack与queue的实现 1.1stack的实现  1.2 queue的实现 2.重温vector、list、stack、queue的介绍 2.1 STL标准库中stack和queue的底层结构  3.deque的简单介绍 3.1为什么选择deque作为stack和queue的底层默认容器  3.2 STL中对stack与queue的模拟实现 ①stack模拟实现