Client客户端模块

2024-09-01 02:20
文章标签 模块 客户端 client

本文主要是介绍Client客户端模块,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一.Client模块介绍

二.Client具体实现

1.消费者/订阅者模块

成员变量

  • _tag: 标识消费者的字符串,用于区分不同消费者。
  • _qname: 消费者订阅的队列名称,表明该消费者从哪个队列获取消息。
  • _auto_ack: 布尔值,指示消费者是否自动确认消息。
  • _cb: 客户端的消费者回调函数,用于处理消息的逻辑。

2.信道管理模块

Channel的实现

Channel类的主要职责是通过TCP连接与服务器进行通信,并处理各种消息队列操作。

成员变量

  • _chid:信道的唯一标识符,由UUID生成,用于区分不同的信道实例。
  • _consumer:表示与信道关联的消费者对象,管理消息的消费。
  • _conn:表示与服务器的TCP连接,通过此连接发送和接收消息。
  • _codec:编解码器,用于将消息在协议缓冲区(Protobuf)格式和字节流之间进行转换。
  • _mutex_cv:用于线程安全的消息处理和等待响应的同步机制。
协议定制

包含服务器发给客户端的消费响应和通用响应,在connection模块中,客户端收到这两种类型的响应,会分别调用对应的消息处理回调函数.(Connection模块中详细解释)

构造函数和析构函数

信道的主要操作

构建请求+发送给服务器

交换机/队列/绑定
  • 声明、删除交换机declareExchangeremoveExchange 方法用于在服务器端声明或删除交换机。每个请求都通过编解码器发送,并等待服务器的响应。
  • 声明、删除队列declareQueueremoveQueue 方法用于在服务器端声明或删除队列,操作流程与交换机类似。
  • 绑定、解绑bindunbind 方法用于将队列与交换机绑定或解绑,以实现消息的路由功能。
       // 1. 声明/删除/查找交换机bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);req.set_e_type(type);req.set_durable(durable);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare exchange failed, name:%s", name.c_str());return false;}return true;}bool removeExchange(const std::string &name){// 构建请求msg::RemoveExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove exchange failed, name:%s", name.c_str());return false;}return true;}// 2. 声明/删除队列bool declareQueue(const std::string &name, bool durable,bool exclusive, bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare queue failed, name:%s", name.c_str());return false;}return true;}bool removeQueue(const std::string &name){// 构建请求msg::RemoveQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove queue failed, name:%s", name.c_str());return false;}return true;}// 3. 绑定/解绑bool bind(const std::string &ename, const std::string &qname, const std::string &key){// 构建请求msg::BindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);req.set_bind_key(key);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("bind failed, ename:%s, qname:%s, key:%s", ename.c_str(), qname.c_str(), key.c_str());return false;}return true;}bool unbind(const std::string &ename, const std::string &qname){// 构建请求msg::UnbindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("unbind failed, ename:%s, qname:%s", ename.c_str(), qname.c_str());return false;}return true;}
订阅/消息发布/消息确认
  • 订阅、取消订阅basicSubscribe 方法用于订阅消息队列,并通过回调函数处理接收到的消息。basicCancel 方法用于取消订阅。
  • 消息发布与确认basicPublish 方法用于发布消息到指定交换机,而 basicAck 方法用于确认收到的消息。

订阅时要创建consumer对象,取消订阅时注销consumer

  // 5. 消息的发布,确认bool basicPublish(const std::string &ename, msg::BasicAttributes *bp, const std::string &body){// 构建请求msg::BasicPublishReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_body(body);if (bp != nullptr){req.mutable_attr()->CopyFrom(*bp);}// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic publish failed, qname:%s", ename.c_str());return false;}return true;}void basicAck(const std::string &id){if (_consumer.get() == nullptr){ELOG("channel has no consumer, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}// 构建请求msg::BasicAckReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(_consumer->_qname);req.set_messagg_id(id);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic ack failed, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}return;}
打开/关闭信道
        // 6.打开/关闭信道bool openChannel(){msg::OpenChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);if(_codec.get() == nullptr){ELOG("codec is null");return false;}_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return rsp->success() == true;}void closeChannel(){msg::CloseChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return;}
等待响应和相应消息处理函数
  • waitResponse 方法用于等待服务器的响应,该方法使用条件变量实现同步等待。
  • putBasicCommonResponse 方法用于将服务器的响应放入映射表中,供后续处理使用。
  • consume 方法用于处理从服务器接收到的消息,并调用消费者的回调函数。
   // 等待服务端响应BasicCommonResponsePtr waitResponse(const std::string &req_id){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock, [this, &req_id](){ return _consume_rsp_map.find(req_id) != _consume_rsp_map.end(); });BasicCommonResponsePtr rsp = _consume_rsp_map[req_id];_consume_rsp_map.erase(req_id);return rsp;}
   // 给连接对象提供的函数,收到来自服务器不同类型的消息,进行不同的处理// 收到通用响应,向map中添加响应void putBasicCommonResponse(const BasicCommonResponsePtr &rsp){std::unique_lock<std::mutex> lock(_mutex);_consume_rsp_map[rsp->rid()] = rsp;_cv.notify_all();}// 收到来自服务器的消费响应,调用消费者的回调函数void consume(const BasicConsumeRspPtr &rsp){if (_consumer.get() == nullptr){ELOG("channel has no consumer");return;}if (_consumer->_tag != rsp->consumer_tag()){ELOG("consumer tag not match, consumer_tag:%s", _consumer->_tag.c_str());return;}_consumer->_cb(rsp->consumer_tag(), rsp->mutable_attr(), rsp->body());}

ChannelManager的实现

3.异步线程模块

4.连接管理模块

这个模块同样是针对muduo库客户端连接的二次封装,向用户提供创建channel信道的接口,创建信道后,可以通过信道来获取指定服务。

三.全部代码

consumer.hpp

#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <vector>
#include <functional>
namespace mq
{// tag  BasicAttributes bodyusing ConsumerCallBack = std::function<void(const std::string &, const msg::BasicAttributes *, const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _tag;     // 消费者标识std::string _qname;   // 订阅的队列的名称bool _auto_ack;       // 是否自动确认ConsumerCallBack _cb; // 消费者回调函数Consumer(const std::string &ctag, const std::string &qname, bool auto_ack, const ConsumerCallBack &cb): _tag(ctag), _qname(qname), _auto_ack(auto_ack), _cb(cb){DLOG("consumer:%s created %p", ctag.c_str(), this);}Consumer(){DLOG("consumer created %p", this);}~Consumer(){DLOG("consumer destroyed %p", this);}};
};

channel.hpp

#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include "../common_mq/myproto.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include "muduo/net/TcpConnection.h"
#include <condition_variable>
#include "consumer.hpp"
#include "proto/codec.h"
#include "../common_mq/myproto.pb.h"namespace mq
{using BasicConsumeRspPtr = std::shared_ptr<msg::BasicConsumeRsp>;using BasicCommonResponsePtr = std::shared_ptr<msg::BasicCommonResponse>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;class Channel{private:std::string _chid;                                                        // 信道IDConsumer::ptr _consumer;                                                  // 消费者对象muduo::net::TcpConnectionPtr _conn;                                       // 连接对象ProtobufCodecPtr _codec;                                                  // 编解码器std::mutex _mutex;                                                        // 互斥锁std::condition_variable _cv;                                              // 条件变量std::unordered_map<std::string, BasicCommonResponsePtr> _consume_rsp_map; // 存放常规响应的mappublic:using ptr = std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec): _conn(conn), _codec(codec){_chid = UUIDHelper::uuid();DLOG("channel:%s created", _chid.c_str());}Channel(){}~Channel(){basicCancel();DLOG("channel:%s destroyed", _chid.c_str());}std::string chid() const{return _chid;}// 发送请求,在服务器端处理请求,创建相应的数据结构对象// 1. 声明/删除/查找交换机bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);req.set_e_type(type);req.set_durable(durable);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare exchange failed, name:%s", name.c_str());return false;}return true;}bool removeExchange(const std::string &name){// 构建请求msg::RemoveExchangeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove exchange failed, name:%s", name.c_str());return false;}return true;}// 2. 声明/删除队列bool declareQueue(const std::string &name, bool durable,bool exclusive, bool auto_del,google::protobuf::Map<std::string, std::string> &args){// 构建请求msg::DeclareQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);req.set_durable(durable);req.set_exclusive(exclusive);req.set_auto_delete(auto_del);req.mutable_args()->swap(args);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("declare queue failed, name:%s", name.c_str());return false;}return true;}bool removeQueue(const std::string &name){// 构建请求msg::RemoveQueueReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(name);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("remove queue failed, name:%s", name.c_str());return false;}return true;}// 3. 绑定/解绑bool bind(const std::string &ename, const std::string &qname, const std::string &key){// 构建请求msg::BindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);req.set_bind_key(key);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("bind failed, ename:%s, qname:%s, key:%s", ename.c_str(), qname.c_str(), key.c_str());return false;}return true;}bool unbind(const std::string &ename, const std::string &qname){// 构建请求msg::UnbindReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_q_name(qname);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("unbind failed, ename:%s, qname:%s", ename.c_str(), qname.c_str());return false;}return true;}// 4. 订阅/取消订阅bool basicSubscribe(const std::string &qname, const std::string &consumer_tag, bool auto_ack, const ConsumerCallBack &cb){if (_consumer.get() != nullptr){ELOG("channel has consumer, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());return false;}// 构建请求msg::BasicSubscribeReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(qname);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic subscribe failed, qname:%s, consumer_tag:%s", qname.c_str(), consumer_tag.c_str());return false;}// 注册消费者_consumer = std::make_shared<Consumer>(consumer_tag, qname, auto_ack, cb);return true;}void basicCancel(){if (_consumer.get() == nullptr){// DLOG("不是消费者信道,不需要取消订阅");return;}// 构建请求msg::BasicCancelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_consumer_tag(_consumer->_tag);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic cancel failed, consumer:%s", _consumer->_tag.c_str());return;}// 注销消费者_consumer.reset();return;}// 5. 消息的发布,确认bool basicPublish(const std::string &ename, msg::BasicAttributes *bp, const std::string &body){// 构建请求msg::BasicPublishReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_e_name(ename);req.set_body(body);if (bp != nullptr){req.mutable_attr()->CopyFrom(*bp);}// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic publish failed, qname:%s", ename.c_str());return false;}return true;}void basicAck(const std::string &id){if (_consumer.get() == nullptr){ELOG("channel has no consumer, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}// 构建请求msg::BasicAckReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);req.set_q_name(_consumer->_qname);req.set_messagg_id(id);// 发送请求_codec->send(_conn, req);// 异步操作,需要等待响应BasicCommonResponsePtr rsp = waitResponse(req.rid());if (rsp->success() != true){ELOG("basic ack failed, qname:%s, id:%s", _consumer->_qname.c_str(), id.c_str());return;}return;}// 6.打开/关闭信道bool openChannel(){msg::OpenChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);if(_codec.get() == nullptr){ELOG("codec is null");return false;}_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return rsp->success() == true;}void closeChannel(){msg::CloseChannelReq req;req.set_rid(UUIDHelper::uuid());req.set_chid(_chid);_codec->send(_conn, req);BasicCommonResponsePtr rsp = waitResponse(req.rid());return;}private:// 等待服务端响应BasicCommonResponsePtr waitResponse(const std::string &req_id){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock, [this, &req_id](){ return _consume_rsp_map.find(req_id) != _consume_rsp_map.end(); });BasicCommonResponsePtr rsp = _consume_rsp_map[req_id];_consume_rsp_map.erase(req_id);return rsp;}public:// 给连接对象提供的函数,收到来自服务器不同类型的消息,进行不同的处理// 收到通用响应,向map中添加响应void putBasicCommonResponse(const BasicCommonResponsePtr &rsp){std::unique_lock<std::mutex> lock(_mutex);_consume_rsp_map[rsp->rid()] = rsp;_cv.notify_all();}// 收到来自服务器的消费响应,调用消费者的回调函数void consume(const BasicConsumeRspPtr &rsp){if (_consumer.get() == nullptr){ELOG("channel has no consumer");return;}if (_consumer->_tag != rsp->consumer_tag()){ELOG("consumer tag not match, consumer_tag:%s", _consumer->_tag.c_str());return;}_consumer->_cb(rsp->consumer_tag(), rsp->mutable_attr(), rsp->body());}};class ChannelManager{private:std::unordered_map<std::string, mq::Channel::ptr> _channels; // 信道管理器std::mutex _mutex;                                           // 互斥锁public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager() {}Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec){std::unique_lock<std::mutex> lock(_mutex);auto channel = std::make_shared<mq::Channel>(conn, codec);_channels.insert(std::make_pair(channel->chid(), channel));return channel;}void remove(const std::string &chid){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(chid);//DLOG("erase channel:%s", chid.c_str());}Channel::ptr get(const std::string &chid){std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(chid);if (it == _channels.end()){return Channel::ptr();}return it->second;}};
};

async_worker.hpp

#pragma once
#include <memory>
#include "../common_mq/thread_pool.hpp"
#include "muduo/net/EventLoopThread.h"
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"namespace mq
{class AsyncWorker{public:using ptr = std::shared_ptr<AsyncWorker>;// muduo::net::EventLoopThread _loopThread;std::unique_ptr<muduo::net::EventLoopThread> _loopThread;ThreadPool _pool;AsyncWorker(): _loopThread(std::make_unique<muduo::net::EventLoopThread>()){}};
}

connection.hpp

#pragma once
#include "proto/codec.h"
#include "proto/dispatcher.h"#include "../include/muduo/base/Logging.h"
#include "../include/muduo/base/Mutex.h"
#include "../include/muduo/net/TcpClient.h"
#include "../include/muduo/net/EventLoopThread.h"
#include "../include/muduo/net/TcpConnection.h"
#include "../include/muduo/base/CountDownLatch.h"
// #include "include/muduo/base/Mutex.h"
// #include "include/muduo/net/TcpClient.h"
// #include "include/muduo/net/EventLoopThread.h"
// #include "include/muduo/net/TcpConnection.h"
// #include "include/muduo/base/CountDownLatch.h"
#include <iostream>
#include <functional>
#include <string>
#include <stdio.h>
#include <unistd.h>
#include "async_worker.hpp"
#include "channel.hpp"
namespace mq
{using MessagePtr = std::shared_ptr<google::protobuf::Message>;class Connection{private:AsyncWorker::ptr _worker;      // 异步工作者muduo::CountDownLatch _latch;  // 等待连接成功,通知主线程muduo::net::TcpClient _client; // TCP客户端muduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufDispatcher _dispatcher;     // Protobuf消息派发器ProtobufCodecPtr _codec;            // Protobuf编解码器ChannelManager::ptr _channelManager; // 信道管理器public:using ptr = std::shared_ptr<Connection>;Connection(const std::string &ip, uint16_t port, const AsyncWorker::ptr &worker): _worker(worker),_client(_worker->_loopThread->startLoop(), muduo::net::InetAddress(ip, port), "Connection"),_latch(1),_dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_channelManager(std::make_shared<ChannelManager>()){_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<msg::BasicConsumeRsp>(std::bind(&Connection::onBasicConsumeRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<msg::BasicCommonResponse>(std::bind(&Connection::onBasicCommonRspCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.connect();_latch.wait();}Channel::ptr openChannel(){// 创建客户端channelauto newchannel = _channelManager->create(_conn, _codec);// 在服务端也要创建channelbool ret = newchannel->openChannel();if (!ret){ELOG("open channel failed");return Channel::ptr();}return newchannel;}void closeChannel(const Channel::ptr &channel){channel->closeChannel();                  // 关闭服务端channelstd::string chid = channel->chid();//DLOG("remove channel: %s", chid.c_str());_channelManager->remove(chid); // 删除客户端channel}private:void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()) // 连接成功{LOG_INFO << "Connected to " << conn->peerAddress().toIpPort();_conn = conn;_latch.countDown(); // 通知主线程连接成功,可以发送消息}else{LOG_ERROR << "Disconnected from " << conn->peerAddress().toIpPort();_conn.reset();}}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message,muduo::Timestamp time){LOG_ERROR << "Unknown message: " << message->GetTypeName();conn->shutdown();}private:void onBasicConsumeRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRspPtr &rsp, muduo::Timestamp time){auto channel = _channelManager->get(rsp->chid());if (channel.get() == nullptr){LOG_ERROR << "channel not found: " << rsp->chid();return;}_worker->_pool.push([channel, rsp]{channel->consume(rsp); // 封装任务,抛给线程池执行});}void onBasicCommonRspCb(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &rsp, muduo::Timestamp time){auto channel = _channelManager->get(rsp->chid());if (channel.get() == nullptr){LOG_ERROR << "channel not found: " << rsp->chid();return;}channel->putBasicCommonResponse(rsp); // 向map中添加响应,唤醒等待}};};

这篇关于Client客户端模块的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1125678

相关文章

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

Jenkins构建Maven聚合工程,指定构建子模块

一、设置单独编译构建子模块 配置: 1、Root POM指向父pom.xml 2、Goals and options指定构建模块的参数: mvn -pl project1/project1-son -am clean package 单独构建project1-son项目以及它所依赖的其它项目。 说明: mvn clean package -pl 父级模块名/子模块名 -am参数

寻迹模块TCRT5000的应用原理和功能实现(基于STM32)

目录 概述 1 认识TCRT5000 1.1 模块介绍 1.2 电气特性 2 系统应用 2.1 系统架构 2.2 STM32Cube创建工程 3 功能实现 3.1 代码实现 3.2 源代码文件 4 功能测试 4.1 检测黑线状态 4.2 未检测黑线状态 概述 本文主要介绍TCRT5000模块的使用原理,包括该模块的硬件实现方式,电路实现原理,还使用STM32类

python内置模块datetime.time类详细介绍

​​​​​​​Python的datetime模块是一个强大的日期和时间处理库,它提供了多个类来处理日期和时间。主要包括几个功能类datetime.date、datetime.time、datetime.datetime、datetime.timedelta,datetime.timezone等。 ----------动动小手,非常感谢各位的点赞收藏和关注。----------- 使用datet

C8T6超绝模块--EXTI

C8T6超绝模块–EXTI 大纲 控制流程结构体分析EXTI实现按键 具体案例 控制流程 这里是流程框图,具体可以去看我STM32专栏的EXTI的具体分析 结构体分析 typedef struct {uint32_t EXTI_Line; // 中断/事件线EXTIMode_TypeDef EXTI_Mode; // EXTI 模式EXTITrigger_TypeDef EXTI_

1、创建多模块的maven springboot项目

现在的java的项目都是多模块的,这次也跟个风。 目标:实现下述结构 项目AcedBoot, 子模块:         aced-api 对外提供接口,         aced-web 给前端提供接口,         aced-service 服务层,         aced-dao 数据底层,包含数据库mapper和实体类entity,         aced-commo

Vue2电商项目(二) Home模块的开发;(还需要补充js节流和防抖的回顾链接)

文章目录 一、Home模块拆分1. 三级联动组件TypeNav2. 其余组件 二、发送请求的准备工作1. axios的二次封装2. 统一管理接口API----跨域3. nprogress进度条 三、 vuex模块开发四、TypeNav三级联动组件开发1. 动态展示三级联动数据2. 三级联动 动态背景(1)、方式一:CSS样式(2)、方式二:JS 3. 控制二三级数据隐藏与显示--绑定styl

PrestaShop免费模块/插件/扩展/工具下载

PrestaShop免费模块/插件/扩展/工具下载 PrestaShop免费模块 适用于您的电子商务网站的PrestaShop模块 现有超过3,000个PrestaShop模块可帮助您自定义在线商店,增加流量,提高转化率并建立客户忠诚度。 使您的电子商务网站成功! 下载(超过142+之多的PrestaShop官网认证的免费模块) 标签PrestaShop免费, PrestaShop免费工