本文主要是介绍IM项目:进阶版即时通讯项目---文件存储和消息转发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 文件传输服务
- 基本功能
- 模块划分
- 流程图
- 实现逻辑
- 代码实现
- 消息转发
- 功能设计
- 模块划分
- 获取转发目标和消息处理
- 代码实现
文件传输服务
基本功能
- 文件的上传
- 文件的下载
模块划分
- 基于gflags进行参数和配置文件的解析
- 基于spdlog进行日志输出
- 基于etcd进行服务注册
- 基于brpc进行RPC服务器远程调用
- 基于文件流操作进行读写的封装
流程图
实现逻辑
- 单个文件上传
- 获取文件的元数据
- 分配文件的ID
- 以文件ID为文件名打开文件,写入数据
- 组织响应返回
- 单个文件下载
- 从请求中获取文件ID
- 打开文件,获取大小,读取数据
- 组织响应返回
- 多个文件上传
这个就是循环一下
- 多个文件下载
这个就是循环一下
代码实现
/*** @file file_server.hpp* @brief 文件传输服务,和语音传输服务基本相同* @author zhaobohan (zhaobohan_free@163.com)*/
#include <brpc/server.h>
#include <butil/logging.h>#include "../../common/etcd.hpp" // 服务注册模块封装
#include "../../common/logger.hpp" // 日志模块封装
#include "../../common/utils.hpp"
// #include "base.pb.h"
// #include "file.pb.h"#include "../build/base.pb.h"
#include "../build/file.pb.h"namespace im
{// 对于文件服务的封装
class FileServiceImpl : public im::FileService
{
public:FileServiceImpl(const std::string &storage_path): _storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if (_storage_path.back() != '/') _storage_path.push_back('/');}~FileServiceImpl(){}void GetSingleFile(google::protobuf::RpcController* controller,const ::im::GetSingleFileReq* request,::im::GetSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}void GetMultiFile(google::protobuf::RpcController* controller,const ::im::GetMultiFileReq* request,::im::GetMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++) {std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}void PutSingleFile(google::protobuf::RpcController* controller,const ::im::PutSingleFileReq* request,::im::PutSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}void PutMultiFile(google::protobuf::RpcController* controller,const ::im::PutMultiFileReq* request,::im::PutMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++) {std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}im::FileMessageInfo *info = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}private:std::string _storage_path;
};class FileServer
{
public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr ®_client,const std::shared_ptr<brpc::Server> &server):_reg_client(reg_client),_rpc_server(server){}~FileServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};class FileServerBuilder
{
public:// 用于构造服务注册客户端对象void make_reg_object(const std::string ®_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const std::string &path = "./data/") {_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build() {if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};
}
消息转发
功能设计
消息转发子服务,主要是涉及到对于一条消息内容,组织消息的ID,以及其他元数据,然后传递给网关服务器,应该给谁进行发送
转发的目标一般是以聊天会话为基础进行传输的,通过会话就可以找到对应的聊天成员,作为一个转发的目标,并且还要把对应的数据存放在消息队列当中,消息队列收到消息后就会进行对应的回调处理,其实回调就是把数据存放在MySQL,elasticsearch,或者是文件存储系统中,这些内容我在后面的内容也都会涉及到,这里考虑到篇幅原因就不多多进行分析了
模块划分
- 基于gflags进行参数和配置文件解析
- 基于spdlog进行日志输出
- 基于etcd框架进行服务注册
- 基于ODB进行数据库对象操作
- 基于brpc进行RPC服务器搭建和远程调用
- 基于MQ将消息发布到消息队列并且进行对应的存储
下面来聊一下在这当中比较重要的接口
获取转发目标和消息处理
基本流程大概为
- 从请求中取出消息内容,会话ID,用户ID
- 根据用户ID,从用户管理子服务获取用户信息
- 根据消息内容进行填充消息结构,比如分配消息ID,填充发送者信息,填充消息产生时间
- 把消息传送给消息队列,进行持久化存储
- 从数据库获取目标会话的所有成员ID
- 组织响应,也就是完整的消息以及所有要被发送的用户ID,然后发送给网关,网关进行发送
代码实现
class TransmiteServiceImpl : public im::MsgTransmitService
{
public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client): _user_service_name(user_service_name), _mm_channels(channels), _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)), _exchange_name(exchange_name), _routing_key(routing_key), _mq_client(mq_client){}~TransmiteServiceImpl(){}// 获取转发对象void GetTransmitTarget(google::protobuf::RpcController* controller,const ::im::NewMessageReq* request,::im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override {brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(_user_service_name);if (!channel) {LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}// 去查找用户的信息UserService_Stub stub(channel.get());GetUserInfoReq req;GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false) {LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}// 构造消息结构体MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if (ret == false) {LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}// 组织响应,告诉网关要给谁传消息response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for (const auto &id : target_list) {response->add_target_id_list(id);}}private:// 用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;// 聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;// 消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;
};class TransmiteServer
{
public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr ®istry_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(registry_client),_mysql_client(mysql_client),_rpc_server(server){}~TransmiteServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:// 服务发现客户端Discovery::ptr _service_discoverer;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};class TransmiteServerBuilder
{
public:// 构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}// 用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string ®_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}// 用于构造服务注册客户端对象void make_registry_object(const std::string ®_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}// 用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {if (!_mq_client) {LOG_ERROR("还未初始化消息队列客户端模块!");abort();}if (!_mm_channels) {LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_mysql_client) {LOG_ERROR("还未初始化Mysql数据库模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}TransmiteServer::ptr build() {if (!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if (!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};
这篇关于IM项目:进阶版即时通讯项目---文件存储和消息转发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!