本文主要是介绍Client客户端模块,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一.Client模块介绍
二.Client具体实现
1.消费者/订阅者模块
成员变量:
_tag
: 标识消费者的字符串,用于区分不同消费者。_qname
: 消费者订阅的队列名称,表明该消费者从哪个队列获取消息。_auto_ack
: 布尔值,指示消费者是否自动确认消息。_cb
: 客户端的消费者回调函数,用于处理消息的逻辑。
2.信道管理模块
Channel的实现
Channel
类的主要职责是通过TCP连接与服务器进行通信,并处理各种消息队列操作。
成员变量
_chid
:信道的唯一标识符,由UUID生成,用于区分不同的信道实例。_consumer
:表示与信道关联的消费者对象,管理消息的消费。_conn
:表示与服务器的TCP连接,通过此连接发送和接收消息。_codec
:编解码器,用于将消息在协议缓冲区(Protobuf)格式和字节流之间进行转换。_mutex
和_cv
:用于线程安全的消息处理和等待响应的同步机制。
协议定制
包含服务器发给客户端的消费响应和通用响应,在connection模块中,客户端收到这两种类型的响应,会分别调用对应的消息处理回调函数.(Connection模块中详细解释)
构造函数和析构函数
信道的主要操作:
构建请求+发送给服务器
交换机/队列/绑定
- 声明、删除交换机:
declareExchange
和removeExchange
方法用于在服务器端声明或删除交换机。每个请求都通过编解码器发送,并等待服务器的响应。 - 声明、删除队列:
declareQueue
和removeQueue
方法用于在服务器端声明或删除队列,操作流程与交换机类似。 - 绑定、解绑:
bind
和unbind
方法用于将队列与交换机绑定或解绑,以实现消息的路由功能。
// 1. 声明/删除/查找交换机bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);req.set_e_type(type);req.set_durable(durable);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare exchange failed, name:%s", name.c_str());return false;}return true;}bool removeExchange(const std::string &name){// 构建请求msg::RemoveExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove exchange failed, name:%s", name.c_str());return false;}return true;}// 2. 声明/删除队列bool declareQueue(const std::string &name, bool durable,bool exclusive, bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare queue failed, name:%s", name.c_str());return false;}return true;}bool removeQueue(const std::string &name){// 构建请求msg::RemoveQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove queue failed, name:%s", name.c_str());return false;}return true;}// 3. 绑定/解绑bool bind(const std::string &ename, const std::string &qname, const std::string &key){// 构建请求msg::BindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);req.set_bind_key(key);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("bind failed, ename:%s, qname:%s, key:%s", ename.c_str(), qname.c_str(), key.c_str());return false;}return true;}bool unbind(const std::string &ename, const std::string &qname){// 构建请求msg::UnbindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("unbind failed, ename:%s, qname:%s", ename.c_str(), qname.c_str());return false;}return true;}
订阅/消息发布/消息确认
- 订阅、取消订阅:
basicSubscribe
方法用于订阅消息队列,并通过回调函数处理接收到的消息。basicCancel
方法用于取消订阅。 - 消息发布与确认:
basicPublish
方法用于发布消息到指定交换机,而basicAck
方法用于确认收到的消息。
订阅时要创建consumer对象,取消订阅时注销consumer
// 5. 消息的发布,确认bool basicPublish(const std::string &ename, msg::BasicAttributes *bp, const std::string &body){// 构建请求msg::BasicPublishReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_body(body);if (bp != nullptr){req.mutable_attr()->CopyFrom(*bp);}// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic publish failed, qname:%s", ename.c_str());return false;}return true;}void basicAck(const std::string &id){if (_consumer.get() == nullptr){ELOG("channel has no consumer, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}// 构建请求msg::BasicAckReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(_consumer->_qname);req.set_messagg_id(id);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic ack failed, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}return;}
打开/关闭信道
// 6.打开/关闭信道bool openChannel(){msg::OpenChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);if(_codec.get() == nullptr){ELOG("codec is null");return false;}_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return rsp->success() == true;}void closeChannel(){msg::CloseChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return;}
等待响应和相应消息处理函数
waitResponse
方法用于等待服务器的响应,该方法使用条件变量实现同步等待。putBasicCommonResponse
方法用于将服务器的响应放入映射表中,供后续处理使用。consume
方法用于处理从服务器接收到的消息,并调用消费者的回调函数。
// 等待服务端响应BasicCommonResponsePtr waitResponse(const std::string &req_id){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock, [this, &req_id](){ return _consume_rsp_map.find(req_id) != _consume_rsp_map.end(); });BasicCommonResponsePtr rsp = _consume_rsp_map[req_id];_consume_rsp_map.erase(req_id);return rsp;}
// 给连接对象提供的函数,收到来自服务器不同类型的消息,进行不同的处理// 收到通用响应,向map中添加响应void putBasicCommonResponse(const BasicCommonResponsePtr &rsp){std::unique_lock<std::mutex> lock(_mutex);_consume_rsp_map[rsp->rid()] = rsp;_cv.notify_all();}// 收到来自服务器的消费响应,调用消费者的回调函数void consume(const BasicConsumeRspPtr &rsp){if (_consumer.get() == nullptr){ELOG("channel has no consumer");return;}if (_consumer->_tag != rsp->consumer_tag()){ELOG("consumer tag not match, consumer_tag:%s", _consumer->_tag.c_str());return;}_consumer->_cb(rsp->consumer_tag(), rsp->mutable_attr(), rsp->body());}
ChannelManager的实现
3.异步线程模块
4.连接管理模块
这个模块同样是针对muduo库客户端连接的二次封装,向用户提供创建channel信道的接口,创建信道后,可以通过信道来获取指定服务。
三.全部代码
consumer.hpp
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <vector>
#include <functional>
namespace mq
{// tag BasicAttributes bodyusing ConsumerCallBack = std::function<void(const std::string &, const msg::BasicAttributes *, const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _tag; // 消费者标识std::string _qname; // 订阅的队列的名称bool _auto_ack; // 是否自动确认ConsumerCallBack _cb; // 消费者回调函数Consumer(const std::string &ctag, const std::string &qname, bool auto_ack, const ConsumerCallBack &cb): _tag(ctag), _qname(qname), _auto_ack(auto_ack), _cb(cb){DLOG("consumer:%s created %p", ctag.c_str(), this);}Consumer(){DLOG("consumer created %p", this);}~Consumer(){DLOG("consumer destroyed %p", this);}};
};
channel.hpp
#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include "../common_mq/myproto.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include "muduo/net/TcpConnection.h"
#include <condition_variable>
#include "consumer.hpp"
#include "proto/codec.h"
#include "../common_mq/myproto.pb.h"namespace mq
{using BasicConsumeRspPtr = std::shared_ptr<msg::BasicConsumeRsp>;using BasicCommonResponsePtr = std::shared_ptr<msg::BasicCommonResponse>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;class Channel{private:std::string _chid; // 信道IDConsumer::ptr _consumer; // 消费者对象muduo::net::TcpConnectionPtr _conn; // 连接对象ProtobufCodecPtr _codec; // 编解码器std::mutex _mutex; // 互斥锁std::condition_variable _cv; // 条件变量std::unordered_map<std::string, BasicCommonResponsePtr> _consume_rsp_map; // 存放常规响应的mappublic:using ptr = std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec): _conn(conn), _codec(codec){_chid = UUIDHelper::uuid();DLOG("channel:%s created", _chid.c_str());}Channel(){}~Channel(){basicCancel();DLOG("channel:%s destroyed", _chid.c_str());}std::string chid() const{return _chid;}// 发送请求,在服务器端处理请求,创建相应的数据结构对象// 1. 声明/删除/查找交换机bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);req.set_e_type(type);req.set_durable(durable);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare exchange failed, name:%s", name.c_str());return false;}return true;}bool removeExchange(const std::string &name){// 构建请求msg::RemoveExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove exchange failed, name:%s", name.c_str());return false;}return true;}// 2. 声明/删除队列bool declareQueue(const std::string &name, bool durable,bool exclusive, bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare queue failed, name:%s", name.c_str());return false;}return true;}bool removeQueue(const std::string &name){// 构建请求msg::RemoveQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove queue failed, name:%s", name.c_str());return false;}return true;}// 3. 绑定/解绑bool bind(const std::string &ename, const std::string &qname, const std::string &key){// 构建请求msg::BindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);req.set_bind_key(key);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("bind failed, ename:%s, qname:%s, key:%s", ename.c_str(), qname.c_str(), key.c_str());return false;}return true;}bool unbind(const std::string &ename, const std::string &qname){// 构建请求msg::UnbindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("unbind failed, ename:%s, qname:%s", ename.c_str(), qname.c_str());return false;}return true;}// 4. 订阅/取消订阅bool basicSubscribe(const std::string &qname, const std::string &consumer_tag, bool auto_ack, const ConsumerCallBack &cb){if (_consumer.get() != nullptr){ELOG("channel has consumer, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());return false;}// 构建请求msg::BasicSubscribeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(qname);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic subscribe failed, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());return false;}// 注册消费者_consumer = std::make_shared<Consumer>(consumer_tag, qname, auto_ack, cb);return true;}void basicCancel(){if (_consumer.get() == nullptr){// DLOG("不是消费者信道,不需要取消订阅");return;}// 构建请求msg::BasicCancelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_consumer_tag(_consumer->_tag);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic cancel failed, consumer:%s", _consumer->_tag.c_str());return;}// 注销消费者_consumer.reset();return;}// 5. 消息的发布,确认bool basicPublish(const std::string &ename, msg::BasicAttributes *bp, const std::string &body){// 构建请求msg::BasicPublishReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_body(body);if (bp != nullptr){req.mutable_attr()->CopyFrom(*bp);}// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic publish failed, qname:%s", ename.c_str());return false;}return true;}void basicAck(const std::string &id){if (_consumer.get() == nullptr){ELOG("channel has no consumer, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}// 构建请求msg::BasicAckReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(_consumer->_qname);req.set_messagg_id(id);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic ack failed, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}return;}// 6.打开/关闭信道bool openChannel(){msg::OpenChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);if(_codec.get() == nullptr){ELOG("codec is null");return false;}_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return rsp->success() == true;}void closeChannel(){msg::CloseChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return;}private:// 等待服务端响应BasicCommonResponsePtr waitResponse(const std::string &req_id){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock, [this, &req_id](){ return _consume_rsp_map.find(req_id) != _consume_rsp_map.end(); });BasicCommonResponsePtr rsp = _consume_rsp_map[req_id];_consume_rsp_map.erase(req_id);return rsp;}public:// 给连接对象提供的函数,收到来自服务器不同类型的消息,进行不同的处理// 收到通用响应,向map中添加响应void putBasicCommonResponse(const BasicCommonResponsePtr &rsp){std::unique_lock<std::mutex> lock(_mutex);_consume_rsp_map[rsp->rid()] = rsp;_cv.notify_all();}// 收到来自服务器的消费响应,调用消费者的回调函数void consume(const BasicConsumeRspPtr &rsp){if (_consumer.get() == nullptr){ELOG("channel has no consumer");return;}if (_consumer->_tag != rsp->consumer_tag()){ELOG("consumer tag not match, consumer_tag:%s", _consumer->_tag.c_str());return;}_consumer->_cb(rsp->consumer_tag(), rsp->mutable_attr(), rsp->body());}};class ChannelManager{private:std::unordered_map<std::string, mq::Channel::ptr> _channels; // 信道管理器std::mutex _mutex; // 互斥锁public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec){std::unique_lock<std::mutex> lock(_mutex);auto channel = std::make_shared<mq::Channel>(conn, codec);_channels.insert(std::make_pair(channel->chid(), channel));return channel;}void remove(const std::string &chid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(chid);//DLOG("erase channel:%s", chid.c_str());}Channel::ptr get(const std::string &chid){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(chid);if (it == _channels.end()){return Channel::ptr();}return it->second;}};
};
async_worker.hpp
#pragma once
#include <memory>
#include "../common_mq/thread_pool.hpp"
#include "muduo/net/EventLoopThread.h"
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"namespace mq
{class AsyncWorker{public:using ptr = std::shared_ptr<AsyncWorker>;// muduo::net::EventLoopThread _loopThread;std::unique_ptr<muduo::net::EventLoopThread> _loopThread;ThreadPool _pool;AsyncWorker(): _loopThread(std::make_unique<muduo::net::EventLoopThread>()){}};
}
connection.hpp
#pragma once
#include "proto/codec.h"
#include "proto/dispatcher.h"#include "../include/muduo/base/Logging.h"
#include "../include/muduo/base/Mutex.h"
#include "../include/muduo/net/TcpClient.h"
#include "../include/muduo/net/EventLoopThread.h"
#include "../include/muduo/net/TcpConnection.h"
#include "../include/muduo/base/CountDownLatch.h"
// #include "include/muduo/base/Mutex.h"
// #include "include/muduo/net/TcpClient.h"
// #include "include/muduo/net/EventLoopThread.h"
// #include "include/muduo/net/TcpConnection.h"
// #include "include/muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <string>
#include <stdio.h>
#include <unistd.h>
#include "async_worker.hpp"
#include "channel.hpp"
namespace mq
{using MessagePtr = std::shared_ptr<google::protobuf::Message>;class Connection{private:AsyncWorker::ptr _worker; // 异步工作者muduo::CountDownLatch _latch; // 等待连接成功,通知主线程muduo::net::TcpClient _client; // TCP客户端muduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufDispatcher _dispatcher; // Protobuf消息派发器ProtobufCodecPtr _codec; // Protobuf编解码器ChannelManager::ptr _channelManager; // 信道管理器public:using ptr = std::shared_ptr<Connection>;Connection(const std::string &ip, uint16_t port, const AsyncWorker::ptr &worker): _worker(worker),_client(_worker->_loopThread->startLoop(), muduo::net::InetAddress(ip, port), "Connection"),_latch(1),_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_channelManager(std::make_shared<ChannelManager>()){_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<msg::BasicConsumeRsp>(std::bind(&Connection::onBasicConsumeRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<msg::BasicCommonResponse>(std::bind(&Connection::onBasicCommonRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.connect();_latch.wait();}Channel::ptr openChannel(){// 创建客户端channelauto newchannel = _channelManager->create(_conn, _codec);// 在服务端也要创建channelbool ret = newchannel->openChannel();if (!ret){ELOG("open channel failed");return Channel::ptr();}return newchannel;}void closeChannel(const Channel::ptr &channel){channel->closeChannel(); // 关闭服务端channelstd::string chid = channel->chid();//DLOG("remove channel: %s", chid.c_str());_channelManager->remove(chid); // 删除客户端channel}private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()) // 连接成功{LOG_INFO << "Connected to " << conn->peerAddress().toIpPort();_conn = conn;_latch.countDown(); // 通知主线程连接成功,可以发送消息}else{LOG_ERROR << "Disconnected from " << conn->peerAddress().toIpPort();_conn.reset();}}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message,muduo::Timestamp time){LOG_ERROR << "Unknown message: " << message->GetTypeName();conn->shutdown();}private:void onBasicConsumeRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRspPtr &rsp, muduo::Timestamp time){auto channel = _channelManager->get(rsp->chid());if (channel.get() == nullptr){LOG_ERROR << "channel not found: " << rsp->chid();return;}_worker->_pool.push([channel, rsp]{channel->consume(rsp); // 封装任务,抛给线程池执行});}void onBasicCommonRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &rsp, muduo::Timestamp time){auto channel = _channelManager->get(rsp->chid());if (channel.get() == nullptr){LOG_ERROR << "channel not found: " << rsp->chid();return;}channel->putBasicCommonResponse(rsp); // 向map中添加响应,唤醒等待}};};
这篇关于Client客户端模块的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!