ActiveMQ点对点模式(PTP)

2024-03-15 16:32
文章标签 模式 activemq ptp 点对点

本文主要是介绍ActiveMQ点对点模式(PTP),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、创建Maven测试项目





二、pom.xml

    <dependencies>
        <!-- 消息队列 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
            <version>5.11.3</version>
        </dependency>
        
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>
    </dependencies>

三、生产者(消息发送者)

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topicqueue),此处创建一个Queue对象

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

四、消费者(消息接收者)

消费者:接收消息。

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源


测试1:先运行生产者,在运行消费者消费消息

测试2:先测试消费者等待消息到来,再运行生产者生产消息

注意:消费者运行后会一直等待键盘输入

package name.yaohuan.activemq;
public class QueueTest {
    @Test
    public void testProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        //brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }
    
    @Test
    public void testConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    //取消息的内容
                    String text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();//这句话会让消费者一直在线
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

注意:
如果使用的是阿里云服务器的话注意端口的问题,在安全组里查看tcp的端口配置规则。
消息发布后,点击查看消息时,在activemq中默认会报告错误,如下
 
原因:
此版本的activemq不支持运行在jdk1.8上,可以切换成jdk1.7

解决方案:
配置内置的jetty服务使用jdk1.7启动
在activemq的bin路径下找到 env 文件
修改其中的配置,将
#JAVA_HOME=""
改为
JAVA_HOME="/usr/local/jdk1.7.0_76/"

重启activemq
./activemq restart

这篇关于ActiveMQ点对点模式(PTP)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/812577

相关文章

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序

迭代器模式iterator

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/iterator 不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素

《x86汇编语言:从实模式到保护模式》视频来了

《x86汇编语言:从实模式到保护模式》视频来了 很多朋友留言,说我的专栏《x86汇编语言:从实模式到保护模式》写得很详细,还有的朋友希望我能写得更细,最好是覆盖全书的所有章节。 毕竟我不是作者,只有作者的解读才是最权威的。 当初我学习这本书的时候,只能靠自己摸索,网上搜不到什么好资源。 如果你正在学这本书或者汇编语言,那你有福气了。 本书作者李忠老师,以此书为蓝本,录制了全套视频。 试

利用命令模式构建高效的手游后端架构

在现代手游开发中,后端架构的设计对于支持高并发、快速迭代和复杂游戏逻辑至关重要。命令模式作为一种行为设计模式,可以有效地解耦请求的发起者与接收者,提升系统的可维护性和扩展性。本文将深入探讨如何利用命令模式构建一个强大且灵活的手游后端架构。 1. 命令模式的概念与优势 命令模式通过将请求封装为对象,使得请求的发起者和接收者之间的耦合度降低。这种模式的主要优势包括: 解耦请求发起者与处理者

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

ActiveMQ—安装配置及使用

安装配置及使用 转自:http://blog.csdn.net/qq_21033663/article/details/52461543 (一)ActiveMQ介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了

ActiveMQ—Queue与Topic区别

Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:         1、点对点(point-to-point,简称PTP)Queue消息传递模型:         通过该消息传递模型,一个应用程序(即消息生产者)可以