TeamTalk消息服务器(未读计数)

2024-09-01 07:44

本文主要是介绍TeamTalk消息服务器(未读计数),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

信令和协议设计

enum MessageCmdID {// ...... 省略无关逻辑 CID_MSG_UNREAD_CNT_REQUEST = 775,CID_MSG_UNREAD_CNT_RESPONSE = 776,// ...... 省略无关逻辑 
};message IMUnreadMsgCntReq{//cmd id:		0x0307required uint32 user_id = 1;optional bytes attach_data = 20;	
}message IMUnreadMsgCntRsp{//cmd id:		0x0308required uint32 user_id = 1;required uint32 total_cnt = 2; // 多个人的未读消息repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;optional bytes attach_data = 20;
}message UnreadInfo{required uint32 session_id = 1; // 会话IDrequired SessionType session_type = 2; // 会话类型required uint32 unread_cnt = 3; // 未读消息数量required uint32 latest_msg_id = 4; // 最新的消息IDrequired bytes latest_msg_data = 5; // 最新的消息required MsgType latest_msg_type = 6;  // 消息类型required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// ...... 省略无关逻辑 switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑          case CID_MSG_UNREAD_CNT_REQUEST:_HandleClientUnreadMsgCntRequest(pPdu );break;              // ...... 省略无关逻辑 }
}void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());IM::Message::IMUnreadMsgCntReq msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));CDBServConn* pDBConn = get_db_serv_conn_for_login();if (pDBConn) {CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_user_id(GetUserId());msg.set_attach_data(attach.GetBuffer(), attach.GetLength());pPdu->SetPBMsg(&msg);pDBConn->SendPdu(pPdu);}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{IM::Message::IMUnreadMsgCntReq msg;IM::Message::IMUnreadMsgCntRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){CImPdu* pPduResp = new CImPdu;uint32_t nUserId = msg.user_id();list<IM::BaseDefine::UnreadInfo> lsUnreadCount;uint32_t nTotalCnt = 0;// 从redis获取未读消息数量 和 从mysql获取最后一条未读消息CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);msgResp.set_user_id(nUserId);msgResp.set_total_cnt(nTotalCnt);for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it){IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();pInfo->set_session_id(it->session_id());pInfo->set_session_type(it->session_type());pInfo->set_unread_cnt(it->unread_cnt());pInfo->set_latest_msg_id(it->latest_msg_id());pInfo->set_latest_msg_data(it->latest_msg_data());pInfo->set_latest_msg_type(it->latest_msg_type());pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());}log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);msgResp.set_attach_data(msg.attach_data());pPduResp->SetPBMsg(&msgResp);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("parse pb failed");}
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{// redisCacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){map<string, string> mapUnread;string strKey = "unread_" + int2string(nUserId);bool bRet = pCacheConn->hgetAll(strKey, mapUnread);pCacheManager->RelCacheConn(pCacheConn);if(bRet){IM::BaseDefine::UnreadInfo cUnreadInfo;for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {cUnreadInfo.set_session_id(atoi(it->first.c_str()));cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);uint32_t nMsgId = 0;string strMsgData;IM::BaseDefine::MsgType nMsgType;// 从mysql获取最后一条未读消息 mysqlgetLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); if(IM::BaseDefine::MsgType_IsValid(nMsgType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nMsgType);cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());lsUnreadCount.push_back(cUnreadInfo);nTotalCnt += cUnreadInfo.unread_cnt();}else{log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);}}}else{log("hgetall %s failed!", strKey.c_str());}}else{log("no cache connection for unread");}
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);if (nRelateId != INVALID_VALUE){CDBManager* pDBManager = CDBManager::getInstance();// 读从库CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nMsgId = pResultSet->GetInt("msgId");nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO){// "[语音]"加密后的字符串strMsgData = strAudioEnc;}else{strMsgData = pResultSet->GetString("content");}}delete pResultSet;}else{log("no result set: %s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);}else{log("no db connection_slave");}}else{log("no relation between %lu and %lu", nFromId, nToId);}
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{IM::Message::IMUnreadMsgCntRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t total_cnt = msg.total_cnt();uint32_t user_unread_cnt = msg.unreadinfo_list_size();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,total_cnt, user_unread_cnt);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);if (pMsgConn && pMsgConn->IsOpen()) {msg.clear_attach_data();pPdu->SetPBMsg(&msg);pMsgConn->SendPdu(pPdu);}
}

这篇关于TeamTalk消息服务器(未读计数)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126340

相关文章

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

CentOS 7部署主域名服务器 DNS的方法

《CentOS7部署主域名服务器DNS的方法》文章详细介绍了在CentOS7上部署主域名服务器DNS的步骤,包括安装BIND服务、配置DNS服务、添加域名区域、创建区域文件、配置反向解析、检查配置... 目录1. 安装 BIND 服务和工具2.  配置 BIND 服务3 . 添加你的域名区域配置4.创建区域

Windows Server服务器上配置FileZilla后,FTP连接不上?

《WindowsServer服务器上配置FileZilla后,FTP连接不上?》WindowsServer服务器上配置FileZilla后,FTP连接错误和操作超时的问题,应该如何解决?首先,通过... 目录在Windohttp://www.chinasem.cnws防火墙开启的情况下,遇到的错误如下:无法与

Windows server服务器使用blat命令行发送邮件

《Windowsserver服务器使用blat命令行发送邮件》在linux平台的命令行下可以使用mail命令来发送邮件,windows平台没有内置的命令,但可以使用开源的blat,其官方主页为ht... 目录下载blatBAT命令行示例备注总结在linux平台的命令行下可以使用mail命令来发送邮件,Win

Ubuntu 22.04 服务器安装部署(nginx+postgresql)

《Ubuntu22.04服务器安装部署(nginx+postgresql)》Ubuntu22.04LTS是迄今为止最好的Ubuntu版本之一,很多linux的应用服务器都是选择的这个版本... 目录是什么让 Ubuntu 22.04 LTS 变得安全?更新了安全包linux 内核改进一、部署环境二、安装系统

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

nginx配置多域名共用服务器80端口

《nginx配置多域名共用服务器80端口》本文主要介绍了配置Nginx.conf文件,使得同一台服务器上的服务程序能够根据域名分发到相应的端口进行处理,从而实现用户通过abc.com或xyz.com直... 多个域名,比如两个域名,这两个域名其实共用一台服务器(意味着域名解析到同一个IP),一个域名为abc

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在