TeamTalk消息服务器(未读计数)

2024-09-01 07:44

本文主要是介绍TeamTalk消息服务器(未读计数),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

信令和协议设计

enum MessageCmdID {// ...... 省略无关逻辑 CID_MSG_UNREAD_CNT_REQUEST = 775,CID_MSG_UNREAD_CNT_RESPONSE = 776,// ...... 省略无关逻辑 
};message IMUnreadMsgCntReq{//cmd id:		0x0307required uint32 user_id = 1;optional bytes attach_data = 20;	
}message IMUnreadMsgCntRsp{//cmd id:		0x0308required uint32 user_id = 1;required uint32 total_cnt = 2; // 多个人的未读消息repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;optional bytes attach_data = 20;
}message UnreadInfo{required uint32 session_id = 1; // 会话IDrequired SessionType session_type = 2; // 会话类型required uint32 unread_cnt = 3; // 未读消息数量required uint32 latest_msg_id = 4; // 最新的消息IDrequired bytes latest_msg_data = 5; // 最新的消息required MsgType latest_msg_type = 6;  // 消息类型required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// ...... 省略无关逻辑 switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑          case CID_MSG_UNREAD_CNT_REQUEST:_HandleClientUnreadMsgCntRequest(pPdu );break;              // ...... 省略无关逻辑 }
}void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());IM::Message::IMUnreadMsgCntReq msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));CDBServConn* pDBConn = get_db_serv_conn_for_login();if (pDBConn) {CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_user_id(GetUserId());msg.set_attach_data(attach.GetBuffer(), attach.GetLength());pPdu->SetPBMsg(&msg);pDBConn->SendPdu(pPdu);}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{IM::Message::IMUnreadMsgCntReq msg;IM::Message::IMUnreadMsgCntRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){CImPdu* pPduResp = new CImPdu;uint32_t nUserId = msg.user_id();list<IM::BaseDefine::UnreadInfo> lsUnreadCount;uint32_t nTotalCnt = 0;// 从redis获取未读消息数量 和 从mysql获取最后一条未读消息CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);msgResp.set_user_id(nUserId);msgResp.set_total_cnt(nTotalCnt);for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it){IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();pInfo->set_session_id(it->session_id());pInfo->set_session_type(it->session_type());pInfo->set_unread_cnt(it->unread_cnt());pInfo->set_latest_msg_id(it->latest_msg_id());pInfo->set_latest_msg_data(it->latest_msg_data());pInfo->set_latest_msg_type(it->latest_msg_type());pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());}log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);msgResp.set_attach_data(msg.attach_data());pPduResp->SetPBMsg(&msgResp);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("parse pb failed");}
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{// redisCacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){map<string, string> mapUnread;string strKey = "unread_" + int2string(nUserId);bool bRet = pCacheConn->hgetAll(strKey, mapUnread);pCacheManager->RelCacheConn(pCacheConn);if(bRet){IM::BaseDefine::UnreadInfo cUnreadInfo;for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {cUnreadInfo.set_session_id(atoi(it->first.c_str()));cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);uint32_t nMsgId = 0;string strMsgData;IM::BaseDefine::MsgType nMsgType;// 从mysql获取最后一条未读消息 mysqlgetLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); if(IM::BaseDefine::MsgType_IsValid(nMsgType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nMsgType);cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());lsUnreadCount.push_back(cUnreadInfo);nTotalCnt += cUnreadInfo.unread_cnt();}else{log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);}}}else{log("hgetall %s failed!", strKey.c_str());}}else{log("no cache connection for unread");}
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);if (nRelateId != INVALID_VALUE){CDBManager* pDBManager = CDBManager::getInstance();// 读从库CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nMsgId = pResultSet->GetInt("msgId");nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO){// "[语音]"加密后的字符串strMsgData = strAudioEnc;}else{strMsgData = pResultSet->GetString("content");}}delete pResultSet;}else{log("no result set: %s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);}else{log("no db connection_slave");}}else{log("no relation between %lu and %lu", nFromId, nToId);}
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{IM::Message::IMUnreadMsgCntRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t total_cnt = msg.total_cnt();uint32_t user_unread_cnt = msg.unreadinfo_list_size();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,total_cnt, user_unread_cnt);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);if (pMsgConn && pMsgConn->IsOpen()) {msg.clear_attach_data();pPdu->SetPBMsg(&msg);pMsgConn->SendPdu(pPdu);}
}

这篇关于TeamTalk消息服务器(未读计数)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126340

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

速盾:直播 cdn 服务器带宽?

在当今数字化时代,直播已经成为了一种非常流行的娱乐和商业活动形式。为了确保直播的流畅性和高质量,直播平台通常会使用 CDN(Content Delivery Network,内容分发网络)服务器来分发直播流。而 CDN 服务器的带宽则是影响直播质量的一个重要因素。下面我们就来探讨一下速盾视角下的直播 CDN 服务器带宽问题。 一、直播对带宽的需求 高清视频流 直播通常需要传输高清视频

一种改进的red5集群方案的应用、基于Red5服务器集群负载均衡调度算法研究

转自: 一种改进的red5集群方案的应用: http://wenku.baidu.com/link?url=jYQ1wNwHVBqJ-5XCYq0PRligp6Y5q6BYXyISUsF56My8DP8dc9CZ4pZvpPz1abxJn8fojMrL0IyfmMHStpvkotqC1RWlRMGnzVL1X4IPOa_  基于Red5服务器集群负载均衡调度算法研究 http://ww

RTMP流媒体服务器 crtmpserver

http://www.oschina.net/p/crtmpserver crtmpserver又称rtmpd是Evostream Media Server(www.evostream.com)的社区版本采用GPLV3授权 其主要作用为一个高性能的RTMP流媒体服务器,可以实现直播与点播功能多终端支持功能,在特定情况下是FMS的良好替代品。 支持RTMP的一堆协议(RT

云原生之高性能web服务器学习(持续更新中)

高性能web服务器 1 Web服务器的基础介绍1.1 Web服务介绍1.1.1 Apache介绍1.1.2 Nginx-高性能的 Web 服务端 2 Nginx架构与安装2.1 Nginx概述2.1.1 Nginx 功能介绍2.1.2 基础特性2.1.3 Web 服务相关的功能 2.2 Nginx 架构和进程2.2.1 架构2.2.2 Ngnix进程结构 2.3 Nginx 模块介绍2.4

阿里云服务器ces

允许公网通过 HTTP、HTTPS 等服务访问实例 https://help.aliyun.com/document_detail/25475.html?spm=5176.2020520101.0.0.3ca96b0b3KGTPq#allowHttp

常见的服务器

常见的Web服务器 1、Tomcat:Tomcat和Java结合得最好,是Oracle官方推荐的JSP服务器。Tomcat是开源的Web服务器,经过长时间的发展,性能、稳定性等方面都非常优秀。 2、Jetty:另一个优秀的Web服务器。Jetty有个更大的优点是,Jetty可作为一个嵌入式服务器,即:如果在应用中加入Jetty的JAR文件,应用可在代码中对外提供Web服务。 3、Resin:

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队