A Multi-Threaded Reactor-Pattern Echo Server (EchoServer)

2023-10-10 14:52

This article presents an echo server (EchoServer) built on a multi-threaded Reactor pattern, in the hope that it serves as a useful reference for developers tackling similar problems.

Some notes on the design:

One thread is dedicated to accept(): it retrieves the fd of each new client.

Once a fd is accepted, the pool picks, from the remaining worker threads, the one currently serving the fewest clients.

The client fd is then added to that thread and monitored through its epoll instance.

The threads communicate via eventfd: the client fd is delivered to the chosen worker through its eventfd, as the minimal sketch below illustrates.
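Here is that sketch: a self-contained example (not from the original post; the value 42 stands in for a real client fd) of one thread handing an fd number to another through an eventfd. Like the server's DeliverConn() below, it writes the fd value itself as the eventfd's 64-bit counter. One caveat: eventfd accumulates writes into a single counter rather than queuing them, so if two fds were written before the reader drained the counter, the two values would be summed and lost; the server code accepts that risk.

#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <thread>

int main()
{
    // One eventfd shared by a "main" thread and a worker thread.
    int efd = eventfd(0, 0);
    if (efd == -1) {
        perror("eventfd");
        return 1;
    }

    std::thread worker([efd] {
        uint64_t value = 0;
        // Blocks until the other thread writes; the 64-bit counter
        // carries the delivered fd value.
        if (read(efd, &value, sizeof(value)) == (ssize_t)sizeof(value)) {
            printf("worker received fd = %d\n", (int)value);
        }
    });

    uint64_t fd_to_deliver = 42;  // stand-in for a real client fd
    if (write(efd, &fd_to_deliver, sizeof(fd_to_deliver)) == -1) {
        perror("write");
    }

    worker.join();
    close(efd);
    return 0;
}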

The design borrows the EventPoller and EventPollerPool concepts from MediaServer.

The pool always has at least two threads; if the size is set to 1, it is bumped to 2 (one acceptor plus at least one worker).

The cpp code:

#include "durian.h"#include <sys/epoll.h>namespace DURIAN
{EventPoller::EventPoller(int id){m_id = id;}EventPoller::~EventPoller(){printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd = epoll_create1(0);if(m_poll_fd == -1){return false;}m_event_fd = eventfd(0,0);if(m_event_fd == -1){printf("new fd failed\n");close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS = 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count == -1){if(errno != EINTR){//exit(1);}//ready_count = 0;}else if(ready_count == 0){if(m_run_flag == false){//printf("time out and runflag = false exit thread\n");//break;}}for(int i = 0;i<ready_count;i++){const struct epoll_event &ev = events[i];int fd = events[i].data.fd;if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler = m_accept_handlers[fd];handler(fd);}else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it = m_buffer_pool.find(fd);if(it!= m_buffer_pool.end()){auto &buf = it->second;if(buf.WriteData(fd) == false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag = false;}void EventPoller::Start(){//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);m_run_flag = true;m_thread_id = std::thread(&EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) != 0){return false;}int flags = 1;if(ioctl(fd,FIONBIO,&flags) == -1){return false;}struct epoll_event ev;ev.events = EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd = fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf("DeliverConn fd = %d\n",conn_fd);uint64_t count = conn_fd;if(write(m_event_fd,&count,sizeof(count)) == -1){printf("Deliverconn write failed\n");}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) == false){return false;}std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;m_accept_handlers[fd] = [this,on_accept]( int server_fd){for(;;){int new_fd = accept(server_fd,nullptr,nullptr);std::cout<<"accept client fd = "<<new_fd<<std::endl;	if(new_fd == -1){if(errno!= EAGAIN){Close(server_fd);}return 0;}int enable = 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,&count,sizeof(count)) == -1){if(errno != EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){	if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_read](int cfd){for(;;){char buf[4096] = {0};ssize_t ret = read(cfd,buf,sizeof(buf));if(ret == -1){if(errno != EAGAIN){Close(cfd);}return -1;}if(ret == 0){Close(cfd);printf("客户端关闭了连接 %d\n",cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb = nullptr;auto it = m_buffer_pool.find(fd);if(it == m_buffer_pool.end()){while(len >0){ssize_t ret = write(fd,buf,len);if(ret == -1){if(errno != 
EAGAIN){Close(fd);return false;}wb = &m_buffer_pool[fd];break;}buf+= ret;len-=ret;}if(len == 0){//Successreturn true;}}else{wb = &it->second;}wb->Add2Buffer(buf,len);return true;}static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{
    g_pool_size = size;
}

EventPollerPool &EventPollerPool::Instance()
{
    static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool());
    static EventPollerPool &s_instance_ref = *s_instance;
    return s_instance_ref;
}

EventPollerPool::EventPollerPool()
{
    auto size = g_pool_size;
    auto cpus = std::thread::hardware_concurrency();
    size = size > 0 ? size : cpus;
    std::cout << "Thread size:" << size << std::endl;
    if (size < 2)
        size = 2;  // at least one acceptor plus one worker
    for (size_t i = 0; i < size; ++i) {
        std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);
        m_pollers.emplace_back(poller);
    }
}

std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{
    // Pick the worker poller (index 1..n; poller 0 is the acceptor)
    // that is currently serving the fewest clients.
    if (m_pollers.size() > 1) {
        int min_clients = 10000;
        size_t target_index = 0;
        for (size_t i = 1; i < m_pollers.size(); i++) {
            if (m_pollers[i]->GetClients() < min_clients) {
                min_clients = m_pollers[i]->GetClients();
                target_index = i;
            }
        }
        //printf("target index = %zu min_clients = %d\n", target_index, min_clients);
        return m_pollers[target_index];
    }
    return m_pollers[0];
}

std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{
    return m_pollers[0];
}

void EventPollerPool::StartPollers()
{
    for (size_t i = 1; i < m_pollers.size(); i++) {
        m_pollers[i]->Init();
        int event_fd = m_pollers[i]->GetEventFD();
        // Each worker watches its own eventfd; every delivered fd is wired up
        // with a reader that echoes whatever it receives.
        m_pollers[i]->AddEventer(event_fd, [this, i](uint64_t cfd) {
            READER reader = [this, i](int fd, const char *data, size_t len) {
                printf("content[%.*s] len[%zu] poller = %p i = %zu\n",
                       (int)len, data, len, (void *)m_pollers[i].get(), i);
                m_pollers[i]->FlushData(fd, data, len);  // echo it back
                return 0;
            };
            m_pollers[i]->AddReader(cfd, reader);
            return 0;
        });
        m_pollers[i]->Start();
    }
}

void EventPollerPool::Stop()
{
    for (size_t i = 0; i < m_pollers.size(); i++) {
        m_pollers[i]->Stop();
    }
}

} // namespace DURIAN

The header file:

#pragma once  // include guard

#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <signal.h>
#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <unordered_map>

namespace DURIAN
{

class WriteBuffer
{
private:
    std::list<std::string> buf_items;
    size_t offset = 0;

public:
    bool IsEmpty() const
    {
        return buf_items.empty();
    }

    void Add2Buffer(const char *data, size_t len)
    {
        // Start a new chunk if appending would grow the tail past 4096 bytes.
        if (buf_items.empty() || buf_items.back().size() + len > 4096) {
            buf_items.emplace_back(data, len);
        } else {
            buf_items.back().append(data, len);
        }
    }

    bool WriteData(int fd)
    {
        while (IsEmpty() == false) {
            auto const &item = buf_items.front();
            const char *p = item.data() + offset;
            size_t len = item.size() - offset;
            while (len > 0) {
                ssize_t ret = write(fd, p, len);
                if (ret == -1) {
                    if (errno == EAGAIN) {
                        return true;  // resume on the next EPOLLOUT
                    }
                    return false;
                }
                offset += ret;
                p += ret;
                len -= ret;
            }
            buf_items.pop_front();
            offset = 0;  // reset the offset for the next chunk
        }
        return true;
    }
};

using ACCEPTER = std::function<int(int)>;
using WRITER = std::function<int(int)>;
using EVENTER = std::function<int(int)>;
using READER = std::function<int(int, const char *data, size_t)>;

//static thread_local std::unordered_map<int, READER> g_th_handlers;

class EventPoller
{
private:
    int m_poll_fd = -1;
    int m_id;
    bool m_run_flag = false;
    std::unordered_map<int, ACCEPTER> m_accept_handlers;
    std::unordered_map<int, WriteBuffer> m_buffer_pool;
    std::mutex m_connction_lock;
    int m_event_fd;
    std::thread m_thread_id;
    std::vector<int> m_connections;

    void RunLoop();

public:
    EventPoller(int i);
    ~EventPoller();
    int GetEventFD();
    int GetClients();
    std::vector<int> &GetConnections();
    bool Init();
    void Start();
    void Stop();
    void Wait();
    void DeliverConn(int conn_fd);
    bool AddListener(int fd, ACCEPTER on_listen);
    bool AddEventer(int fd, EVENTER on_event);
    bool AddReader(int fd, READER on_read);
    void Close(int fd);
    bool Add2Epoll(int fd);
    bool FlushData(int fd, const char *buf, size_t len);
};

class EventPollerPool
{
public:
    static EventPollerPool &Instance();
    static void SetPoolSize(size_t size = 0);
    std::shared_ptr<EventPoller> GetPoller();
    std::shared_ptr<EventPoller> GetFirstPoller();
    void StartPollers();
    void Stop();

private:
    int m_size;
    std::vector<std::shared_ptr<EventPoller>> m_pollers;
    EventPollerPool();
};

} // namespace DURIAN

The main file:

#include "durian.h"static bool g_run_flag = true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag = false;printf("Get exit flag\n");if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo){g_run_flag = false;printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");}}bool StartServer()
{int listen_fd = socket(AF_INET,SOCK_STREAM,0);if(listen_fd == -1){printf("Create socket failed\n");return false;}else{printf("Server listen fd is:%d\n",listen_fd);}int reuseaddr = 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1){return false;}struct sockaddr_in listen_addr = {0};listen_addr.sin_family  = AF_INET;listen_addr.sin_addr.s_addr = INADDR_ANY;listen_addr.sin_port = htons(8888);if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){printf("bind failed\n");return false;}if(listen(listen_fd,100) == -1){printf("listen failed\n");return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller = pool.GetFirstPoller(); if(poller->Init()){if(poller->AddListener(listen_fd,[&](int conn_fd){printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);//Deliver client fd to other pollerspool.GetPoller()->DeliverConn(conn_fd);return 0;}) == false){return false;}poller->Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf(" cpp version :%d\n",__cplusplus);int thread_size = 1;bool run_flag = true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
}
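The post doesn't give a build command. Assuming the implementation and main files are saved as durian.cpp and main.cpp next to durian.h (the file names are my assumption; only durian.h is named in the code), something like the following should build it, with -pthread required for std::thread:

g++ -std=c++11 -O2 durian.cpp main.cpp -o echo_server -pthread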

Performance test

ulimit -HSn 102400    # raise the hard and soft open-file limits so 20000 concurrent connections fit

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html

Note that ab speaks HTTP while this server simply echoes the request bytes back, so ab may count the responses as failed; the run still exercises accept, fd delivery, and the read/write path. A minimal functional check is sketched below.
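Here is that check: a small client, not part of the original post (the IP is a placeholder; substitute the server's address), that connects, sends one line, and prints whatever comes back:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // Minimal echo check: connect, send a line, expect the same bytes back.
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket");
        return 1;
    }

    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);                      // the server's port from above
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);  // placeholder: use the server's IP

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("connect");
        return 1;
    }

    const char msg[] = "hello echo\n";
    write(fd, msg, sizeof(msg) - 1);

    char buf[4096];
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n > 0) {
        printf("echoed back: %.*s", (int)n, buf);
    }
    close(fd);
    return 0;
}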
 

That's it for this write-up of a multi-threaded Reactor-pattern echo server (EchoServer); hopefully it is of some help to other developers.


