TeamTalk消息服务器(未读计数)

2024-09-01 07:44

本文主要是介绍TeamTalk消息服务器(未读计数),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

信令和协议设计

enum MessageCmdID {// ...... 省略无关逻辑 CID_MSG_UNREAD_CNT_REQUEST = 775,CID_MSG_UNREAD_CNT_RESPONSE = 776,// ...... 省略无关逻辑 
};message IMUnreadMsgCntReq{//cmd id:		0x0307required uint32 user_id = 1;optional bytes attach_data = 20;	
}message IMUnreadMsgCntRsp{//cmd id:		0x0308required uint32 user_id = 1;required uint32 total_cnt = 2; // 多个人的未读消息repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;optional bytes attach_data = 20;
}message UnreadInfo{required uint32 session_id = 1; // 会话IDrequired SessionType session_type = 2; // 会话类型required uint32 unread_cnt = 3; // 未读消息数量required uint32 latest_msg_id = 4; // 最新的消息IDrequired bytes latest_msg_data = 5; // 最新的消息required MsgType latest_msg_type = 6;  // 消息类型required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// ...... 省略无关逻辑 switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑          case CID_MSG_UNREAD_CNT_REQUEST:_HandleClientUnreadMsgCntRequest(pPdu );break;              // ...... 省略无关逻辑 }
}void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());IM::Message::IMUnreadMsgCntReq msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));CDBServConn* pDBConn = get_db_serv_conn_for_login();if (pDBConn) {CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_user_id(GetUserId());msg.set_attach_data(attach.GetBuffer(), attach.GetLength());pPdu->SetPBMsg(&msg);pDBConn->SendPdu(pPdu);}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{IM::Message::IMUnreadMsgCntReq msg;IM::Message::IMUnreadMsgCntRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){CImPdu* pPduResp = new CImPdu;uint32_t nUserId = msg.user_id();list<IM::BaseDefine::UnreadInfo> lsUnreadCount;uint32_t nTotalCnt = 0;// 从redis获取未读消息数量 和 从mysql获取最后一条未读消息CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);msgResp.set_user_id(nUserId);msgResp.set_total_cnt(nTotalCnt);for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it){IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();pInfo->set_session_id(it->session_id());pInfo->set_session_type(it->session_type());pInfo->set_unread_cnt(it->unread_cnt());pInfo->set_latest_msg_id(it->latest_msg_id());pInfo->set_latest_msg_data(it->latest_msg_data());pInfo->set_latest_msg_type(it->latest_msg_type());pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());}log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);msgResp.set_attach_data(msg.attach_data());pPduResp->SetPBMsg(&msgResp);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("parse pb failed");}
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{// redisCacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){map<string, string> mapUnread;string strKey = "unread_" + int2string(nUserId);bool bRet = pCacheConn->hgetAll(strKey, mapUnread);pCacheManager->RelCacheConn(pCacheConn);if(bRet){IM::BaseDefine::UnreadInfo cUnreadInfo;for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {cUnreadInfo.set_session_id(atoi(it->first.c_str()));cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);uint32_t nMsgId = 0;string strMsgData;IM::BaseDefine::MsgType nMsgType;// 从mysql获取最后一条未读消息 mysqlgetLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); if(IM::BaseDefine::MsgType_IsValid(nMsgType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nMsgType);cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());lsUnreadCount.push_back(cUnreadInfo);nTotalCnt += cUnreadInfo.unread_cnt();}else{log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);}}}else{log("hgetall %s failed!", strKey.c_str());}}else{log("no cache connection for unread");}
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);if (nRelateId != INVALID_VALUE){CDBManager* pDBManager = CDBManager::getInstance();// 读从库CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nMsgId = pResultSet->GetInt("msgId");nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO){// "[语音]"加密后的字符串strMsgData = strAudioEnc;}else{strMsgData = pResultSet->GetString("content");}}delete pResultSet;}else{log("no result set: %s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);}else{log("no db connection_slave");}}else{log("no relation between %lu and %lu", nFromId, nToId);}
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{IM::Message::IMUnreadMsgCntRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t total_cnt = msg.total_cnt();uint32_t user_unread_cnt = msg.unreadinfo_list_size();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,total_cnt, user_unread_cnt);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);if (pMsgConn && pMsgConn->IsOpen()) {msg.clear_attach_data();pPdu->SetPBMsg(&msg);pMsgConn->SendPdu(pPdu);}
}

这篇关于TeamTalk消息服务器(未读计数)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126340

相关文章

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

JavaWeb-WebSocket浏览器服务器双向通信方式

《JavaWeb-WebSocket浏览器服务器双向通信方式》文章介绍了WebSocket协议的工作原理和应用场景,包括与HTTP的对比,接着,详细介绍了如何在Java中使用WebSocket,包括配... 目录一、概述二、入门2.1 POM依赖2.2 编写配置类2.3 编写WebSocket服务2.4 浏

查询SQL Server数据库服务器IP地址的多种有效方法

《查询SQLServer数据库服务器IP地址的多种有效方法》作为数据库管理员或开发人员,了解如何查询SQLServer数据库服务器的IP地址是一项重要技能,本文将介绍几种简单而有效的方法,帮助你轻松... 目录使用T-SQL查询方法1:使用系统函数方法2:使用系统视图使用SQL Server Configu

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

nginx-rtmp-module构建流媒体直播服务器实战指南

《nginx-rtmp-module构建流媒体直播服务器实战指南》本文主要介绍了nginx-rtmp-module构建流媒体直播服务器实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. RTMP协议介绍与应用RTMP协议的原理RTMP协议的应用RTMP与现代流媒体技术的关系2

mysqld_multi在Linux服务器上运行多个MySQL实例

《mysqld_multi在Linux服务器上运行多个MySQL实例》在Linux系统上使用mysqld_multi来启动和管理多个MySQL实例是一种常见的做法,这种方式允许你在同一台机器上运行多个... 目录1. 安装mysql2. 配置文件示例配置文件3. 创建数据目录4. 启动和管理实例启动所有实例