本文主要介绍 ACE 中 Proactor 模式的使用实例（一个异步客户端和一个异步回显服务器），希望能为大家解决编程问题提供一定的参考，需要的开发者可以跟着小编一起学习。
// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"

#pragma comment(lib,"ACEd.lib")

/**
 * @class Service_Handler
 * @brief Client-side Proactor handler: once connected it repeatedly sends the
 *        socket's local port number (as decimal text) and prints whatever the
 *        peer sends back.
 *
 * Instances must be heap-allocated: the handler destroys itself with
 * `delete this` when an asynchronous operation cannot be started or when the
 * read side reports an error / end of stream.
 *
 * NOTE(review): this file starts several threads that all run the proactor
 * event loop, so a read completion and a write completion of the same handler
 * may be dispatched concurrently. Nothing in this class serialises them --
 * confirm whether a lock or pending-operation count is needed.
 */
class Service_Handler : public ACE_Service_Handler
{
public:
    Service_Handler()
    {
    }

    /// Closes the socket if one is attached to this handler.
    ~Service_Handler()
    {
        if (this->handle() != ACE_INVALID_HANDLE)
            ACE_OS::closesocket(this->handle());
    }

    /**
     * Starts one asynchronous write carrying the local port number.
     * On failure the handler deletes itself, so the caller must not touch
     * the object after this call returns.
     */
    void post_send(void)
    {
        if (!this->start_send())
            delete this;
    }

    /**
     * Starts one asynchronous read into the handler's receive buffer.
     * On failure the handler deletes itself, so the caller must not touch
     * the object after this call returns.
     */
    void post_recv(void)
    {
        if (!this->start_recv())
            delete this;
    }

    /**
     * Called once the connection is established.
     * @param h connected socket handle; stored as this handler's handle.
     *
     * Both streams are opened before any operation is posted, and the
     * function returns immediately after every `delete this`, so no member
     * is accessed on a destroyed object.
     */
    virtual void open(ACE_HANDLE h, ACE_Message_Block&)
    {
        this->handle(h);

        if (this->writer_.open(*this) != 0 || this->reader_.open(*this) != 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("Service_Handler open")));
            delete this;
            return;
        }

        if (!this->start_send() || !this->start_recv())
        {
            delete this;
            return;
        }
    }

    /**
     * Write completion: releases the block that was sent and, if the write
     * succeeded, immediately posts the next one.
     */
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
    {
        result.message_block().release();

        if (!result.success())
        {
            // Do not re-post on a failed write; a broken connection is
            // expected to surface on the read side, which tears the
            // handler down.
            ACE_OS::printf("write failed, sending stopped\n");
            return;
        }

        //ACE_OS::sleep(1);
        post_send();
    }

    /**
     * Read completion: prints the received bytes and posts the next read.
     * An error or a zero-byte read destroys the handler.
     */
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
    {
        ACE_Message_Block &mb = result.message_block();

        if (!result.success() || result.bytes_transferred() == 0)
        {
            mb.release();
            delete this;
            return;
        }

        // The payload is not NUL-terminated, so bound the print by its length.
        ACE_OS::printf("received:%.*s\n", static_cast<int>(mb.length()), mb.rd_ptr());
        mb.release();
        post_recv();
    }

private:
    /// Builds and posts one outgoing message. @return false if nothing was posted.
    bool start_send()
    {
        // Look up the local endpoint of the connected socket. The size
        // argument is the number of ACE_INET_Addr slots, not a byte count.
        ACE_INET_Addr addr;
        ACE_SOCK_SEQPACK_Association ass(this->handle());
        size_t addr_count = 1;
        ass.get_local_addrs(&addr, addr_count);

        char buff[64];
        ACE_OS::snprintf(buff, sizeof(buff), "%d", addr.get_port_number());

        ACE_Message_Block *mb = 0;
        ACE_NEW_RETURN(mb, ACE_Message_Block(128), false);
        mb->copy(buff);

        if (this->writer_.write(*mb, mb->length()) != 0)
        {
            ACE_OS::printf("Begin write fail in open\n");
            mb->release();      // no completion will arrive to release it
            return false;
        }

        // Print from the local copy: once the write is posted the block may
        // already have been released by the completion handler.
        ACE_OS::printf("sended:%s\n", buff);
        return true;
    }

    /// Posts one read into `buffer`. @return false if nothing was posted.
    bool start_recv()
    {
        ACE_Message_Block *mb = 0;
        // The block only wraps the member buffer; it does not own the bytes.
        ACE_NEW_RETURN(mb, ACE_Message_Block(buffer, sizeof(buffer)), false);

        if (this->reader_.read(*mb, mb->space()) != 0)
        {
            ACE_OS::printf("Begin read fail\n");
            mb->release();      // no completion will arrive to release it
            return false;
        }
        return true;
    }

    ACE_Asynch_Write_Stream writer_;
    ACE_Asynch_Read_Stream reader_;
    char buffer[128];           ///< receive buffer shared by successive reads
};

#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#define TCP_CLIENT_THREAD_SEND 0x777const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task<ACE_MT_SYNCH>
{ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象public:/// ctor~TTcpClientThread();/// 运行int open();/// 停止运行int close();
private:/// 线程函数virtual int svc();
};TTcpClientThread::~TTcpClientThread()
{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++)peerStream[i].close();
}int TTcpClientThread::open() { return this->activate(); }int TTcpClientThread::close()
{ACE_TRACE("TTcpClientThread::close");ACE_Message_Block* termBlock;ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));if (!termBlock)ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));else{putq(termBlock);wait();}return 0;
}int TTcpClientThread::svc()
{ACE_INET_Addr srvAddr(7878, "127.0.0.1");for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){if (connector[i].connect(peerStream[i], srvAddr) == -1){ACE_ERROR((LM_ERROR, ACE_TEXT("%i Failed to connect server errno=%i\n"), i, errno));}Sleep(100);}struct TPack{
#pragma pack(push)
#pragma pack(1)unsigned int seq;unsigned short len;char data [128];
#pragma pack(pop)};ACE_Message_Block* msg = 0;ACE_INET_Addr localAddr;ACE_TCHAR localAddrStr[128];peerStream[0].get_local_addr(localAddr);localAddr.addr_to_string(localAddrStr, sizeof(localAddrStr) / sizeof(ACE_TCHAR));TPack data;int len = sizeof(unsigned int) + sizeof(unsigned short);data.seq = 0;data.len = strlen(localAddrStr) + 1;strcpy(data.data, localAddrStr);len += data.len;char tmp[sizeof(TPack)];char buf[256];memcpy(tmp, &data, len);while(true) // 线程循环{if (getq(msg) != -1){switch(msg->msg_type()){case ACE_Message_Block::MB_HANGUP:{msg->release();return 0;}break;default:{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){peerStream[i].send(tmp, 5);Sleep(100);peerStream[i].send(tmp + 5, len - 5);Sleep(100);ACE_Time_Value timeout(2);int recvLen = peerStream[i].recv_n(buf, sizeof(unsigned int) + sizeof(unsigned short), 0, &timeout);if (recvLen == sizeof(unsigned int) + sizeof(unsigned short)){short dataLen = *(short *)(buf + 4);if (dataLen > 256)dataLen = 256;recvLen = peerStream[i].recv_n(buf, dataLen, 0, &timeout);if (recvLen != dataLen)ACE_DEBUG((LM_INFO, ACE_TEXT("Failed to recv data, length is %i, but only get %i\n"), dataLen, recvLen));elseACE_DEBUG((LM_INFO, ACE_TEXT("Client get data: len=%i data=%s\n"), recvLen, buf));} // if recvLen} // for} // defaultbreak;} // switchmsg->release();} // if getq} // whileACE_DEBUG((LM_INFO, ACE_TEXT("Exit client thread")));return 0;
}#include <vector>
#define CLIENT_THREAD_NUM 4
int main(int argc, char *argv[])
{ACE_INET_Addr remote_addr(4567,ACE_LOCALHOST); std::vector<ACE_Asynch_Connector<Service_Handler> *> vtconnector;for (int i=0;i<2000;i++){ACE_INET_Addr local_addr(10000+i,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> *connector = new ACE_Asynch_Connector<Service_Handler>;connector->open();if (connector->connect(remote_addr,local_addr) == -1)return -1;vtconnector.push_back(connector);}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance ()->proactor_run_event_loop();return 0;
}
// ACE_Proactor_Server.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Asynch_IO.h"
#include "ace/OS_main.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Containers.h"
#include "ace/SOCK_SEQPACK_Association.h"ACE_DLList<ACE_Asynch_Write_Stream> wList;class Service_Handler:public ACE_Service_Handler
{
public:Service_Handler(){}~Service_Handler(void){if(this->handle()!=ACE_INVALID_HANDLE)ACE_OS::closesocket(this->handle());}virtual void open(ACE_HANDLE h,ACE_Message_Block &message_block){//handle_= h;//this->handle(h);if(rs_.open(*this,h)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));return;}if(ws_.open(*this)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));return;}if (post_recv()==-1)return;//wList.insert_tail(&ws_);addresses(remote_address,local_address);remote_address.addr_to_string(peer_name,MAXHOSTNAMELEN);ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)h, addr.get_ip_address(), addr.get_port_number());//ACE_DEBUG((LM_DEBUG,ACE_TEXT("peer:%s\n"),peer_name));}
protected:int post_recv(void){ACE_Message_Block *mb=0;ACE_NEW_RETURN(mb,ACE_Message_Block(512),-1);if(rs_.read(*mb,mb->space())==-1){ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);}return 0;}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){//ACE_HANDLE h = result.handle();ACE_Message_Block &mb = result.message_block ();if (result.success()&&result.bytes_transferred()!=0){ACE_DEBUG((LM_DEBUG,ACE_TEXT("recv:%s\n"),mb.rd_ptr()));if (ws_.write(*mb.duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::write"));}/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while(!iter.done()){if (iter.next()->write(*result.message_block().duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::write"));}iter++;}*/mb.release();post_recv();}else{mb.release();/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while (!iter.done ()){if(&ws_==iter.next()){iter.remove();break;}iter++;}*/delete this;}}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){//ACE_OS::printf("write complete:%d %d\n", result.success(),result.bytes_transferred());result.message_block().release();}
private:ACE_Asynch_Read_Stream rs_;ACE_Asynch_Write_Stream ws_;ACE_HANDLE handle_;ACE_TCHAR peer_name[MAXHOSTNAMELEN];ACE_INET_Addr remote_address;ACE_INET_Addr local_address;
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc,char *argv[])
{ACE_Asynch_Acceptor<Service_Handler> acceptor;if(acceptor.open(ACE_INET_Addr(4567),0,1) == -1){return -1;}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance()->proactor_run_event_loop();return 0;
};
关于 ACE 之 Proactor 模式使用实例的介绍就到这里，希望本文对各位程序员有所帮助！