IM项目:进阶版即时通讯项目---文件存储和消息转发

2024-08-26 16:44

本文主要是介绍IM项目:进阶版即时通讯项目---文件存储和消息转发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 文件传输服务
    • 基本功能
    • 模块划分
    • 流程图
    • 实现逻辑
    • 代码实现
  • 消息转发
    • 功能设计
    • 模块划分
    • 获取转发目标和消息处理
    • 代码实现

文件传输服务

基本功能

  1. 文件的上传
  2. 文件的下载

模块划分

  1. 基于gflags进行参数和配置文件的解析
  2. 基于spdlog进行日志输出
  3. 基于etcd进行服务注册
  4. 基于brpc进行RPC服务器远程调用
  5. 基于文件流操作进行读写的封装

流程图

在这里插入图片描述

实现逻辑

  1. 单个文件上传
  • 获取文件的元数据
  • 分配文件的ID
  • 以文件ID为文件名打开文件,写入数据
  • 组织响应返回
  1. 单个文件下载
  • 从请求中获取文件ID
  • 打开文件,获取大小,读取数据
  • 组织响应返回
  1. 多个文件上传

这个就是循环一下

  1. 多个文件下载

这个就是循环一下

代码实现

/*** @file file_server.hpp* @brief 文件传输服务,和语音传输服务基本相同* @author zhaobohan (zhaobohan_free@163.com)*/
#include <brpc/server.h>
#include <butil/logging.h>#include "../../common/etcd.hpp"     // 服务注册模块封装
#include "../../common/logger.hpp"   // 日志模块封装
#include "../../common/utils.hpp"
// #include "base.pb.h"
// #include "file.pb.h"#include "../build/base.pb.h"
#include "../build/file.pb.h"namespace im 
{// 对于文件服务的封装
class FileServiceImpl : public im::FileService 
{
public:FileServiceImpl(const std::string &storage_path): _storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if (_storage_path.back() != '/') _storage_path.push_back('/');}~FileServiceImpl(){}void GetSingleFile(google::protobuf::RpcController* controller,const ::im::GetSingleFileReq* request,::im::GetSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}void GetMultiFile(google::protobuf::RpcController* controller,const ::im::GetMultiFileReq* request,::im::GetMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++) {std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}void PutSingleFile(google::protobuf::RpcController* controller,const ::im::PutSingleFileReq* request,::im::PutSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}void PutMultiFile(google::protobuf::RpcController* controller,const ::im::PutMultiFileReq* request,::im::PutMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++) {std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}im::FileMessageInfo *info  = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}private:std::string _storage_path;
};class FileServer 
{
public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr &reg_client,const std::shared_ptr<brpc::Server> &server):_reg_client(reg_client),_rpc_server(server){}~FileServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};class FileServerBuilder 
{
public:// 用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const std::string &path = "./data/") {_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build() {if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};
}

消息转发

功能设计

消息转发子服务,主要是涉及到对于一条消息内容,组织消息的ID,以及其他元数据,然后传递给网关服务器,应该给谁进行发送

转发的目标一般是以聊天会话为基础进行传输的,通过会话就可以找到对应的聊天成员,作为一个转发的目标,并且还要把对应的数据存放在消息队列当中,消息队列收到消息后就会进行对应的回调处理,其实回调就是把数据存放在MySQL,elasticsearch,或者是文件存储系统中,这些内容我在后面的内容也都会涉及到,这里考虑到篇幅原因就不多多进行分析了

模块划分

  1. 基于gflags进行参数和配置文件解析
  2. 基于spdlog进行日志输出
  3. 基于etcd框架进行服务注册
  4. 基于ODB进行数据库对象操作
  5. 基于brpc进行RPC服务器搭建和远程调用
  6. 基于MQ将消息发布到消息队列并且进行对应的存储

下面来聊一下在这当中比较重要的接口

获取转发目标和消息处理

基本流程大概为

  1. 从请求中取出消息内容,会话ID,用户ID
  2. 根据用户ID,从用户管理子服务获取用户信息
  3. 根据消息内容进行填充消息结构,比如分配消息ID,填充发送者信息,填充消息产生时间
  4. 把消息传送给消息队列,进行持久化存储
  5. 从数据库获取目标会话的所有成员ID
  6. 组织响应,也就是完整的消息以及所有要被发送的用户ID,然后发送给网关,网关进行发送

代码实现

class TransmiteServiceImpl : public im::MsgTransmitService 
{
public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client): _user_service_name(user_service_name), _mm_channels(channels), _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)), _exchange_name(exchange_name), _routing_key(routing_key), _mq_client(mq_client){}~TransmiteServiceImpl(){}// 获取转发对象void GetTransmitTarget(google::protobuf::RpcController* controller,const ::im::NewMessageReq* request,::im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override {brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(_user_service_name);if (!channel) {LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}// 去查找用户的信息UserService_Stub stub(channel.get());GetUserInfoReq req;GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false) {LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}// 构造消息结构体MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if (ret == false) {LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}// 组织响应,告诉网关要给谁传消息response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for (const auto &id : target_list) {response->add_target_id_list(id);}}private:// 用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;// 聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;// 消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;
};class TransmiteServer 
{
public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr &registry_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(registry_client),_mysql_client(mysql_client),_rpc_server(server){}~TransmiteServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:// 服务发现客户端Discovery::ptr _service_discoverer;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};class TransmiteServerBuilder 
{
public:// 构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}// 用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}// 用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}// 用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {if (!_mq_client) {LOG_ERROR("还未初始化消息队列客户端模块!");abort();}if (!_mm_channels) {LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_mysql_client) {LOG_ERROR("还未初始化Mysql数据库模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}TransmiteServer::ptr build() {if (!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if (!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};

这篇关于IM项目:进阶版即时通讯项目---文件存储和消息转发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1109118

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key:

Vue3项目开发——新闻发布管理系统(六)

文章目录 八、首页设计开发1、页面设计2、登录访问拦截实现3、用户基本信息显示①封装用户基本信息获取接口②用户基本信息存储③用户基本信息调用④用户基本信息动态渲染 4、退出功能实现①注册点击事件②添加退出功能③数据清理 5、代码下载 八、首页设计开发 登录成功后,系统就进入了首页。接下来,也就进行首页的开发了。 1、页面设计 系统页面主要分为三部分,左侧为系统的菜单栏,右侧

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念