IM项目:进阶版即时通讯项目---文件存储和消息转发

2024-08-26 16:44

本文主要是介绍IM项目:进阶版即时通讯项目---文件存储和消息转发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 文件传输服务
    • 基本功能
    • 模块划分
    • 流程图
    • 实现逻辑
    • 代码实现
  • 消息转发
    • 功能设计
    • 模块划分
    • 获取转发目标和消息处理
    • 代码实现

文件传输服务

基本功能

  1. 文件的上传
  2. 文件的下载

模块划分

  1. 基于gflags进行参数和配置文件的解析
  2. 基于spdlog进行日志输出
  3. 基于etcd进行服务注册
  4. 基于brpc进行RPC服务器远程调用
  5. 基于文件流操作进行读写的封装

流程图

在这里插入图片描述

实现逻辑

  1. 单个文件上传
  • 获取文件的元数据
  • 分配文件的ID
  • 以文件ID为文件名打开文件,写入数据
  • 组织响应返回
  1. 单个文件下载
  • 从请求中获取文件ID
  • 打开文件,获取大小,读取数据
  • 组织响应返回
  1. 多个文件上传

这个就是循环一下

  1. 多个文件下载

这个就是循环一下

代码实现

/*** @file file_server.hpp* @brief 文件传输服务,和语音传输服务基本相同* @author zhaobohan (zhaobohan_free@163.com)*/
#include <brpc/server.h>
#include <butil/logging.h>#include "../../common/etcd.hpp"     // 服务注册模块封装
#include "../../common/logger.hpp"   // 日志模块封装
#include "../../common/utils.hpp"
// #include "base.pb.h"
// #include "file.pb.h"#include "../build/base.pb.h"
#include "../build/file.pb.h"namespace im 
{// 对于文件服务的封装
class FileServiceImpl : public im::FileService 
{
public:FileServiceImpl(const std::string &storage_path): _storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if (_storage_path.back() != '/') _storage_path.push_back('/');}~FileServiceImpl(){}void GetSingleFile(google::protobuf::RpcController* controller,const ::im::GetSingleFileReq* request,::im::GetSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}void GetMultiFile(google::protobuf::RpcController* controller,const ::im::GetMultiFileReq* request,::im::GetMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++) {std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}void PutSingleFile(google::protobuf::RpcController* controller,const ::im::PutSingleFileReq* request,::im::PutSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}void PutMultiFile(google::protobuf::RpcController* controller,const ::im::PutMultiFileReq* request,::im::PutMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++) {std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}im::FileMessageInfo *info  = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}private:std::string _storage_path;
};class FileServer 
{
public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr &reg_client,const std::shared_ptr<brpc::Server> &server):_reg_client(reg_client),_rpc_server(server){}~FileServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};class FileServerBuilder 
{
public:// 用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const std::string &path = "./data/") {_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build() {if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};
}

消息转发

功能设计

消息转发子服务,主要是涉及到对于一条消息内容,组织消息的ID,以及其他元数据,然后传递给网关服务器,应该给谁进行发送

转发的目标一般是以聊天会话为基础进行传输的,通过会话就可以找到对应的聊天成员,作为一个转发的目标,并且还要把对应的数据存放在消息队列当中,消息队列收到消息后就会进行对应的回调处理,其实回调就是把数据存放在MySQL,elasticsearch,或者是文件存储系统中,这些内容我在后面的内容也都会涉及到,这里考虑到篇幅原因就不多多进行分析了

模块划分

  1. 基于gflags进行参数和配置文件解析
  2. 基于spdlog进行日志输出
  3. 基于etcd框架进行服务注册
  4. 基于ODB进行数据库对象操作
  5. 基于brpc进行RPC服务器搭建和远程调用
  6. 基于MQ将消息发布到消息队列并且进行对应的存储

下面来聊一下在这当中比较重要的接口

获取转发目标和消息处理

基本流程大概为

  1. 从请求中取出消息内容,会话ID,用户ID
  2. 根据用户ID,从用户管理子服务获取用户信息
  3. 根据消息内容进行填充消息结构,比如分配消息ID,填充发送者信息,填充消息产生时间
  4. 把消息传送给消息队列,进行持久化存储
  5. 从数据库获取目标会话的所有成员ID
  6. 组织响应,也就是完整的消息以及所有要被发送的用户ID,然后发送给网关,网关进行发送

代码实现

class TransmiteServiceImpl : public im::MsgTransmitService 
{
public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client): _user_service_name(user_service_name), _mm_channels(channels), _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)), _exchange_name(exchange_name), _routing_key(routing_key), _mq_client(mq_client){}~TransmiteServiceImpl(){}// 获取转发对象void GetTransmitTarget(google::protobuf::RpcController* controller,const ::im::NewMessageReq* request,::im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override {brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(_user_service_name);if (!channel) {LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}// 去查找用户的信息UserService_Stub stub(channel.get());GetUserInfoReq req;GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false) {LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}// 构造消息结构体MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if (ret == false) {LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}// 组织响应,告诉网关要给谁传消息response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for (const auto &id : target_list) {response->add_target_id_list(id);}}private:// 用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;// 聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;// 消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;
};class TransmiteServer 
{
public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr &registry_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(registry_client),_mysql_client(mysql_client),_rpc_server(server){}~TransmiteServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:// 服务发现客户端Discovery::ptr _service_discoverer;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};class TransmiteServerBuilder 
{
public:// 构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}// 用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}// 用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}// 用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {if (!_mq_client) {LOG_ERROR("还未初始化消息队列客户端模块!");abort();}if (!_mm_channels) {LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_mysql_client) {LOG_ERROR("还未初始化Mysql数据库模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}TransmiteServer::ptr build() {if (!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if (!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};

这篇关于IM项目:进阶版即时通讯项目---文件存储和消息转发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1109118

相关文章

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

golang内存对齐的项目实践

《golang内存对齐的项目实践》本文主要介绍了golang内存对齐的项目实践,内存对齐不仅有助于提高内存访问效率,还确保了与硬件接口的兼容性,是Go语言编程中不可忽视的重要优化手段,下面就来介绍一下... 目录一、结构体中的字段顺序与内存对齐二、内存对齐的原理与规则三、调整结构体字段顺序优化内存对齐四、内

配置springboot项目动静分离打包分离lib方式

《配置springboot项目动静分离打包分离lib方式》本文介绍了如何将SpringBoot工程中的静态资源和配置文件分离出来,以减少jar包大小,方便修改配置文件,通过在jar包同级目录创建co... 目录前言1、分离配置文件原理2、pom文件配置3、使用package命令打包4、总结前言默认情况下,

python实现简易SSL的项目实践

《python实现简易SSL的项目实践》本文主要介绍了python实现简易SSL的项目实践,包括CA.py、server.py和client.py三个模块,文中通过示例代码介绍的非常详细,对大家的学习... 目录运行环境运行前准备程序实现与流程说明运行截图代码CA.pyclient.pyserver.py参

Redis存储的列表分页和检索的实现方法

《Redis存储的列表分页和检索的实现方法》在Redis中,列表(List)是一种有序的数据结构,通常用于存储一系列元素,由于列表是有序的,可以通过索引来访问元素,因此可以很方便地实现分页和检索功能,... 目录一、Redis 列表的基本操作二、分页实现三、检索实现3.1 方法 1:客户端过滤3.2 方法

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

IDEA运行spring项目时,控制台未出现的解决方案

《IDEA运行spring项目时,控制台未出现的解决方案》文章总结了在使用IDEA运行代码时,控制台未出现的问题和解决方案,问题可能是由于点击图标或重启IDEA后控制台仍未显示,解决方案提供了解决方法... 目录问题分析解决方案总结问题js使用IDEA,点击运行按钮,运行结束,但控制台未出现http://