ACE之Proactor模式使用实例

2023-12-06 20:58

本文主要是介绍ACE之Proactor模式使用实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"#pragma comment(lib,"ACEd.lib")class Service_Handler : public ACE_Service_Handler
{
public:Service_Handler(){//ACE_OS::printf("Service_Handler constructed for connector \n");}~Service_Handler (){if (this->handle () != ACE_INVALID_HANDLE)ACE_OS::closesocket (this->handle ());//ACE_OS::printf("one Service_Handler for connecter destructed");}void post_send(void){do {time_t now = ACE_OS::gettimeofday().sec();ACE_Message_Block *mb = new ACE_Message_Block(128);char buff[64];ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(this->handle());size_t addr_size=sizeof ACE_INET_Addr;ass.get_local_addrs(&addr,addr_size);//ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)this->handle(), addr.get_ip_address(), addr.get_port_number());sprintf(buff,"%d",addr.get_port_number());mb->copy(buff/*ctime(now)*/);if (this->writer_.write(*mb,mb->length()) !=0){ACE_OS::printf("Begin write fail in open\n");delete this;break;}else{ACE_OS::printf("sended:%s\n",mb->rd_ptr());}} while (0);}void post_recv(void){do {ACE_Message_Block *mb = new ACE_Message_Block(buffer,128);if (this->reader_.read (*mb, mb->space ()) != 0){ACE_OS::printf("Begin read fail\n");delete this;break;}} while (0);}virtual void open (ACE_HANDLE h, ACE_Message_Block&){do {this->handle (h);if (this->writer_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_send();if (this->reader_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_recv();} while (0);}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){result.message_block ().release();//ACE_OS::sleep(1);post_send();}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){do {ACE_Message_Block &mb = result.message_block ();if (!result.success () || result.bytes_transferred () == 0){mb.release ();delete this;break;}ACE_OS::printf("received:%s\n",mb.rd_ptr());mb.release();post_recv();} while (0);}
private:ACE_Asynch_Write_Stream writer_;ACE_Asynch_Read_Stream reader_;char buffer[128];
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#define TCP_CLIENT_THREAD_SEND	0x777const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task<ACE_MT_SYNCH>
{ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象public:/// ctor~TTcpClientThread();/// 运行int open();/// 停止运行int close();
private:/// 线程函数virtual int svc();
};TTcpClientThread::~TTcpClientThread()
{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++)peerStream[i].close();
}int TTcpClientThread::open() { return this->activate(); }int TTcpClientThread::close()
{ACE_TRACE("TTcpClientThread::close");ACE_Message_Block* termBlock;ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));if (!termBlock)ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));else{putq(termBlock);wait();}return 0;
}int TTcpClientThread::svc()
{ACE_INET_Addr srvAddr(7878, "127.0.0.1");for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){if (connector[i].connect(peerStream[i], srvAddr) == -1){ACE_ERROR((LM_ERROR, ACE_TEXT("%i Failed to connect server errno=%i\n"), i, errno));}Sleep(100);}struct TPack{
#pragma pack(push)
#pragma pack(1)unsigned int seq;unsigned short len;char data [128];
#pragma pack(pop)};ACE_Message_Block* msg = 0;ACE_INET_Addr localAddr;ACE_TCHAR localAddrStr[128];peerStream[0].get_local_addr(localAddr);localAddr.addr_to_string(localAddrStr, sizeof(localAddrStr) / sizeof(ACE_TCHAR));TPack data;int len = sizeof(unsigned int) + sizeof(unsigned short);data.seq = 0;data.len = strlen(localAddrStr) + 1;strcpy(data.data, localAddrStr);len += data.len;char tmp[sizeof(TPack)];char buf[256];memcpy(tmp, &data, len);while(true) // 线程循环{if (getq(msg) != -1){switch(msg->msg_type()){case ACE_Message_Block::MB_HANGUP:{msg->release();return 0;}break;default:{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){peerStream[i].send(tmp, 5);Sleep(100);peerStream[i].send(tmp + 5, len - 5);Sleep(100);ACE_Time_Value timeout(2);int recvLen =  peerStream[i].recv_n(buf, sizeof(unsigned int) + sizeof(unsigned short), 0, &timeout);if (recvLen == sizeof(unsigned int) + sizeof(unsigned short)){short dataLen = *(short *)(buf + 4);if (dataLen > 256)dataLen = 256;recvLen = peerStream[i].recv_n(buf, dataLen, 0, &timeout);if (recvLen != dataLen)ACE_DEBUG((LM_INFO, ACE_TEXT("Failed to recv data, length is %i, but only get %i\n"), dataLen, recvLen));elseACE_DEBUG((LM_INFO, ACE_TEXT("Client get data: len=%i data=%s\n"), recvLen, buf));} // if recvLen} // for} // defaultbreak;} // switchmsg->release();} // if getq} // whileACE_DEBUG((LM_INFO, ACE_TEXT("Exit client thread")));return 0;
}#include <vector>
#define CLIENT_THREAD_NUM 4
int main(int argc, char *argv[]) 
{ACE_INET_Addr remote_addr(4567,ACE_LOCALHOST); std::vector<ACE_Asynch_Connector<Service_Handler> *> vtconnector;for (int i=0;i<2000;i++){ACE_INET_Addr local_addr(10000+i,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> *connector = new ACE_Asynch_Connector<Service_Handler>;connector->open();if (connector->connect(remote_addr,local_addr) == -1)return -1;vtconnector.push_back(connector);}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance ()->proactor_run_event_loop();return 0; 
}

// ACE_Proactor_Server.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Asynch_IO.h"
#include "ace/OS_main.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Containers.h"
#include "ace/SOCK_SEQPACK_Association.h"ACE_DLList<ACE_Asynch_Write_Stream> wList;class Service_Handler:public ACE_Service_Handler
{
public:Service_Handler(){}~Service_Handler(void){if(this->handle()!=ACE_INVALID_HANDLE)ACE_OS::closesocket(this->handle());}virtual void open(ACE_HANDLE h,ACE_Message_Block &message_block){//handle_= h;//this->handle(h);if(rs_.open(*this,h)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));return;}if(ws_.open(*this)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));return;}if (post_recv()==-1)return;//wList.insert_tail(&ws_);addresses(remote_address,local_address);remote_address.addr_to_string(peer_name,MAXHOSTNAMELEN);ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)h, addr.get_ip_address(), addr.get_port_number());//ACE_DEBUG((LM_DEBUG,ACE_TEXT("peer:%s\n"),peer_name));}
protected:int post_recv(void){ACE_Message_Block *mb=0;ACE_NEW_RETURN(mb,ACE_Message_Block(512),-1);if(rs_.read(*mb,mb->space())==-1){ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);}return 0;}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){//ACE_HANDLE h = result.handle();ACE_Message_Block &mb = result.message_block ();if (result.success()&&result.bytes_transferred()!=0){ACE_DEBUG((LM_DEBUG,ACE_TEXT("recv:%s\n"),mb.rd_ptr()));if (ws_.write(*mb.duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::write"));}/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while(!iter.done()){if (iter.next()->write(*result.message_block().duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::write"));}iter++;}*/mb.release();post_recv();}else{mb.release();/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while (!iter.done ()){if(&ws_==iter.next()){iter.remove();break;}iter++;}*/delete this;}}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){//ACE_OS::printf("write complete:%d %d\n", result.success(),result.bytes_transferred());result.message_block().release();}
private:ACE_Asynch_Read_Stream rs_;ACE_Asynch_Write_Stream ws_;ACE_HANDLE handle_;ACE_TCHAR peer_name[MAXHOSTNAMELEN];ACE_INET_Addr remote_address;ACE_INET_Addr local_address;
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc,char *argv[])
{ACE_Asynch_Acceptor<Service_Handler> acceptor;if(acceptor.open(ACE_INET_Addr(4567),0,1) == -1){return -1;}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance()->proactor_run_event_loop();return 0;
};

这篇关于ACE之Proactor模式使用实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463303

相关文章

vue使用docxtemplater导出word

《vue使用docxtemplater导出word》docxtemplater是一种邮件合并工具,以编程方式使用并处理条件、循环,并且可以扩展以插入任何内容,下面我们来看看如何使用docxtempl... 目录docxtemplatervue使用docxtemplater导出word安装常用语法 封装导出方

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

使用Jackson进行JSON生成与解析的新手指南

《使用Jackson进行JSON生成与解析的新手指南》这篇文章主要为大家详细介绍了如何使用Jackson进行JSON生成与解析处理,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 核心依赖2. 基础用法2.1 对象转 jsON(序列化)2.2 JSON 转对象(反序列化)3.

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Elasticsearch 在 Java 中的使用教程

《Elasticsearch在Java中的使用教程》Elasticsearch是一个分布式搜索和分析引擎,基于ApacheLucene构建,能够实现实时数据的存储、搜索、和分析,它广泛应用于全文... 目录1. Elasticsearch 简介2. 环境准备2.1 安装 Elasticsearch2.2 J

使用C#代码在PDF文档中添加、删除和替换图片

《使用C#代码在PDF文档中添加、删除和替换图片》在当今数字化文档处理场景中,动态操作PDF文档中的图像已成为企业级应用开发的核心需求之一,本文将介绍如何在.NET平台使用C#代码在PDF文档中添加、... 目录引言用C#添加图片到PDF文档用C#删除PDF文档中的图片用C#替换PDF文档中的图片引言在当

Java中List的contains()方法的使用小结

《Java中List的contains()方法的使用小结》List的contains()方法用于检查列表中是否包含指定的元素,借助equals()方法进行判断,下面就来介绍Java中List的c... 目录详细展开1. 方法签名2. 工作原理3. 使用示例4. 注意事项总结结论:List 的 contain

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

Android中Dialog的使用详解

《Android中Dialog的使用详解》Dialog(对话框)是Android中常用的UI组件,用于临时显示重要信息或获取用户输入,本文给大家介绍Android中Dialog的使用,感兴趣的朋友一起... 目录android中Dialog的使用详解1. 基本Dialog类型1.1 AlertDialog(