本文主要是介绍TeamTalk源码分析之msg_server,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
消息服务器支持TCP长连接和HTTP长轮询两种接入方式,本节主要讲SOCKET的轮训,先看如下几个函数。
void CMsgConn::OnConnect(net_handle_thandle)
{
m_handle = handle;
m_login_time = get_tick_count();
g_msg_conn_map.insert(make_pair(handle,this));
netlib_option(handle,NETLIB_OPT_SET_CALLBACK, (void*)imconn_callback);
netlib_option(handle,NETLIB_OPT_SET_CALLBACK_DATA, (void*)&g_msg_conn_map);
netlib_option(handle,NETLIB_OPT_GET_REMOTE_IP, (void*)&m_peer_ip);
netlib_option(handle,NETLIB_OPT_GET_REMOTE_PORT, (void*)&m_peer_port);
//根据handle,为socket类添加如上4个参数。
}
// for client connect in
void msg_serv_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
if (msg ==NETLIB_MSG_CONNECT)
{
CMsgConn* pConn = new CMsgConn();
pConn->OnConnect(handle);
//有客户端connect后,将accept后的SOCKET跟新建的CMsgConn相关联,并将SOCKET加入轮训。
}
else
{
log("!!!error msg: %d\n", msg);
}
}
int main(intargc,char*argv[])
{
… //读配置,初始化SOCKET等操作。
CStrExplode listen_ip_list(listen_ip, ';');
for (uint32_ti = 0;i <listen_ip_list.GetItemCnt();i++) {
ret = netlib_listen(listen_ip_list.GetItem(i),listen_port,msg_serv_callback,NULL);
if (ret ==NETLIB_ERROR)
return ret;
}
……
netlib_eventloop(); //开始轮训,处理定时回调,SOCKET。
}
netlib_eventloop执行后,进开始进行循环,处理定时回调,SOCKET轮训,以下以读为例。
void CBaseSocket::OnRead()
{
if (m_state ==SOCKET_STATE_LISTENING)
{
_AcceptNewSocket();
}
else
{
u_long avail = 0;
if ( (ioctlsocket(m_socket,FIONREAD, &avail) ==SOCKET_ERROR) || (avail == 0) )
{
m_callback(m_callback_data,NETLIB_MSG_CLOSE, (net_handle_t)m_socket,NULL);
}
else
{
m_callback(m_callback_data,NETLIB_MSG_READ, (net_handle_t)m_socket,NULL);
}
}
}
SOCKET可读后,执行如上CBaseSocket::OnRead,紧接着会执行m_callback(m_callback_data,NETLIB_MSG_READ, (net_handle_t)m_socket,NULL);,此时会转到如下回调函数中:
void imconn_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
NOTUSED_ARG(handle);
NOTUSED_ARG(pParam);
if (!callback_data)
return;
ConnMap_t* conn_map = (ConnMap_t*)callback_data;
CImConn* pConn = FindImConn(conn_map,handle);
if (!pConn)
return;
//log("msg=%d, handle=%d\n", msg, handle);
switch (msg)
{
case NETLIB_MSG_CONFIRM:
pConn->OnConfirm();
break;
case NETLIB_MSG_READ:
pConn->OnRead();
break;
case NETLIB_MSG_WRITE:
pConn->OnWrite();
break;
case NETLIB_MSG_CLOSE:
pConn->OnClose();
break;
default:
log("!!!imconn_callback error msg: %d\n", msg);
break;
}
pConn->ReleaseRef();
}
可以看到,会执行如下函数,先从SOCKET中读出数据,在后面会执行CImPdu::ReadPdu。
void CImConn::OnRead()
{
for (;;)
{
uint32_t free_buf_len = m_in_buf.GetAllocSize() -m_in_buf.GetWriteOffset();
if (free_buf_len <READ_BUF_SIZE)
m_in_buf.Extend(READ_BUF_SIZE);
int ret = netlib_recv(m_handle,m_in_buf.GetBuffer() +m_in_buf.GetWriteOffset(),READ_BUF_SIZE);
if (ret <= 0)
break;
m_recv_bytes += ret;
m_in_buf.IncWriteOffset(ret);
m_last_recv_tick = get_tick_count();
}
if (m_policy_conn) {
return;
}
// no received data is read by ReadPdu(), check if this is a flash security policy request
if (m_recv_bytes ==m_in_buf.GetWriteOffset()) {
if ( (m_in_buf.GetBuffer()[0] =='<') && (g_policy_content !=NULL) ) {
log("policy request, handle=%d\n", m_handle);
m_policy_conn = true;
Send(g_policy_content,g_policy_len);
return;
}
}
try {
CImPdu* pPdu = NULL;
while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(),m_in_buf.GetWriteOffset()) ) )
{
uint32_t pdu_len = pPdu->GetLength();
HandlePdu(pPdu);
m_in_buf.Read(NULL,pdu_len);
delete pPdu;
++g_recv_pkt_cnt;
}
} catch (CPduException&ex) {
log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection\n",
ex.GetModuleId(),ex.GetCommandId(),ex.GetErrorCode(),ex.GetErrorMsg());
OnClose();
}
}
CImPdu* CImPdu::ReadPdu(uchar_t *buf,uint32_tlen)
{
uint32_t pdu_len = 0;
if (!_IsPduAvailable(buf,len,pdu_len))
return NULL;
uint16_t service_id = CByteStream::ReadUint16(buf + 4);
uint16_t command_id = CByteStream::ReadUint16(buf + 6);
CImPdu* pPdu = NULL;
switch (service_id)
{
case SID_LOGIN:
pPdu = ReadPduLogin(command_id, buf, pdu_len);
break;
case SID_BUDDY_LIST:
pPdu = ReadPduBuddyList(command_id, buf, pdu_len);
break;
case SID_GROUP:
pPdu = ReadPduGroup(command_id, buf, pdu_len);
break;
case SID_MSG:
pPdu = ReadPduMsg(command_id, buf, pdu_len);
break;
case SID_OTHER:
pPdu = ReadPduOther(command_id, buf, pdu_len);
break;
case SID_SWITCH_SERVICE:
pPdu = ReadPduSwitchService(command_id, buf, pdu_len);
break;
case SID_FILE:
pPdu = ReadPduFile(command_id, buf, pdu_len);
break;
default:
throw CPduException(service_id, command_id, ERROR_CODE_WRONG_SERVICE_ID,"wrong service id");
}
pPdu->_SetIncomingLen(pdu_len);
pPdu->_SetIncomingBuf(buf);
return pPdu;
}
现在以登陆为例子,进入CImPdu*CImPdu::ReadPduLogin。
CImPdu* CImPdu::ReadPduLogin(uint16_tcommand_id,uchar_t*pdu_buf,uint32_tpdu_len)
{
CImPdu* pPdu = NULL;
switch (command_id) {
case CID_LOGIN_REQ_MSGSERVER:
pPdu = new CImPduMsgServRequest(pdu_buf,pdu_len);
break;
case CID_LOGIN_RES_MSGSERVER:
pPdu = new CImPduMsgServResponse(pdu_buf,pdu_len);
break;
case CID_LOGIN_REQ_USERLOGIN:
pPdu = new CImPduLoginRequest(pdu_buf,pdu_len);
break;
case CID_LOGIN_RES_USERLOGIN:
pPdu = new CImPduLoginResponse(pdu_buf,pdu_len);
break;
case CID_LOGIN_KICK_USER:
pPdu = new CImPduKickUser(pdu_buf,pdu_len);
break;
default:
throw CPduException(SID_LOGIN, command_id, ERROR_CODE_WRONG_COMMAND_ID, "wrong command id");
}
return pPdu;
}
可以看到是在打包回复的内容,返回pPdu,然后回到CImConn::OnRead,执行HandlePdu(pPdu)-> _HandleLoginRequest-> SendPdu(&pdu)-> CBaseSocket::Send。
这篇关于TeamTalk源码分析之msg_server的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!