本文主要是介绍ACE_Proactor网络通信示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
注:本文仅对使用ACE进行网络通信进行演示说明。本文中的代码皆使用doxgen的注释风格。本文中使用的事件机制,其原理与实现请参考[ 基于C++的事件机制设计[2.0]]一文。
ACE的Proactor对Epoll和IOCP进行了良好包装,因此,使用ACE来进行网络开发是相当的便利,性能也不差。闲言少叙,看代码。
这里以TCP协议进行流式通信。我们需要解析流,得出每次接收到的数据包大小和包含的数据域,假定我们的包结构如下:
包序列号(32Bit) | 长度(16Bit) | 数据域(大小为长度所表示的字节)... | (下一包)
通过分析由包序列号和长度组成的包头来解决半包,粘包等问题,许多其它文章也有描述,这里就省略了。
这样可以确定我们的包头结构如下:
- #pragma pack(push)
- #pragma pack(1)
- /**
- * @brief Tcp包头结构
- */
- typedef struct tag_TTcpPackHeader
- {
- unsigned int seq; //< 包序号
- unsigned short len; //< 包长度
- }TTcpPackHeader;
- #pragma pack(pop)
- /// 包头尺寸宏
- #define TCP_PACK_HEADER_SIZE sizeof(tag_TTcpPackHeader)
需要注意的是,要求在字节边界对齐。
现在来看看通过ACE来实现TCP通信需要哪些东西:
INET_Addr 用于地址访问
Task_Base 用于线程模型
Message_Block 用于消息传递和数据容器
Asynch_IO 异步通信
Proactor IOCP架构
并且,要建立这样的通信架构,我们需要:
一个Acceptor:用于接受连接
一个Handler:对应于每个连接句柄,并用于数据的发送/接收。
一个事件分发线程:以事件的形式将接收到数据分发出去,并在对应的句柄上进行数据发送。
本示例并没有采用在接收到数据时立即进行处理的方式,而是通过创建一个额外的事件分发线程的形式,将数据包投递到该线程的消息队列中,由该线程向外派送。因此,数据处理与网络层是隔离的,且网络层能专注于通信,最大的发挥效用。
好了,下面来看看实现:
先看Handler,参考ACE_Service_Handler,我们需要重载open(),addresses(), handle_read_stream(),handle_write_stream(),以在连接打开时进行读写流对象的初始化、获取客户端地址,处理输入/输入流。
注:以T作为类的开头而不是C,是出于对曾经伟大的BORLAND的深刻怀念。
注:成员又以m_开头,是出于对现而今仍伟大的MS的深刻怨念。
- /**
- * @class TTcpHandler
- * @brief Tcp连接句柄
- */
- class TTcpHandler : public ACE_Service_Handler
- {
- public:
- /**
- * @brief 客户端连接事件类型定义
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- * @param [in] TTcpHandler* 连接句柄
- */
- typedef TEvent<void, ACE_UINT32, ACE_UINT16, TTcpHandler *> TOnClientConnect;
- /**
- * @brief 客户端断开连接事件类型定义
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- */
- typedef TEvent<void, ACE_UINT32, ACE_UINT16> TOnClientDisconnect;
- /**
- * @brief 客户端连接验证事件
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- * @return bool
- * - true 验证通过
- * - false 验证失败
- */
- typedef TEvent<bool, ACE_UINT32, ACE_UINT16> TOnClientValidate;
- /**
- * @brief 接收到客户端数据事件类型定义
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- * @param [in] unsigned int 数据包序列号
- * @param [in] const char* 数据区域指针
- * @param [in] size_t 数据长度
- */
- typedef TEvent<void, ACE_UINT32, ACE_UINT16, unsigned int, const char*, unsigned short> TOnDataReceive;
- /**
- * @brief 成功发送客户端数据事件类型定义
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- * @param [in] unsigned int 数据包序列号
- * @param [in] const char* 数据区域指针
- * @param [in] size_t 数据长度
- */
- typedef TEvent<void, ACE_UINT32, ACE_UINT16, unsigned int, const char*, unsigned short> TOnDataSendSucceeded;
- /**
- * @brief 失败发送客户端数据事件类型定义
- * @param [in] ACE_UINT32 客户端地址
- * @param [in] ACE_UINT16 客户端端口
- * @param [in] unsigned int 数据包序列号
- * @param [in] const char* 数据区域指针
- * @param [in] size_t 数据长度
- */
- typedef TEvent<void, ACE_UINT32, ACE_UINT16, unsigned int, const char*, unsigned short> TOnDataSendFailed;
- private:
- ACE_Asynch_Read_Stream m_Reader; //< 异步读数据流
- ACE_Asynch_Write_Stream m_Writer; //< 异步写数据流
- ACE_Message_Block* m_CurDataMB; //< 当前读取数据
- ACE_INET_Addr m_ClientAddr; //< 客户端地址
- public:
- /**
- * @name 事件句柄
- * @{
- */
- DECL_PROP(TOnClientConnect, OnClientConnect) //< 客户端连接事件句柄
- DECL_PROP(TOnClientDisconnect, OnClientDisconnect) //< 客户端断开事件句柄
- DECL_PROP(TOnDataReceive, OnDataReceive) //< 接收到数据的事件句柄
- DECL_PROP(TOnDataSendSucceeded, OnDataSendSucceeded) //< 成功发送数据的事件句柄
- DECL_PROP(TOnDataSendFailed, OnDataSendFailed) //< 发送数据失败的事件句柄
- /**
- * @}
- */
- public:
- /// ctor
- TTcpHandler();
- /// dtor
- ~TTcpHandler();
- /**
- * @brief 发送数据
- * @param [in] unsigned int 数据包序列号
- * @param [in] const char* 要发送的数据区域指针
- * @param [in] size_t 要发送的数据长度
- * @return int
- * - 0 成功
- * - 1 失败
- */
- int send(unsigned int seq, const char* data, unsigned short size);
- /**
- * @brief 打开句柄
- * @see ACE_Service_Handler
- */
- virtual void open(ACE_HANDLE h, ACE_Message_Block& mb);
- /**
- * @brief 获取地址
- * @see ACE_Service_Handler
- */
- virtual void addresses (const ACE_INET_Addr &remote_address,
- const ACE_INET_Addr &local_address);
- /**
- * @brief 读取流数据
- * @see ACE_Service_Handler
- */
- virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result);
- /**
- * @brief 写入流数据
- * @see ACE_Service_Handler
- */
- virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result);
- /**
- * @brief 初始化当前数据接收缓冲事件
- */
- void initCurDataMB();
- }; // class TTcpHandler
而相应滴,Acceptor在接受连接时,产生出的Handler应该是TTcpHandler类型,其定义如下:
注意,为了将事件句柄与连接句柄(TTcpHandler)挂钩,这里重载了make_handler()。而重载validate_connection则是为了让连接验证事件能够在恰当的时机被激发。
- /**
- * @class TTcpAcceptor
- * @brief TCP接受器
- * @see ACE_Asynch_Acceptor
- * @see TTcpHandler
- */
- class TTcpAcceptor : public ACE_Asynch_Acceptor<TTcpHandler>
- {
- public:
- /**
- * @name TCP事件句柄
- * @see TTcpHandler
- * @{
- */
- DECL_PROP(TTcpHandler::TOnClientConnect, OnClientConnect)
- DECL_PROP(TTcpHandler::TOnClientDisconnect, OnClientDisconnect)
- DECL_PROP(TTcpHandler::TOnClientValidate, OnClientValidate)
- DECL_PROP(TTcpHandler::TOnDataReceive, OnDataReceive)
- DECL_PROP(TTcpHandler::TOnDataSendSucceeded, OnDataSendSucceeded)
- DECL_PROP(TTcpHandler::TOnDataSendFailed, OnDataSendFailed)
- /**
- * @}
- */
- protected:
- /**
- * @brief 连接验证
- * @note 激发 OnClientValidate 事件 @see TOnClientValidate
- * @see ACE_Asynch_Acceptor
- */
- virtual int validate_connection (const ACE_Asynch_Accept::Result& result,
- const ACE_INET_Addr &remote,
- const ACE_INET_Addr& local);
- /**
- * @brief 创建连接句柄事件
- * @see ACE_Asynch_Acceptor
- */
- virtual TTcpHandler* make_handler(void);
- }; // class TTcpAcceptor
有了Acceptor和Handler,还需要使之运行于Proactor模式下,因此有了以下线程:
- /**
- * @class TTcpNetThread
- * @brief TCP网络线程
- * @see ACE_Task_Base
- * @see ACE_Proactor
- */
- class TTcpNetThread : public ACE_Task_Base
- {
- public:
- /**
- * @name TCP事件句柄
- * @see TTcpHandler
- * @{
- */
- DECL_PROP(TTcpHandler::TOnClientConnect, OnClientConnect)
- DECL_PROP(TTcpHandler::TOnClientDisconnect, OnClientDisconnect)
- DECL_PROP(TTcpHandler::TOnClientValidate, OnClientValidate)
- DECL_PROP(TTcpHandler::TOnDataReceive, OnDataReceive)
- DECL_PROP(TTcpHandler::TOnDataSendSucceeded, OnDataSendSucceeded)
- DECL_PROP(TTcpHandler::TOnDataSendFailed, OnDataSendFailed)
- /**
- * @}
- */
- /// 运行
- int open();
- /// 停止运行
- int close();
- protected:
- /// 线程函数
- virtual int svc();
- };
最后再看看事件分发线程,该线程也是对上述实现的聚合和封装,对外暴露事件和发送方法:
注意,该类也负责响应TTcpNetThread所激发的事件,所以需要派生自TObject。
- /**
- * @class TTcp
- * @brief TCP接收和事件处理代理线程
- */
- class TTcp : public TObject, public ACE_Task<ACE_MT_SYNCH>
- {
- public:
- /**
- * @name 重定义事件类型
- * @see TTcpHandler
- * @{
- */
- typedef TTcpHandler::TOnClientConnect TOnClientConnect;
- typedef TTcpHandler::TOnClientDisconnect TOnClientDisconnect;
- typedef TTcpHandler::TOnClientValidate TOnClientValidate;
- typedef TTcpHandler::TOnDataReceive TOnDataReceive;
- typedef TTcpHandler::TOnDataSendSucceeded TOnDataSendSucceeded;
- typedef TTcpHandler::TOnDataSendFailed TOnDataSendFailed;
- /**
- * @}
- */
- private:
- /**
- * @name 成员变量
- * @{
- */
- ACE_Recursive_Thread_Mutex m_Lock; //< 线程锁
- hash_map<unsigned __int64, TTcpHandler *> m_AddrMap; //< 地址/句柄映射
- TTcpNetThread* m_TcpNetThd;
- /**
- * @}
- */
- public:
- /**
- * @name TCP事件句柄
- * @see TTcpHandler
- * @{
- */
- DECL_PROP(TTcpHandler::TOnClientConnect, OnClientConnect)
- DECL_PROP(TTcpHandler::TOnClientDisconnect, OnClientDisconnect)
- DECL_PROP(TTcpHandler::TOnClientValidate, OnClientValidate)
- DECL_PROP(TTcpHandler::TOnDataReceive, OnDataReceive)
- DECL_PROP(TTcpHandler::TOnDataSendSucceeded, OnDataSendSucceeded)
- DECL_PROP(TTcpHandler::TOnDataSendFailed, OnDataSendFailed)
- /**
- * @}
- */
- public:
- /// ctor
- TTcp();
- /// dtor
- ~TTcp();
- /// 运行
- void open();
- /// 停止
- void close();
- /// 发送数据
- int send(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* buf, unsigned short len);
- private:
- /// 线程函数
- virtual int svc();
- private:
- /**
- * @name TTcpNetThread 事件处理方法
- * 关于事件原型的定义,请参考 @see TTcpHandler
- * @{
- */
- void tcpNetThread_OnClientConnect(ACE_UINT32 ip, ACE_UINT16 port, TTcpHandler* handler);
- void tcpNetThread_OnClientDisconnect(ACE_UINT32 ip, ACE_UINT16 port);
- void tcpNetThread_OnDataReceive(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size);
- void tcpNetThread_OnDataSendSucceeded(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size);
- void tcpNetThread_OnDataSendFailed(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size);
- /**
- * @}
- */
- }; // class TTcp
现在基本格调已经确定,需要做的是编写具体实现代码了。
此乃末技。
应用ACE来作为底层通信的框架,已经是许多年前的技术了,这里纯粹是凑字数,骗更新滴。这样的老东西,确实是相当的让人无语。
现在我们一步步来看看实现:
先是TTcpAcceptor,该类仅重载了两个方法,如下:
- #include "TCPAcceptor.h"
- namespace igame
- {
- int TTcpAcceptor::validate_connection (const ACE_Asynch_Accept::Result& result,
- const ACE_INET_Addr &remote,
- const ACE_INET_Addr& local)
- {
- if (m_OnClientValidate.valid())
- // 这里激发TOnClientValidate事件
- return m_OnClientValidate(remote.get_ip_address(), remote.get_port_number()) ? 0 : -1;
- else
- return 0; // 默认允许连接
- }
- TTcpHandler* TTcpAcceptor::make_handler(void)
- {
- TTcpHandler* handler = 0;
- ACE_NEW_RETURN (handler, TTcpHandler(), 0);
- // 设置事件句柄
- handler->setOnClientConnect(m_OnClientConnect);
- handler->setOnClientDisconnect(m_OnClientDisconnect);
- handler->setOnDataReceive(m_OnDataReceive);
- handler->setOnDataSendSucceeded(m_OnDataSendSucceeded);
- handler->setOnDataSendFailed(m_OnDataSendFailed);
- return handler;
- }
- } // namespace igame
复杂的部分在TTcpHandler,该类不仅需要接收数据(拼包),也要处理发送:
- #include "TcpHandler.h"
- namespace igame
- {
- TTcpHandler::TTcpHandler()
- :m_CurDataMB(0) // 初始化
- { }
- TTcpHandler::~TTcpHandler()
- {
- if (handle() != ACE_INVALID_HANDLE)
- {
- ACE_OS::closesocket(handle()); // 关闭句柄
- #ifdef _DEBUG
- // 打印调试信息
- ACE_TCHAR remoteAddrStr[128];
- m_ClientAddr.addr_to_string(remoteAddrStr, sizeof(remoteAddrStr) / sizeof(ACE_TCHAR));
- ACE_DEBUG((LM_INFO, ACE_TEXT("Disconnect from %s/n"), remoteAddrStr));
- #endif
- // 客户端断开
- m_OnClientDisconnect(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number());
- if (m_CurDataMB)
- m_CurDataMB->release();
- }
- }
- int TTcpHandler::send(unsigned int seq, const char* data, unsigned short dataSize)
- {
- ACE_Message_Block* dataMB = 0;
- ACE_NEW_NORETURN(dataMB, ACE_Message_Block(sizeof(unsigned int) + sizeof(unsigned short) + dataSize));
- short len = dataSize;
- dataMB->copy((const char *)&seq, sizeof(unsigned int)); // 这里没有处理seq
- dataMB->copy((const char *)&len, sizeof(unsigned short));
- dataMB->copy((const char *)data, dataSize);
- int ret = m_Writer.write(*dataMB, dataMB->length()); // 发送
- if (ret == -1)
- m_OnDataSendFailed(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), seq, data, dataSize);
- else
- m_OnDataSendSucceeded(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), seq, data, dataSize);
- return ret;
- }
- void TTcpHandler::addresses (const ACE_INET_Addr &remote_address,
- const ACE_INET_Addr &local_address)
- {
- m_ClientAddr = remote_address; // 取得客户端地址
- }
- void TTcpHandler::open(ACE_HANDLE h, ACE_Message_Block& mb)
- {
- handle(h); // set handle
- if (m_Reader.open(*this) == -1) // 允许读
- {
- ACE_ERROR((LM_ERROR, ACE_TEXT("failed to open read handle %i/n"), errno));
- delete this;
- return;
- }
- if (m_Writer.open(*this) == -1) // 允许写
- {
- ACE_ERROR((LM_ERROR, ACE_TEXT("failed to open write handle %i/n"), errno));
- delete this;
- return;
- }
- // 激发客户端连接事件
- m_OnClientConnect(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), this);
- initCurDataMB();
- m_Reader.read(*m_CurDataMB, m_CurDataMB->space()); // 读数据
- }
- void TTcpHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result& result)
- {
- ACE_Message_Block& mb = result.message_block();
- if (!result.success() || result.bytes_transferred() == 0) // no data or failed?
- {
- mb.release();
- delete this;
- }
- else
- {
- if (this->m_CurDataMB->length() < TCP_PACK_HEADER_SIZE) // try to read header info
- {
- this->m_Reader.read(*m_CurDataMB, m_CurDataMB->space());
- return ;
- }
- TTcpPackHeader* header = reinterpret_cast<TTcpPackHeader *>(this->m_CurDataMB->rd_ptr());
- ACE_Message_Block* dataMB = this->m_CurDataMB->cont();
- if (!dataMB)
- {
- ACE_NEW_NORETURN(dataMB, ACE_Message_Block(header->len));
- if (dataMB)
- this->m_CurDataMB->cont(dataMB);
- else
- {
- this->m_CurDataMB->release();
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to allocated: %i/n"), errno));
- delete this;
- return ;
- }
- }
- if (dataMB->length() == header->len)
- {
- // 成功读取了数据?
- m_OnDataReceive(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), header->seq, dataMB->rd_ptr(), header->len);
- m_CurDataMB->release();
- initCurDataMB(); // 下一包数据
- this->m_Reader.read(*m_CurDataMB, m_CurDataMB->space()); // next, try to get header
- return ;
- }
- this->m_Reader.read(*dataMB, dataMB->space()); // try to get data left
- }
- }
- void TTcpHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result& result)
- {
- if (result.success() && result.bytes_transferred() > 0) // 发送成功
- {
- ACE_Message_Block& mb = result.message_block();
- #ifdef _DEBUG
- ACE_TCHAR addrStr[128];
- m_ClientAddr.addr_to_string(addrStr, sizeof(addrStr) / sizeof(ACE_TCHAR));
- ACE_DEBUG((LM_INFO, ACE_TEXT("Send to client: %s len:%i/n"), addrStr, result.bytes_transferred()));
- char* ptr = mb.rd_ptr();
- #endif
- mb.release();
- }
- }
- void TTcpHandler::initCurDataMB()
- {
- ACE_NEW_NORETURN(m_CurDataMB, ACE_Message_Block(TCP_PACK_HEADER_SIZE, TCP_DATA_RECEIVE));
- }
- } // namespace igame
然后是TTcpNetThread,该类的实现也相当简单:
- #include <ace/Proactor.h>
- #include "TCPNetThread.h"
- namespace igame
- {
- int TTcpNetThread::open() { return this->activate(); }
- int TTcpNetThread::close()
- {
- ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环
- this->wait(); // 等待清理现场
- return 0;
- }
- int TTcpNetThread::svc()
- {
- ACE_INET_Addr listenAddr(DEF_LISTENING_PORT); // 默认监听地址
- TTcpAcceptor tcpAcceptor; // 接收器
- // 设置事件
- tcpAcceptor.setOnClientConnect(m_OnClientConnect);
- tcpAcceptor.setOnClientDisconnect(m_OnClientDisconnect);
- tcpAcceptor.setOnClientValidate(m_OnClientValidate);
- tcpAcceptor.setOnDataReceive(m_OnDataReceive);
- tcpAcceptor.setOnDataSendFailed(m_OnDataSendFailed);
- tcpAcceptor.setOnDataSendSucceeded(m_OnDataSendSucceeded);
- // 演出开始
- if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p/n"), ACE_TEXT("failed to open TcpAcceptor errno=%i/n"), errno), -1);
- // Proactor的事件循环开始
- ACE_Proactor::instance()->proactor_run_event_loop();
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin/n")));
- return 0;
- }
- } // namespace igame
最后,对以上三个类进行聚合,封装,就成了TTcp类,在此之前,先定义消息类型:
- /**
- * @name TCP的ACE_Message_Block类型定义 @see ACE_Message_Block
- * @{
- */
- /// @brief TCP数据接收
- #define TCP_DATA_RECEIVE 0x5505
- /// @brief TCP客户端连接
- #define TCP_CLIENT_CONNECT 0x5506
- /// @brief TCP客户端断线
- #define TCP_CLIENT_DISCONNECT 0x5507
- /// @brief TCP数据发送
- #define TCP_DATA_SEND 0x5508
- /// @brief TCP数据发送成功
- #define TCP_DATA_SEND_SUCCEEDED 0x5509
- /// @brief TCP数据发送失败
- #define TCP_DATA_SEND_FAILED 0x550A
- /**
- * @}
- */
- /// 默认监听地址:偶的车牌号
- #define DEF_LISTENING_PORT 777
现在看看TTcp的实现:
唔,太长了,下一篇吧。
此乃末技,
不知何用。
堆砌字数,
凑成更新。
走过路过,
不要错过。
请原谅偶拖篇幅,这里奉上拖欠的数字。
TTcp的实现如下:
- #include "Tcp.h"
- namespace igame
- {
- TTcp::TTcp()
- :m_TcpNetThd(0)
- {
- ACE_NEW_NORETURN(m_TcpNetThd, TTcpNetThread()); // 创建TTcpNetThread对象实例
- }
- TTcp::~TTcp()
- {
- if (m_TcpNetThd) // 释放
- delete m_TcpNetThd;
- }
- void TTcp::open()
- {
- ACE_TRACE("TTcp::open");
- // 所有TTcpNetThread的事件,交由TTcp来处理
- // TOnClientValidate除外,该事件需要特定的逻辑,且无法异步
- if (m_TcpNetThd)
- {
- m_TcpNetThd->setOnClientConnect(EVENT(TTcpHandler::TOnClientConnect, TTcp, this, tcpNetThread_OnClientConnect));
- m_TcpNetThd->setOnClientDisconnect(EVENT(TTcpHandler::TOnClientDisconnect, TTcp, this, tcpNetThread_OnClientDisconnect));
- m_TcpNetThd->setOnClientValidate(m_OnClientValidate);
- m_TcpNetThd->setOnDataReceive(EVENT(TTcpHandler::TOnDataReceive, TTcp, this, tcpNetThread_OnDataReceive));
- m_TcpNetThd->setOnDataSendFailed(EVENT(TTcpHandler::TOnDataSendFailed, TTcp, this, tcpNetThread_OnDataSendFailed));
- m_TcpNetThd->setOnDataSendSucceeded(EVENT(TTcpHandler::TOnDataSendSucceeded, TTcp, this, tcpNetThread_OnDataSendSucceeded));
- }
- if (activate() == -1)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Resume thread failed")));
- }
- void TTcp::close()
- {
- if (m_TcpNetThd)
- m_TcpNetThd->close();
- ACE_TRACE("TTcp::close");
- ACE_Message_Block* termBlock; // 结束信号
- ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));
- if (!termBlock)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));
- else
- {
- putq(termBlock);
- wait();
- }
- }
- int TTcp::send(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* buf, unsigned short len)
- {
- ACE_Message_Block* mb = 0; // 数据包
- ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(unsigned int) + sizeof(unsigned short) + len, TCP_DATA_SEND), -1);
- // 格式:ip | port | seq | len | 数据...
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&len, sizeof(unsigned short));
- mb->copy(buf, len);
- return putq(mb);
- }
- int TTcp::svc()
- {
- ACE_TRACE("TTcp::svc");
- if (m_TcpNetThd->open() == -1)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to pen TTcpNetThread: %i"), errno));
- ACE_Message_Block* msg = 0;
- while(true)
- {
- if (getq(msg) == -1)
- {
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Failed to getq %i"), errno), -1);
- }
- switch(msg->msg_type())
- {
- case ACE_Message_Block::MB_HANGUP: // 偶要退出
- {
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("Quit")));
- msg->release();
- return 0;
- }
- break;
- case TCP_CLIENT_CONNECT: // 客户端连接
- {
- int len = msg->length();
- int hLen = sizeof(TTcpHandler *);
- if (msg->length() != TCP_PACK_HEADER_SIZE + sizeof(TTcpHandler *))
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Tcp connection message block invalid!")), -1);
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpHandler* handler = (TTcpHandler *)(*(int *)ptr);
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> lock(m_Lock);
- m_AddrMap.insert(make_pair<unsigned __int64, TTcpHandler *>((unsigned __int64)ip << 32 | port, handler));
- }
- m_OnClientConnect(ip, port, handler);
- }
- break;
- case TCP_CLIENT_DISCONNECT: // 客户端断开连接
- {
- if (msg->length() != sizeof(ACE_UINT32) + sizeof(ACE_UINT16))
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Invalid tcp disconnect message block/n")), -1);
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr;
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> lock(m_Lock);
- m_AddrMap.erase((unsigned __int64)ip << 32 | port);
- }
- m_OnClientDisconnect(ip, port);
- }
- break;
- case TCP_DATA_RECEIVE:
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;
- const char* data = ptr;
- m_OnDataReceive(ip, port, header->seq, data, header->len);
- }
- break;
- case TCP_DATA_SEND:
- {
- if (msg->length() > sizeof(TTcpPackHeader))
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- unsigned int seq = *(unsigned int *)ptr; ptr += sizeof(unsigned int);
- unsigned short len = *(unsigned short *)ptr; ptr += sizeof(unsigned short);
- const char* data = ptr;
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> _lock(m_Lock);
- hash_map<unsigned __int64, TTcpHandler *>::iterator it = m_AddrMap.find((unsigned __int64)ip << 32 | port);
- if (it != m_AddrMap.end())
- {
- (*it).second->send(seq, data, len);
- }
- }
- }
- }
- break;
- case TCP_DATA_SEND_SUCCEEDED:
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;
- const char* data = ptr;
- m_OnDataSendSucceeded(ip, port, header->seq, data, header->len);
- }
- break;
- case TCP_DATA_SEND_FAILED:
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;
- const char* data = ptr;
- m_OnDataSendFailed(ip, port, header->seq, data, header->len);
- }
- break;
- default:
- {
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Unknown ACE_Message_Block type %i/n"), msg->msg_type()));
- }
- break;
- } // switch
- msg->release();
- } // while true
- return 0;
- }
- void TTcp::tcpNetThread_OnClientConnect(ACE_UINT32 ip, ACE_UINT16 port, TTcpHandler* handler)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(TTcpHandler *), TCP_CLIENT_CONNECT));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&handler, sizeof(TTcpHandler *));
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnClientDisconnect(ACE_UINT32 ip, ACE_UINT16 port)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16), TCP_CLIENT_DISCONNECT));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataReceive(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_RECEIVE));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataSendSucceeded(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_SUCCEEDED));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataSendFailed(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_FAILED));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- } // namespace igame
在完整的工程中,还有测试代码,这里就不列出了。本来已经在下载频道中上传了,并设置下载点数为0,结果传完后楞是自私都找不到?!NNDCSDN!!
这是下载资源。
来信到igame2000@hotmail.com
需要完整代码的请来信索取吧,必复。
此乃末技。。。。
这篇关于ACE_Proactor网络通信示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!