ActiveMQ源码架构解析第二节

2024-05-07 09:18

本文主要是介绍ActiveMQ源码架构解析第二节,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ActiveMQ源码架构解析第二节

  • 博客分类:
  • ActiveMQ

 

    本节主要内容就是讲解消息的传递方式,上一节已经讲解完客户端和broker端连接的建立方式,在Connection、Session、Producer类对象建立的同时,客户端和broker端会进行一些消息交互,ActiveMQ中把所有的消息交互的内容都叫做Command,每条消息对应一个Command,例如客户端刚连接到broker,broker会发送一个BrokerInfo信息到客户端,接着客户端会发送ConnectionInfo连接信息、ProducerInfo生产者信息等等到服务端,如下图所示:

 



 

 

 

 

这些命令都继承于BaseCommand抽象类并实现于Command接口,类图如下,这里使用了访问者设计模式以及适配器设计模式。



 

适配器模式分为三种实现,第一种是通过继承实现,第二种是通过组合实现,第三种也就是类图中画的叫做默认适配器,CommandVisiter接口中定义了处理所有消息的方法,是broker和客户端api公用的一个接口,但是客户端用不到全部接口,如果实现这个接口那又必须实现全部的方法,所以此处在接口和具体类中间新增了一个CommandVisiterAdapter抽象类实现了全部的处理消息的方法并且全部都是空实现,这样在新建DefaultVisiter的时候就可以根据自己的需要来选择相应的方法进行实现了,访问者设计模式的体现请看下面的代码:

 

/**

 

     * reads packets from a Socket

 

     */

 

    publicvoid run() {

 

        LOG.trace("TCP consumer thread for " + this + " starting");

 

        this.runnerThread=Thread.currentThread();

 

        try {

 

            while (!isStopped()) {

 

                doRun();

 

            }

 

        } catch (IOException e) {

 

            stoppedLatch.get().countDown();

 

            onException(e);

 

        } catch (Throwable e){

 

            stoppedLatch.get().countDown();

 

            IOException ioe=new IOException("Unexpected error occured: " + e);

 

            ioe.initCause(e);

 

            onException(ioe);

 

        }finally {

 

            stoppedLatch.get().countDown();

 

        }

 

    }

 

 

 

    protectedvoid doRun() throws IOException {

 

        try {

 

            Object command = readCommand();

 

            doConsume(command);

 

        } catch (SocketTimeoutException e) {

 

        } catch (InterruptedIOException e) {

 

        }

 

    }

 

 

 

    protected Object readCommand() throws IOException {

 

        return wireFormat.unmarshal(dataIn);

 

}

 

 

 

这段代码是来自TcpTransport.java中,上一节已经讲解完TcpTransport的建立所以此处不在熬述,可以看到线程启动后一直在调用doRun方法,doRun方法则调用readCommand来读取客户端发送过来的信息,读到之后就会调用ActiveMQConnection.javaonCommand方法,如下代码:

 

 

 

@Override

 

    publicvoid onCommand(final Object o) {

 

        final Command command = (Command)o;

 

        if (!closed.get() && command != null) {

 

            try {

 

                command.visit(new CommandVisitorAdapter() {

 

                    @Override

 

                    public Response processMessageDispatch(MessageDispatch md) throws Exception {

 

                        waitForTransportInterruptionProcessingToComplete();

 

                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());

 

                        if (dispatcher != null) {

 

                            // Copy in case a embedded broker is dispatching via

 

                            // vm://

 

                            // md.getMessage() == null to signal end of queue

 

                            // browse.

 

                            Message msg = md.getMessage();

 

                            if (msg != null) {

 

                                msg = msg.copy();

 

                                msg.setReadOnlyBody(true);

 

                                msg.setReadOnlyProperties(true);

 

                                msg.setRedeliveryCounter(md.getRedeliveryCounter());

 

                                msg.setConnection(ActiveMQConnection.this);

 

                                msg.setMemoryUsage(null);

 

                                md.setMessage(msg);

 

                            }

 

                            dispatcher.dispatch(md);

 

                        }

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processProducerAck(ProducerAck pa) throws Exception {

 

                        if (pa != null && pa.getProducerId() != null) {

 

                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());

 

                            if (producer != null) {

 

                                producer.onProducerAck(pa);

 

                            }

 

                        }

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processBrokerInfo(BrokerInfo info) throws Exception {

 

                        brokerInfo = info;

 

                        brokerInfoReceived.countDown();

 

                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();

 

                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConnectionError(final ConnectionError error) throws Exception {

 

                        executor.execute(new Runnable() {

 

                            @Override

 

                            publicvoid run() {

 

                                onAsyncException(error.getException());

 

                            }

 

                        });

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processControlCommand(ControlCommand command) throws Exception {

 

                        onControlCommand(command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConnectionControl(ConnectionControl control) throws Exception {

 

                        onConnectionControl((ConnectionControl)command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processConsumerControl(ConsumerControl control) throws Exception {

 

                        onConsumerControl((ConsumerControl)command);

 

                        returnnull;

 

                    }

 

 

 

                    @Override

 

                    public Response processWireFormat(WireFormatInfo info) throws Exception {

 

                        onWireFormatInfo((WireFormatInfo)command);

 

                        returnnull;

 

                    }

 

                });

 

            } catch (Exception e) {

 

                onClientInternalException(e);

 

            }

 

        }

 

 

 

        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {

 

            TransportListener listener = iter.next();

 

            listener.onCommand(command);

 

        }

 

}

 

 

 

可以看到Command作为入参传进onCommand方法,然后方法中调用command.visit(new CommandVisitorAdapter() {…}),new CommandVisitorAdapter就是上面介绍过的适配器设计模式,这里可以看到这个匿名类只需实现可以用到的方法,而不是实现所有的接口方法。这里command.visit的好处是传入进来的command程序不需要判断他是什么类型的command然后在决定调用这个command的某个方法,而是直接调用visit方法即可,而所有的业务逻辑也是统一的在CommandVisitorAdaptor中实现,这也算是java中动态多分派的实现。

 

知道了消息接收处理以及broker和客户端的信息交互之后,我们在来看下消息是如何从Command类序列化到字节写入以及字节如何反序列化转成Command类的,在ActiveMQ中,每一个Command消息都对应一个CommandMarshal类,例如ConnectionInfo使用ConnectionInfoMarshal来序列化和反序列化,BrokerInfo使用BrokerInfoMarshal来序列化和反序列化,ConnectionInfoMarshalmarshal的实现就是把字符串或者字节等信息写入到socket的输出流中没有什么可说的,序列化和反序列化又分为两种,一种是tight一种是loosetight会针对cpu来进行优化,先写入大小,在写入具体数据,而loose方式则直接写入数据,两种方式都会使用缓存功能,客户端和服务端都分别存在marshal[]unMarshal[]数组,例如客户端给服务端发送ProducerInfo信息,第一次发送后会把ProducerInfo存放在marshal的第0位,然后服务端接收后会把这个producerInfo放在unMarshal[0]中,如果客户端在次发送ProducerInfo则从缓存中取,找到ProducerInfomarshal[]的第0个,则直接发送0,服务端则从unMarshal[]0个取出使用,这节就到这里吧,想到什么我在补充,欢迎跟帖讨论。

这篇关于ActiveMQ源码架构解析第二节的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966940

相关文章

解析 XML 和 INI

XML 1.TinyXML库 TinyXML是一个C++的XML解析库  使用介绍: https://www.cnblogs.com/mythou/archive/2011/11/27/2265169.html    使用的时候,只要把 tinyxml.h、tinystr.h、tinystr.cpp、tinyxml.cpp、tinyxmlerror.cpp、tinyxmlparser.

通信系统网络架构_2.广域网网络架构

1.概述          通俗来讲,广域网是将分布于相比局域网络更广区域的计算机设备联接起来的网络。广域网由通信子网于资源子网组成。通信子网可以利用公用分组交换网、卫星通信网和无线分组交换网构建,将分布在不同地区的局域网或计算机系统互连起来,实现资源子网的共享。 2.网络组成          广域网属于多级网络,通常由骨干网、分布网、接入网组成。在网络规模较小时,可仅由骨干网和接入网组成

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

tf.split()函数解析

API原型(TensorFlow 1.8.0): tf.split(     value,     num_or_size_splits,     axis=0,     num=None,     name='split' ) 这个函数是用来切割张量的。输入切割的张量和参数,返回切割的结果。  value传入的就是需要切割的张量。  这个函数有两种切割的方式: 以三个维度的张量为例,比如说一

基于Java医院药品交易系统详细设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W+,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码+数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多的人  Java精品实战案例《600套》 2023-2025年最值得选择的Java毕业设计选题大全:1000个热

美容美发店营销版微信小程序源码

打造线上生意新篇章 一、引言:微信小程序,开启美容美发行业新纪元 在数字化时代,微信小程序以其便捷、高效的特点,成为了美容美发行业营销的新宠。本文将带您深入了解美容美发营销微信小程序,探讨其独特优势及如何助力商家实现业务增长。 二、微信小程序:美容美发行业的得力助手 拓宽客源渠道:微信小程序基于微信社交平台,轻松实现线上线下融合,帮助商家快速吸引潜在客户,拓宽客源渠道。 提升用户体验:

风水研究会官网源码系统-可展示自己的领域内容-商品售卖等

一款用于展示风水行业,周易测算行业,玄学行业的系统,并支持售卖自己的商品。 整洁大气,非常漂亮,前端内容均可通过后台修改。 大致功能: 支持前端内容通过后端自定义支持开启关闭会员功能,会员等级设置支持对接官方支付支持添加商品类支持添加虚拟下载类支持自定义其他类型字段支持生成虚拟激活卡支持采集其他站点文章支持对接收益广告支持文章评论支持积分功能支持推广功能更多功能,搭建完成自行体验吧! 原文

陀螺仪LSM6DSV16X与AI集成(8)----MotionFX库解析空间坐标

陀螺仪LSM6DSV16X与AI集成.8--MotionFX库解析空间坐标 概述视频教学样品申请源码下载开启CRC串口设置开启X-CUBE-MEMS1设置加速度和角速度量程速率选择设置FIFO速率设置FIFO时间戳批处理速率配置过滤链初始化定义MotionFX文件卡尔曼滤波算法主程序执行流程lsm6dsv16x_motion_fx_determin欧拉角简介演示 概述 本文将探讨

【文末附gpt升级秘笈】腾讯元宝AI搜索解析能力升级:千万字超长文处理的新里程碑

腾讯元宝AI搜索解析能力升级:千万字超长文处理的新里程碑 一、引言 随着人工智能技术的飞速发展,自然语言处理(NLP)和机器学习(ML)在各行各业的应用日益广泛。其中,AI搜索解析能力作为信息检索和知识抽取的核心技术,受到了广泛的关注和研究。腾讯作为互联网行业的领军企业,其在AI领域的探索和创新一直走在前列。近日,腾讯旗下的AI大模型应用——腾讯元宝,迎来了1.1.7版本的升级,新版本在AI搜