Java项目之消息队列(手写java模拟实现mq)【七、⽹络通信协议设计、消息队列服务器端实现、客户端实现】✔ ★

本文主要是介绍Java项目之消息队列(手写java模拟实现mq)【七、⽹络通信协议设计、消息队列服务器端实现、客户端实现】✔ ★,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

⼗⼀. ⽹络通信协议设计

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

定义 Request / Response

/** 表示一个网络通信中的请求对象. 按照自定义协议的格式来展开的*/
public class Request {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}
/** 这个对象表示一个响应. 也是根据自定义应用层协议来的*/
public class Response {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}

定义参数⽗类

构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现

/** 使用这个类表示方法的公共参数/辅助的字段.* 后续每个方法又会有一些不同的参数, 不同的参数再分别使用不同的子类来表示.*/
public class BasicArguments implements Serializable {// 表示一次请求/响应 的身份标识. 可以把请求和响应对上.protected String rid;// 这次通信使用的 channel 的身份标识.protected String channelId;public String getRid() {return rid;}public void setRid(String rid) {this.rid = rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}
}

此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要

定义返回值⽗类

public class BasicAckArguments extends BasicArguments implements Serializable {private String queueName;private String messageId;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}
}

定义其他参数类

针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数

1) ExchangeDeclareArguments

package com.example.mq.common;import com.example.mq.mqserver.core.ExchangeType;import java.io.Serializable;
import java.util.Map;public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String, Object> arguments;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public ExchangeType getExchangeType() {return exchangeType;}public void setExchangeType(ExchangeType exchangeType) {this.exchangeType = exchangeType;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

⼀个创建交换机的请求, 形如:
• 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.
• 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象

2) ExchangeDeleteArguments

public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}
}

3) QueueDeclareArguments

public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String, Object> arguments;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isExclusive() {return exclusive;}public void setExclusive(boolean exclusive) {this.exclusive = exclusive;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

4) QueueDeleteArguments

public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}
}

5) QueueBindArguments

public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getBindingKey() {return bindingKey;}public void setBindingKey(String bindingKey) {this.bindingKey = bindingKey;}
}

6) QueueUnbindArguments

public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}
}

7) BasicPublishArguments

public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}
}

8) BasicConsumeArguments

public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;// 这个类对应的 basicConsume 方法中, 还有一个参数, 是回调函数. (如何来处理消息)// 这个回调函数, 是不能通过网络传输的.// 站在 broker server 这边, 针对消息的处理回调, 其实是统一的. (把消息返回给客户端)// 客户端这边收到消息之后, 再在客户端自己这边执行一个用户自定义的回调就行了.// 此时, 客户端也就不需要把自身的回调告诉给服务器了.// 这个类就不需要 consumer 成员了.public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag = consumerTag;}public String getQueueName() {return queueName;}public void setQueueName(String queueName) {this.queueName = queueName;}public boolean isAutoAck() {return autoAck;}public void setAutoAck(boolean autoAck) {this.autoAck = autoAck;}
}

9) SubScribeReturns

public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;public String getConsumerTag() {return consumerTag;}public void setConsumerTag(String consumerTag) {this.consumerTag = consumerTag;}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}
}

⼗⼆. 实现 BrokerServer

创建 BrokerServer 类

• virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
• sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket.
• serverSocket 是服务器⾃⾝的 socket
• executorService 这个线程池⽤来处理响应.
• runnable 这个标志位⽤来控制服务器的运⾏停⽌.


/** 这个 BrokerServer 就是咱们 消息队列 本体服务器.* 本质上就是一个 TCP 的服务器.*/
public class BrokerServer {private ServerSocket serverSocket = null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtualHost virtualHost = new VirtualHost("default");// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService = null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服务器停止运行!");// e.printStackTrace();}}// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.public void stop() throws IOException {runnable = false;// 把线程池中的任务都放弃了. 让线程都销毁.executorService.shutdownNow();serverSocket.close();}// 通过这个方法, 来处理一个客户端的连接.// 在这一个连接中, 可能会涉及到多个请求和响应.private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析.Request request = readRequest(dataInputStream);// 2. 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[BrokerServer] connection 出现异常!");e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.//    注意! 此处的 dataOutputStream 这个对象不能 close !!!//    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.//    此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!!// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}}

启动/停⽌服务器

• 这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的.
• 实现停⽌操作, 主要是为了⽅便后续开展单元测试

    public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服务器停止运行!");// e.printStackTrace();}}// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.public void stop() throws IOException {runnable = false;// 把线程池中的任务都放弃了. 让线程都销毁.executorService.shutdownNow();serverSocket.close();}

实现处理连接

• 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
◦ 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
◦ 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException

    // 通过这个方法, 来处理一个客户端的连接.// 在这一个连接中, 可能会涉及到多个请求和响应.private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析.Request request = readRequest(dataInputStream);// 2. 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[BrokerServer] connection 出现异常!");e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

实现 readRequest

    private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}

实现 writeResponse

    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}

实现处理请求

• 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
• 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
• 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
• 最后构造成统⼀的响应

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.//    注意! 此处的 dataOutputStream 这个对象不能 close !!!//    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.//    此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}

实现 clearClosedSession

• 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.
• 注意迭代器失效问题.

    private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!!// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}

⼗三. 实现客⼾端

创建 ConnectionFactory

⽤来创建连接的⼯⼚类.
• 当前没有实现⽤⼾认证和多虚拟主机, ⽤⼾名密码可以暂时先不要

package com.example.mq.mqclient;import java.io.IOException;public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口号private int port;// 访问 broker server 的哪个虚拟主机.// 下列几个属性暂时先都不搞了.
//    private String virtualHostName;
//    private String username;
//    private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}
}

Connection 和 Channel 的定义

⼀个客⼾端可以创建多个 Connection.
⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.
⼀个 Connection 可以包含多个 Channel

1) Connection 的定义

• Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
• channelMap ⽤来管理该连接中所有的 Channel.
• callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.

public class Connection {private Socket socket = null;// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool = null;public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}public void close() {// 关闭 Connection 释放上述资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}// 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}// 执行该 channel 对象内部的回调.callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把这个结果放到对应的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}// 发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}// 读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}// 通过这个方法, 在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!// 把刚才已经加入 hash 表的键值对, 再删了.channelMap.remove(channelId);return null;}return channel;}
}

2) Channel 的定义

• channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
• Connection 为 channel 对应的连接.
• baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
• consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.

package com.example.mq.mqclient;import com.example.mq.common.*;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;
import java.lang.ref.ReferenceQueue;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;public class Channel {private String channelId;// 当前这个 channel 属于哪个连接.private Connection connection;// 用来存储后续客户端收到的服务器的响应.private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.// 此处约定一个 Channel 中只能有一个回调.private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求了.connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}// 期望使用这个方法来阻塞等待服务器的响应.private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没回来.// 此时就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 读取成功之后, 还需要把这个消息从哈希表中删除掉.basicReturnsMap.remove(rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应.// 把所有的等待的线程都唤醒.notifyAll();}}private String generateRid() {return "R-" + UUID.randomUUID().toString();}// 关闭 channel, 给服务器发送一个 type = 0x2 的请求public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}// 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}// 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}// 删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}// 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection = connection;}public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {this.basicReturnsMap = basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;}}

Connection

封装请求响应读写操作

在 Connection 中, 实现下列⽅法

    // 发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}// 读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}

创建 channel

在 Connection 中, 定义下列⽅法来创建⼀个 channel

    // 通过这个方法, 在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!// 把刚才已经加入 hash 表的键值对, 再删了.channelMap.remove(channelId);return null;}return channel;}

Channel

发送请求

通过 Channel 提供请求的发送操作.

1) 创建 channel

    // 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求了.connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}

generateRid 的实现

    private String generateRid() {return "R-" + UUID.randomUUID().toString();}

waitResult 的实现
• 由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果

// 期望使用这个方法来阻塞等待服务器的响应.private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没回来.// 此时就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 读取成功之后, 还需要把这个消息从哈希表中删除掉.basicReturnsMap.remove(rid);return basicReturns;}

2) 关闭 channel

    // 关闭 channel, 给服务器发送一个 type = 0x2 的请求public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}

3) 创建交换机

    // 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}

4) 删除交换机

    // 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

5) 创建队列

   // 创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}

6) 删除队列

    // 删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

7) 创建绑定

    // 创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

8) 删除绑定

    // 解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

9) 发送消息

    // 发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBasicProperties(basicProperties);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

10) 订阅消息

// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

11) 确认消息

    // 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}

⼩结

上述发送请求的操作, 逻辑基本⼀致. 构造参数 + 构造请求 + 发送 + 等待结果

处理响应

1) 创建扫描线程

创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据.
注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.

    public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println("[Connection] 连接正常断开!");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}

2) 实现响应的分发

给 Connection 创建 dispatchResponse ⽅法.
• 针对服务器返回的控制响应和消息响应, 分别处理.
◦ 如果是订阅数据, 则调⽤ channel 中的回调.
◦ 如果是控制消息, 直接放到结果集合中.

    // 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}// 执行该 channel 对象内部的回调.callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把这个结果放到对应的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}

3) 实现 channel.putReturns

把响应放到响应的 hash 表中, 同时唤醒等待响应的线程去消费.

    public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应.// 把所有的等待的线程都唤醒.notifyAll();}}

关闭 Connection

给 Connection 实现 close ⽅法

    public void close() {// 关闭 Connection 释放上述资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

测试客⼾端-服务器

public class MqClientTests {private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器MqApplication.context = SpringApplication.run(MqApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服务器brokerServer.stop();// t.join();MqApplication.context.close();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);// 此处稳妥起见, 把改关闭的要进行关闭.channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, false, null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
}

⼗四. 案例: 基于 MQ 的⽣产者消费者模型

/** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}
/** 这个类用来表示一个生产者.* 通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

扩展功能

• 虚拟主机管理
• ⽤⼾管理/⽤⼾认证
• 交换机/队列 的独占模式和⾃动删除.
• 发送⽅确认(broker 给⽣产者的确认应答)
• 拒绝应答 (nack)
• 死信队列
• 管理接⼝
• 管理⻚⾯

这篇关于Java项目之消息队列(手写java模拟实现mq)【七、⽹络通信协议设计、消息队列服务器端实现、客户端实现】✔ ★的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1068052

相关文章

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.