EchoServer: an echo server based on the multi-threaded Reactor pattern

2023-10-10 14:52

This article describes EchoServer, an echo server built on the multi-threaded Reactor pattern; hopefully it provides a useful reference for developers facing similar problems.

Some notes on the design:

One thread is dedicated to calling accept to obtain client fds.

After a client fd is obtained, the worker thread with the fewest connected clients is selected from the remaining threads.

The client fd is then handed to that thread and monitored there through epoll.

Threads communicate via eventfd: the client fd is delivered to the chosen worker thread through its eventfd.

The design borrows from MediaServer, introducing the EventPollerPool and EventPoller concepts.

There are at least two threads; if the pool size is set to 1, it is bumped up to 2.
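Before the full implementation, here is a minimal, self-contained sketch of the eventfd handoff idea described above: the accept thread writes the new client fd into a worker's eventfd as a 64-bit counter value, and the worker thread blocks on a read of that eventfd and takes over the connection. The names below (worker, fake_client_fd) are illustrative only and are not part of the code that follows.

// Minimal sketch of the eventfd handoff (illustrative, not the server code below).
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <thread>

int main()
{
    int efd = eventfd(0, 0);              // counter the worker thread will block on
    std::thread worker([efd]() {
        uint64_t value = 0;
        if (read(efd, &value, sizeof(value)) == sizeof(value)) {
            // In the real server this value is the client fd delivered by the accept thread
            printf("worker received fd %llu\n", (unsigned long long)value);
        }
    });

    int fake_client_fd = 42;              // stand-in for a freshly accepted client fd
    uint64_t count = fake_client_fd;
    write(efd, &count, sizeof(count));    // the accept thread "delivers" the fd

    worker.join();
    close(efd);
    return 0;
}

Worth noting: an eventfd is a counter, so if two fds were written before the worker gets to read, a single read would return their sum. The handoff therefore relies on the worker draining the eventfd promptly; a pipe would be an alternative if strict one-value-per-read semantics were needed.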

cpp code:

#include "durian.h"#include <sys/epoll.h>namespace DURIAN
{EventPoller::EventPoller(int id){m_id = id;}EventPoller::~EventPoller(){printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd = epoll_create1(0);if(m_poll_fd == -1){return false;}m_event_fd = eventfd(0,0);if(m_event_fd == -1){printf("new fd failed\n");close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS = 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count == -1){if(errno != EINTR){//exit(1);}//ready_count = 0;}else if(ready_count == 0){if(m_run_flag == false){//printf("time out and runflag = false exit thread\n");//break;}}for(int i = 0;i<ready_count;i++){const struct epoll_event &ev = events[i];int fd = events[i].data.fd;if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler = m_accept_handlers[fd];handler(fd);}else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it = m_buffer_pool.find(fd);if(it!= m_buffer_pool.end()){auto &buf = it->second;if(buf.WriteData(fd) == false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag = false;}void EventPoller::Start(){//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);m_run_flag = true;m_thread_id = std::thread(&EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) != 0){return false;}int flags = 1;if(ioctl(fd,FIONBIO,&flags) == -1){return false;}struct epoll_event ev;ev.events = EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd = fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf("DeliverConn fd = %d\n",conn_fd);uint64_t count = conn_fd;if(write(m_event_fd,&count,sizeof(count)) == -1){printf("Deliverconn write failed\n");}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) == false){return false;}std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;m_accept_handlers[fd] = [this,on_accept]( int server_fd){for(;;){int new_fd = accept(server_fd,nullptr,nullptr);std::cout<<"accept client fd = "<<new_fd<<std::endl;	if(new_fd == -1){if(errno!= EAGAIN){Close(server_fd);}return 0;}int enable = 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,&count,sizeof(count)) == -1){if(errno != EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){	if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_read](int cfd){for(;;){char buf[4096] = {0};ssize_t ret = read(cfd,buf,sizeof(buf));if(ret == -1){if(errno != EAGAIN){Close(cfd);}return -1;}if(ret == 0){Close(cfd);printf("客户端关闭了连接 %d\n",cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb = nullptr;auto it = m_buffer_pool.find(fd);if(it == m_buffer_pool.end()){while(len >0){ssize_t ret = write(fd,buf,len);if(ret == -1){if(errno != 
EAGAIN){Close(fd);return false;}wb = &m_buffer_pool[fd];break;}buf+= ret;len-=ret;}if(len == 0){//Successreturn true;}}else{wb = &it->second;}wb->Add2Buffer(buf,len);return true;}static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); static EventPollerPool &s_instance_ref = *s_instance; return s_instance_ref; 
}EventPollerPool::EventPollerPool()
{auto size = g_pool_size;auto cpus = std::thread::hardware_concurrency();size = size > 0 ? size : cpus;std::cout<<"Thread size:"<<size<<std::endl;if(size <2)size = 2;for (int i = 0; i < size; ++i) {std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);m_pollers.emplace_back(poller);}
}std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{if(m_pollers.size()>1){int min_clients = 10000;int target_index = 0;for(int i = 1;i<m_pollers.size();i++){if(m_pollers[i]-> GetClients() < min_clients){min_clients = m_pollers[i]->GetClients();target_index = i;}}//printf("target index = %d min_clients = %d\n",target_index,min_clients);return m_pollers[target_index];}return m_pollers[0];}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{return m_pollers[0];
}void EventPollerPool::StartPollers()
{for(int i = 1;i<m_pollers.size();i++){m_pollers[i]->Init();int event_fd = m_pollers[i]->GetEventFD();m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){READER reader = [&,i](int fd,const char*data,size_t len){printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);m_pollers[i]->FlushData(fd,data,len);return 0;};m_pollers[i]->AddReader(cfd,reader);return 0;});m_pollers[i]->Start();	}
}void EventPollerPool::Stop()
{for(int i = 0;i<m_pollers.size();i++){m_pollers[i]->Stop();}}}

Header file (durian.h):

#pragma once

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <signal.h>
#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <unordered_map>

namespace DURIAN
{

// Per-fd queue of pending output, drained when the socket becomes writable again
class WriteBuffer
{
private:
    std::list<std::string> buf_items;
    size_t offset = 0;

public:
    bool IsEmpty() const
    {
        return buf_items.empty();
    }

    void Add2Buffer(const char *data, size_t len)
    {
        if (buf_items.empty() || buf_items.back().size() + len > 4096) {
            buf_items.emplace_back(data, len);
        } else {
            buf_items.back().append(data, len);
        }
    }

    bool WriteData(int fd)
    {
        while (IsEmpty() == false) {
            auto const &item = buf_items.front();
            const char *p = item.data() + offset;
            size_t len = item.size() - offset;
            while (len > 0) {
                ssize_t ret = write(fd, p, len);
                if (ret == -1) {
                    if (errno == EAGAIN) {
                        return true;
                    }
                    return false;
                }
                offset += ret;
                p += ret;
                len -= ret;
            }
            buf_items.pop_front();
            offset = 0; // reset the offset for the next queued item
        }
        return true;
    }
};

using ACCEPTER = std::function<int(int)>;
using WRITER = std::function<int(int)>;
using EVENTER = std::function<int(int)>;
using READER = std::function<int(int, const char *data, size_t)>;

//static thread_local std::unordered_map<int fd,READER> g_th_handlers;

class EventPoller
{
private:
    int m_poll_fd = -1;
    int m_id;
    bool m_run_flag = false;
    std::unordered_map<int, ACCEPTER> m_accept_handlers;
    std::unordered_map<int, WriteBuffer> m_buffer_pool;
    std::mutex m_connction_lock;
    int m_event_fd;
    std::thread m_thread_id;
    std::vector<int> m_connections;
    void RunLoop();

public:
    EventPoller(int i);
    ~EventPoller();
    int GetEventFD();
    int GetClients();
    std::vector<int> &GetConnections();
    bool Init();
    void Start();
    void Stop();
    void Wait();
    void DeliverConn(int conn_fd);
    bool AddListener(int fd, ACCEPTER on_listen);
    bool AddEventer(int fd, EVENTER on_event);
    bool AddReader(int fd, READER on_read);
    void Close(int fd);
    bool Add2Epoll(int fd);
    bool FlushData(int fd, const char *buf, size_t len);
};

class EventPollerPool
{
public:
    static EventPollerPool &Instance();
    static void SetPoolSize(size_t size = 0);
    std::shared_ptr<EventPoller> GetPoller();
    std::shared_ptr<EventPoller> GetFirstPoller();
    void StartPollers();
    void Stop();

private:
    int m_size;
    std::vector<std::shared_ptr<EventPoller>> m_pollers;
    EventPollerPool();
};

}

main file:

#include "durian.h"static bool g_run_flag = true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag = false;printf("Get exit flag\n");if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo){g_run_flag = false;printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");}}bool StartServer()
{int listen_fd = socket(AF_INET,SOCK_STREAM,0);if(listen_fd == -1){printf("Create socket failed\n");return false;}else{printf("Server listen fd is:%d\n",listen_fd);}int reuseaddr = 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1){return false;}struct sockaddr_in listen_addr = {0};listen_addr.sin_family  = AF_INET;listen_addr.sin_addr.s_addr = INADDR_ANY;listen_addr.sin_port = htons(8888);if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){printf("bind failed\n");return false;}if(listen(listen_fd,100) == -1){printf("listen failed\n");return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller = pool.GetFirstPoller(); if(poller->Init()){if(poller->AddListener(listen_fd,[&](int conn_fd){printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);//Deliver client fd to other pollerspool.GetPoller()->DeliverConn(conn_fd);return 0;}) == false){return false;}poller->Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf(" cpp version :%d\n",__cplusplus);int thread_size = 1;bool run_flag = true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
}

Performance test

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html
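Since the server echoes raw bytes rather than speaking HTTP, ab's response parsing may report failed requests even when the echo path works, so a quick sanity check with a tiny raw TCP client is also useful. The snippet below is only an illustrative test client (the 127.0.0.1 address and the test message are assumptions); it connects to the port bound in main above and expects its payload back.

// Illustrative echo test client (assumes the server is reachable at 127.0.0.1:8888).
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

int main()
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }
    const char msg[] = "hello echo";
    write(fd, msg, sizeof(msg) - 1);        // send a test payload
    char buf[4096] = {0};
    ssize_t n = read(fd, buf, sizeof(buf)); // expect the same bytes back
    printf("echoed %zd bytes: %s\n", n, buf);
    close(fd);
    return 0;
}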
 

That concludes this introduction to EchoServer, an echo server based on the multi-threaded Reactor pattern; hopefully it is helpful.


