IM项目:进阶版即时通讯项目---文件存储和消息转发

2024-08-26 16:44

本文主要是介绍IM项目:进阶版即时通讯项目---文件存储和消息转发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 文件传输服务
    • 基本功能
    • 模块划分
    • 流程图
    • 实现逻辑
    • 代码实现
  • 消息转发
    • 功能设计
    • 模块划分
    • 获取转发目标和消息处理
    • 代码实现

文件传输服务

基本功能

  1. 文件的上传
  2. 文件的下载

模块划分

  1. 基于gflags进行参数和配置文件的解析
  2. 基于spdlog进行日志输出
  3. 基于etcd进行服务注册
  4. 基于brpc进行RPC服务器远程调用
  5. 基于文件流操作进行读写的封装

流程图

在这里插入图片描述

实现逻辑

  1. 单个文件上传
  • 获取文件的元数据
  • 分配文件的ID
  • 以文件ID为文件名打开文件,写入数据
  • 组织响应返回
  1. 单个文件下载
  • 从请求中获取文件ID
  • 打开文件,获取大小,读取数据
  • 组织响应返回
  1. 多个文件上传

这个就是循环一下

  1. 多个文件下载

这个就是循环一下

代码实现

/*** @file file_server.hpp* @brief 文件传输服务,和语音传输服务基本相同* @author zhaobohan (zhaobohan_free@163.com)*/
#include <brpc/server.h>
#include <butil/logging.h>#include "../../common/etcd.hpp"     // 服务注册模块封装
#include "../../common/logger.hpp"   // 日志模块封装
#include "../../common/utils.hpp"
// #include "base.pb.h"
// #include "file.pb.h"#include "../build/base.pb.h"
#include "../build/file.pb.h"namespace im 
{// 对于文件服务的封装
class FileServiceImpl : public im::FileService 
{
public:FileServiceImpl(const std::string &storage_path): _storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if (_storage_path.back() != '/') _storage_path.push_back('/');}~FileServiceImpl(){}void GetSingleFile(google::protobuf::RpcController* controller,const ::im::GetSingleFileReq* request,::im::GetSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}void GetMultiFile(google::protobuf::RpcController* controller,const ::im::GetMultiFileReq* request,::im::GetMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++) {std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}void PutSingleFile(google::protobuf::RpcController* controller,const ::im::PutSingleFileReq* request,::im::PutSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}void PutMultiFile(google::protobuf::RpcController* controller,const ::im::PutMultiFileReq* request,::im::PutMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++) {std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}im::FileMessageInfo *info  = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}private:std::string _storage_path;
};class FileServer 
{
public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr &reg_client,const std::shared_ptr<brpc::Server> &server):_reg_client(reg_client),_rpc_server(server){}~FileServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};class FileServerBuilder 
{
public:// 用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const std::string &path = "./data/") {_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build() {if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};
}

消息转发

功能设计

消息转发子服务,主要是涉及到对于一条消息内容,组织消息的ID,以及其他元数据,然后传递给网关服务器,应该给谁进行发送

转发的目标一般是以聊天会话为基础进行传输的,通过会话就可以找到对应的聊天成员,作为一个转发的目标,并且还要把对应的数据存放在消息队列当中,消息队列收到消息后就会进行对应的回调处理,其实回调就是把数据存放在MySQL,elasticsearch,或者是文件存储系统中,这些内容我在后面的内容也都会涉及到,这里考虑到篇幅原因就不多多进行分析了

模块划分

  1. 基于gflags进行参数和配置文件解析
  2. 基于spdlog进行日志输出
  3. 基于etcd框架进行服务注册
  4. 基于ODB进行数据库对象操作
  5. 基于brpc进行RPC服务器搭建和远程调用
  6. 基于MQ将消息发布到消息队列并且进行对应的存储

下面来聊一下在这当中比较重要的接口

获取转发目标和消息处理

基本流程大概为

  1. 从请求中取出消息内容,会话ID,用户ID
  2. 根据用户ID,从用户管理子服务获取用户信息
  3. 根据消息内容进行填充消息结构,比如分配消息ID,填充发送者信息,填充消息产生时间
  4. 把消息传送给消息队列,进行持久化存储
  5. 从数据库获取目标会话的所有成员ID
  6. 组织响应,也就是完整的消息以及所有要被发送的用户ID,然后发送给网关,网关进行发送

代码实现

class TransmiteServiceImpl : public im::MsgTransmitService 
{
public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client): _user_service_name(user_service_name), _mm_channels(channels), _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)), _exchange_name(exchange_name), _routing_key(routing_key), _mq_client(mq_client){}~TransmiteServiceImpl(){}// 获取转发对象void GetTransmitTarget(google::protobuf::RpcController* controller,const ::im::NewMessageReq* request,::im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override {brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(_user_service_name);if (!channel) {LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}// 去查找用户的信息UserService_Stub stub(channel.get());GetUserInfoReq req;GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false) {LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}// 构造消息结构体MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if (ret == false) {LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}// 组织响应,告诉网关要给谁传消息response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for (const auto &id : target_list) {response->add_target_id_list(id);}}private:// 用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;// 聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;// 消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;
};class TransmiteServer 
{
public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr &registry_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(registry_client),_mysql_client(mysql_client),_rpc_server(server){}~TransmiteServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:// 服务发现客户端Discovery::ptr _service_discoverer;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};class TransmiteServerBuilder 
{
public:// 构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}// 用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}// 用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}// 用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {if (!_mq_client) {LOG_ERROR("还未初始化消息队列客户端模块!");abort();}if (!_mm_channels) {LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_mysql_client) {LOG_ERROR("还未初始化Mysql数据库模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}TransmiteServer::ptr build() {if (!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if (!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};

这篇关于IM项目:进阶版即时通讯项目---文件存储和消息转发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1109118

相关文章

关于MongoDB图片URL存储异常问题以及解决

《关于MongoDB图片URL存储异常问题以及解决》:本文主要介绍关于MongoDB图片URL存储异常问题以及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录MongoDB图片URL存储异常问题项目场景问题描述原因分析解决方案预防措施js总结MongoDB图

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

springboot项目如何开启https服务

《springboot项目如何开启https服务》:本文主要介绍springboot项目如何开启https服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录springboot项目开启https服务1. 生成SSL证书密钥库使用keytool生成自签名证书将

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

Node.js 数据库 CRUD 项目示例详解(完美解决方案)

《Node.js数据库CRUD项目示例详解(完美解决方案)》:本文主要介绍Node.js数据库CRUD项目示例详解(完美解决方案),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考... 目录项目结构1. 初始化项目2. 配置数据库连接 (config/db.js)3. 创建模型 (models/

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

Spring Boot项目部署命令java -jar的各种参数及作用详解

《SpringBoot项目部署命令java-jar的各种参数及作用详解》:本文主要介绍SpringBoot项目部署命令java-jar的各种参数及作用的相关资料,包括设置内存大小、垃圾回收... 目录前言一、基础命令结构二、常见的 Java 命令参数1. 设置内存大小2. 配置垃圾回收器3. 配置线程栈大小

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me