浅谈如何自我实现一个消息队列服务器(7)——编写服务器部分

2024-05-10 19:12

本文主要是介绍浅谈如何自我实现一个消息队列服务器(7)——编写服务器部分,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 一、编写服务器代码
    • 1.1、分析一个服务器应具备的功能
      • 1.1.1、成员变量
      • 1.1.2、对外提供的接口

一、编写服务器代码

在这里插入图片描述
      再次拿出这张图,前面我们已经将重要概念:VirtualHost、exchange、msgQueue、message、binding 都实现了,此时就可以开始编写消息队列MQ的本体:BrokerServer (服务器),由于消息队列的服务器是一个基于 TCP 协议进行通信的服务器,因此消息队列的 BrokerServer 也叫做 TCP服务器。

1.1、分析一个服务器应具备的功能

首先分析一个服务器需要具备的功能来确定其类种应含有的字段及应对外提供的接口:

1.1.1、成员变量

1、ServerSocket
      服务器需要依靠 ServerSocket 进行网络通信来传输数据,因此需定义一个 ServerSocket类作为成员变量之一;

2、VirtualHost
      由于当前实现的MQ只支持一个虚拟主机(后面会继续完善该项目,使其能够支持多个虚拟主机,当前只是一个基础版本MQ),因此此时定义一个 VirtualHost 类作为 BrokerServer 类的成员变量之一;

3、ConcurrentHashMap<String,Socket> sessions = new ConcurrentHashMap<>();
      服务器会与多个客户端进行通信,使用哈希表来表示服务器当前与所有客户端的会话。
即当有客户端连接上服务器进行通信时,会讲客户端记录到当前哈希表,此处的 key 为 channelId(客户端与服务器建立连接,就会创建出一个TCP连接,该TCP连接就可以通过Cannel来包含多个逻辑上的子连接), Value 为 客户端连接 socket

4、ExecutorService
      一个服务器需要处理多个客户端请求,因此需要一个线程池来执行众多的客户端连接。

5、runnable
      需要一个布尔值来控制服务器的启动。true:服务器启动。正常工作。 false:服务器关闭,不进行工作。

1.1.2、对外提供的接口

1、服务器的构造方法:
      在该方法中,对 serverSocket 对象初始化,给此次服务器网络通信时绑定一个端口号。

2、服务器的启动方法:(public void start())
      2.1、给线程对象创建实例
      2.2、通过 accept() 获取客户端连接
      2.3、获取到客户端连接之后,将连接放入线程池中,由线程池中的线程处理。

3、处理一个客户端的连接的方法(一个TCP连接可以复用,因此一个TCP连接里可以有多个 channel,通过 channel ,一个TCP连接可以发出多个请求,收到多个响应):(public void processConnection(Socket clientSocket))
      3.1、读取 连接 中的流对象
      3.2、按照前面约定的应用层协议格式读取
      3.3、读取请求并解析(readRequest())
      3.4、根据请求计算出响应(process())
      3.5、服务器将响应写回客户端(writeResponse())
      3.6、连接处理完后需要关闭连接
      3.7、清除sessions里已经被关闭了连接的socket(clearClosedSocket())

4、服务器停止启动的方法:(public void stop())
      4.1、将 runnable = false、
      4.2、然后将线程池中要进行处理的所有连接全部销毁。
      4.3、关闭网络连接。

5、读取请求并解析的方法:(读取请求的方式跟当初约定的请求报文顺序一致。readRequest(DataInputStream dataInputStream))
      5.1、构造请求对象
      5.2、从流对象中读出4字节作为请求的 type,然后再将读出的 type 值设置到 request 对象中。
      5.3、从流对象中读出4字节作为请求的 length,然后再将读出的 length 值设置到 request 对象中。
      5.4、再从流对象中读出 payload 的长度。然后判断真实读到的 payload 长度 是否与 其 请求体长度 一致,不一致的话说明读的过程有问题,直接抛异常。
      5.5、返回请求

6、根据请求计算出响应的方法:(process(Request request,Socket clientSocket))
      6.1、先将 payload 反序列化,然后转成 BasicArguments 类型。
      6.2、将请求标识 rid 、连接标识 channelId、请求的 type、请求的 length 打印出来。方便后续判断当前是哪个连接,连接中的哪一对请求、响应。
       6.3、根据 type 值判断当前请求到底是需要进行什么操作??远程调用 API 的哪一个。
在这里插入图片描述
              6.3.1、不同的API其所具有的参数类都不同,不同的 type 值调用不同的参数类。
              6.3.2、再将virtualHost类里的对应方法的参数修改成当前request中携带的参数即可。
                          6.3.2.1、 注意:basicComsume()方法的第4个参数:回调函数,并不是客户端传过来的,而是服务器这边要有一个固定的简单、一致格式的回调函数,来把收到的消息回传给客户端(订阅者)。(那么首先服务器收到消息后,如何知道要将消息传给哪个客户端?通过回调函数里需要重写的 handleDelivery()方法里的参数 comsumerTag(消费者唯一身份标识),这时候 comsumerTag == channelId(客户端连接时的身份标识)。此时根据 channelId 就可以到 sessions 哈希表中查询到是哪个 Socket 对象,此时就可以往里面传消息了,此时客户端收到服务器传来的消息之后,就可以执行自己的回调函数,将消息消费掉。)
                                            6.3.2.1.1、在 sessions 中根据 comsumerTag 查找相应的客户端,如果客户端为null或已被关闭连接,此时订阅消息的客户端已经关闭,无法往里边发送消息了,直接抛异常。
                                            6.3.2.1.2、获取到socket对象且还在连接中,此时就可以构造 SubscribeReturns 对象、Response对象,设置好 SubscribeReturns 里的属性,设置好 response里的属性,payload 就是 SubscribeReturns 序列化后的结果。将响应写回客户端。
              6.3.3、赋值返回值ok。
       6.4、 整个 basicReturns 作为 response 的 payload。构造响应对象,设置好响应对象中的值,然后打印 rid、channelId、响应 type、响应 length。
       6.5、返回 response 对象。

7、将已经关闭了的客户端连接(socket)里的所有键值对都清理掉:(public void clearClosedSocket(Socket clientSocket))
       7.1、遍历哈希表中每个元素
       7.2、集合类中不能一边迭代一边删除,否则集合类的迭代器会由于结构被破坏而迭代失败,因此先把想删除的元素使用某数据结构存起来,然后再删除即可。
       7.3、打印提示信息。

这篇关于浅谈如何自我实现一个消息队列服务器(7)——编写服务器部分的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/977309

相关文章

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

JavaWeb-WebSocket浏览器服务器双向通信方式

《JavaWeb-WebSocket浏览器服务器双向通信方式》文章介绍了WebSocket协议的工作原理和应用场景,包括与HTTP的对比,接着,详细介绍了如何在Java中使用WebSocket,包括配... 目录一、概述二、入门2.1 POM依赖2.2 编写配置类2.3 编写WebSocket服务2.4 浏

查询SQL Server数据库服务器IP地址的多种有效方法

《查询SQLServer数据库服务器IP地址的多种有效方法》作为数据库管理员或开发人员,了解如何查询SQLServer数据库服务器的IP地址是一项重要技能,本文将介绍几种简单而有效的方法,帮助你轻松... 目录使用T-SQL查询方法1:使用系统函数方法2:使用系统视图使用SQL Server Configu

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

nginx-rtmp-module构建流媒体直播服务器实战指南

《nginx-rtmp-module构建流媒体直播服务器实战指南》本文主要介绍了nginx-rtmp-module构建流媒体直播服务器实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. RTMP协议介绍与应用RTMP协议的原理RTMP协议的应用RTMP与现代流媒体技术的关系2

mysqld_multi在Linux服务器上运行多个MySQL实例

《mysqld_multi在Linux服务器上运行多个MySQL实例》在Linux系统上使用mysqld_multi来启动和管理多个MySQL实例是一种常见的做法,这种方式允许你在同一台机器上运行多个... 目录1. 安装mysql2. 配置文件示例配置文件3. 创建数据目录4. 启动和管理实例启动所有实例