epoll+线程池模型

2024-08-25 22:52
文章标签 线程 模型 epoll

本文主要是介绍epoll+线程池模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

🔥博客主页: 我要成为C++领域大神
🎥系列专栏:【C++核心编程】 【计算机网络】 【Linux编程】 【操作系统】
❤️感谢大家点赞👍收藏⭐评论✍️

本博客致力于知识分享,与更多的人进行学习交流

负载均衡技术

大量的用户请求可能导致任务分发不均匀,导致资源浪费,不能很好的处理和响应

通过预先设定的分发策略,最大的尝试均匀分发业务,让每台处理机都有任务负载

代理服务器

代理服务器是一个以数据中转为主要职责的中间件,代理服务器可以将用户的请求中转给处理服务器机,也可以将结果反馈给用户,避免用户直接访问服务器主机,提高安全性(安全策略都可以部署在代理服务器中),还可以进行任务的控制与分发,例如负载均衡可以在代理服务器中完成

HA高可用性结构

某个处理机宕机,可用通过HA概念将数据任务转发给正常的处理机。察觉处理机异常,快速反应

线程池设计原则

epoll具有强大的socket监听能力,可以快速察觉所有套接字,线程池具有高并发处理能力,大量的用户请求可用快速处理。

1)提高线程重用性,线程不能与用户绑定,可重复为多个用户处理业务,避免频繁创建销毁线程,减少不必要的开销。

2)预创建,提前准备好部分线程待用,用户发送请求后直接选择线程处理,提高响应速度

3)线程管理策略,设定线程池阈值,通过阈值管理调度线程,线程扩容与缩减

4)为服务器提供并发处理能力,可以更快处理请求或业务

5)提高线程池的重用性,用户实现任务,线程池负责执行任务

6)线程池使用生产者消费者实现,任务传递模式

工作流程

线程池:

生产者将要处理的业务传递到任务队列中去,如果任务队列中有可获取的任务,消费者一直获取执行,生产者投递的任务不允许持续占用消费者。管理者会对消费者进行扫描,根据阈值检测是否需要扩容和缩减,对消费者进行创建或者杀死。

epoll模型:

生产者负责实现epoll模型,将事件转换成业务,投递到线程池中

线程池的扩容和缩减

使用线程池最小阈值min,作为扩容增减数量

扩容:当前任务量>=闲消费者数量 或者 忙线程数量占活线程数量的70%

缩减:当前线程数量+扩容量,小于最大线程阈值

消费者与管理者配合实现缩减

epoll的水平触发模式和边缘触发模式

epoll 的水平触发(Level Triggered, LT)和边缘触发(Edge Triggered, ET)是两种不同的事件通知机制,它们定义了 epoll 如何向应用程序报告文件描述符上的事件。


水平触发(LT)

在水平触发模式下,只要满足条件的事件仍然存在,epoll 就会重复通知这个事件。比如,如果一个文件描述符上有可读数据,那么只要没有读完,epoll_wait 就会不断报告该文件描述符是可读的。这种模式的特点是:

容错性较好,不易丢失事件。

更易于编程和理解。

可以用于多线程程序中,多个线程可以共享同一个 epoll 文件描述符。

缺点:开销大,往返在socket缓冲区和用户之间

在水平模式下,我们的epoll+线程池模型有问题,当第一轮事件监听未处理完毕,epoll_wait不会阻塞,当再次有客户端发送任务时epoll_wait立即返回,并且会对任务进行误添加。

可以以边缘触发模式监听socket的读事件来避免这种问题,node.events=EPOLL | EPOLLET


边缘触发(ET)

边缘触发模式下,事件只在状态变化时被通知一次,之后即使条件仍然满足,也不会再次通知,直到状态再次发生变化。例如,只有当新数据到达使得文件描述符从非可读变为可读时,epoll_wait 才会报告可读事件。边缘触发模式的特点是:

效率更高,因为它减少了事件的重复通知。

需要更加小心地处理每次通知,确保处理所有的数据,否则可能会丢失未处理完的数据。

更适合单线程或者每个线程使用独立 epoll 文件描述符的场景。

代码实现

文件结构:

makefile:

server.h

#ifndef __server_H__
#define __server_H__#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>#define TIMEOUT 1
pthread_mutex_t lock;typedef struct
{void *(*bussiness)(void *);//任务函数指针void *arg;}bs_t;typedef struct
{int thread_shutdown;//线程池开关int thread_max;//最大线程数int thread_min;//最小线程数int thread_alive;//有效线程数int thread_busy;//忙线程数量int kill_number;//缩减码bs_t *queue;//任务队列int front;int rear;int cur;int max;pthread_cond_t Not_Full;pthread_cond_t Not_Empty;pthread_t * ctids;//存储消费者IDpthread_t mtid;//存储管理者ID
}pool_t;//线程池类型pool_t * thread_pool_create(int Max,int Min,int Qmax);//线程池初始化
int Producer_add_task(pool_t * p,bs_t bs);//生产者添加任务模块,执行一次添加一次任务
void *Customer_thread(void *arg);//消费者线程,参数为线程池地址
void *Manager_thread(void *arg);//管理者线程,参数为线程池地址
int thread_pool_destroy(pool_t *p);//销毁线程池
void * user_bussiness(void *arg);//自定义线程任务
int is_thread_alive(pthread_t pid);

thread_pool_create.c

#include "../include/server.h"pool_t * thread_pool_create(int Max,int Min,int Qmax)
{pool_t *ptr=NULL;if((ptr=(pool_t*)malloc(sizeof(pool_t)))==NULL){perror("thread_pool_create malloc pool failed");exit(0);}ptr->thread_shutdown=1;ptr->thread_max=Max;ptr->thread_min=Min;ptr->thread_alive=0;ptr->kill_number=0;ptr->thread_busy=0;if((ptr->queue=(bs_t*)malloc(sizeof(bs_t)*Qmax))==NULL){perror("thread_pool_create malloc queue failed");exit(0);}if((ptr->ctids=(pthread_t*)malloc(sizeof(pthread_t)*Max))==NULL){perror("thread_pool_create malloc ctids failed");exit(0);}ptr->front=0;ptr->rear=0;ptr->cur=0;ptr->max=Qmax;if(pthread_cond_init(&ptr->Not_Full,NULL)!=0 || pthread_cond_init(&ptr->Not_Empty,NULL)!=0|| pthread_mutex_init(&lock,NULL)!=0){printf("thread pool create failed,init Cond or Lock Failed\n");exit(0);}int err;for(int i=0;i<Min;++i){if((err=pthread_create(&ptr->ctids[i],NULL,Customer_thread,(void*)ptr))!=0){printf("thread pool create failed,customer thread create failed:%s",strerror(err));exit(0);}++ptr->thread_alive;}if((err=pthread_create(&ptr->mtid,NULL,Manager_thread,(void*)ptr))!=0){printf("thread pool create failed,manager thread create failed:%s",strerror(err));exit(0);}pthread_create(&ptid,NULL,print_thread,(void*)ptr);printf("Print Thread Create Success...\n");return ptr;
}

thread_pool_destroy.c

#include "../include/server.h"int thread_pool_destroy(pool_t *p)
{pthread_mutex_destroy(&lock);pthread_cond_destroy(&p->Not_Full);pthread_cond_destroy(&p->Not_Empty);free(p->ctids);free(p->queue);free(p);return 0;
}

Epoll_Listen.c

#include "../include/server.h"int Epoll_Listen(int serverfd,pool_t * p)
{struct epoll_event ready_array[EPOLLMAX];int ready,flag;bs_t tmp;printf("Epoll_Thread Server,Epoll_Listen Running...\n");while(p->thread_shutdown){if((ready=epoll_wait(epfd,ready_array,EPOLLMAX,-1))==-1){perror("Epoll_Listen call failed,epoll_wait call failed");exit(0);}flag=0;while(ready){if(ready_array[flag].data.fd==serverfd){tmp.business=Business_Accept;tmp.arg=(void *)&serverfd;Producer_add_task(p,tmp);}else{tmp.business=Business_Retime;tmp.arg=((void*)&ready_array[flag].data.fd);Producer_add_task(p,tmp);}++flag;--ready;}}close(serverfd);return 0;
}

Manager_thread.c

#include "../include/server.h"void * Manager_thread(void *arg)
{pthread_detach(pthread_self());pool_t *p=(pool_t *)arg;int alive,cur,busy;pthread_mutex_lock(&lock);alive=p->thread_alive;busy=p->thread_busy;cur=p->cur;pthread_mutex_unlock(&lock);int add,flag;//持续执行while(p->thread_shutdown){if((cur>=alive-busy||(double)busy/alive*100>=70)&&alive+p->thread_min<=p->thread_max){for(flag=0,add=0;flag<p->thread_max&&add<p->thread_min;flag++){if(p->ctids[flag]==0 || !is_thread_alive(p->ctids[flag])){pthread_create(&p->ctids[flag],NULL,Customer_thread,(void*)p);add++;pthread_mutex_lock(&lock);++(p->thread_alive);pthread_mutex_unlock(&lock);}}pthread_kill(ptid,SIGUSR1);}if(busy*2<=alive-busy && alive-p->thread_min >= p->thread_min){printf("%d\n",p->thread_min);pthread_mutex_lock(&lock);p->kill_number=p->thread_min;pthread_mutex_unlock(&lock);for(int i=0;i<p->thread_min;++i){pthread_cond_signal(&p->Not_Empty);}}sleep(TIMEOUT);}printf("Thread shutdown 0,manager thread[0x%x]exiting...\n",(unsigned int)pthread_self());pthread_exit(NULL);
}

Business_Retime.c

#include "../include/server.h"void * Business_Retime(void *arg)
{int toupper_flag=0;char recv_buf[1024];bzero(recv_buf,sizeof(recv_buf));char time_buf[100];bzero(time_buf,sizeof(time_buf));time_t tp;int recvlen;int sockfd=*(int*)arg;printf("111\n");while((recvlen=recv(sockfd,recv_buf,sizeof(recv_buf),MSG_DONTWAIT))==-1){if(errno==EINTR)break;perror("Business_Retime recv call failed");exit(0);}if(recvlen>0){if (strcmp(recv_buf, "localtime") == 0) {tp = time(NULL); // 获取时间种子ctime_r(&tp, time_buf);time_buf[strcspn(time_buf, "\n")] = '\0';send(sockfd, time_buf, strlen(time_buf) + 1, MSG_NOSIGNAL);bzero(time_buf, sizeof(time_buf));} else {toupper_flag = 0;while (recvlen > toupper_flag) {recv_buf[toupper_flag] = toupper(recv_buf[toupper_flag]);++toupper_flag;}send(sockfd, recv_buf, recvlen, MSG_NOSIGNAL);bzero(recv_buf, sizeof(recv_buf));}}else if(recvlen==0){close(sockfd);epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);}return NULL;
}

Epoll_Create.c

#include "../include/server.h"int Epoll_Create(int serverfd)
{int epfd;if((epfd=epoll_create(EPOLLMAX))==-1){perror("Epoll_Create call failed,epoll_create call failed");exit(0);}struct epoll_event node;node.data.fd=serverfd;node.events=EPOLLIN|EPOLLET;if((epoll_ctl(epfd,EPOLL_CTL_ADD,serverfd,&node))==-1){perror("Epoll_Create call failed,epoll_ctl call failed");exit(0);}printf("Epoll_Server Epoll Create success...\n");return epfd;}

Business_Accept.c

#include "../include/server.h"void *Business_Accept(void *arg)
{struct sockaddr_in addr;socklen_t addrlen;int sockfd=*(int *)arg;int customerfd;struct epoll_event node;char response[1024];char ip[16];bzero(response,sizeof(response));bzero(ip,sizeof(ip));addrlen=sizeof(addr);if((customerfd=accept(sockfd,(struct sockaddr*)&addr,&addrlen))==-1){perror("Business_Accept accept call failed");exit(0);}inet_ntop(AF_INET,&addr.sin_addr.s_addr,ip,16);node.data.fd=customerfd;node.events=EPOLLIN|EPOLLET;if(epoll_ctl(epfd,EPOLL_CTL_ADD,customerfd,&node)==-1){perror("Business_Accept epoll_ctl call failed");exit(0);}sprintf(response,"hi Thread [%s] welcome to epoll demo",ip);send(customerfd,response,strlen(response),MSG_NOSIGNAL);return NULL;
}

print_thread.c

#include "../include/server.h"void *print_thread(void*arg)
{pthread_detach(pthread_self());pool_t *ptr=(pool_t*)arg;PTR=ptr;struct sigaction act,oact;act.sa_handler=sig_usr;act.sa_flags=0;sigemptyset(&act.sa_mask);sigaction(SIGUSR1,&act,&oact);//设置捕捉sigprocmask(SIG_SETMASK,&act.sa_mask,NULL);//解除屏蔽while(ptr->thread_shutdown)sleep(TIMEOUT);//等待信号pthread_exit(NULL);
}

sig_usr.c

#include "../include/server.h"void sig_usr(int n)
{//显示一次阈值信息printf("[Thread_Epoll_Server Info] alive[%d] busy[%d] Idel[%d] Cur[%d] Busy/Alive[%.2f%%] Alive/max[%.2f%%]\n",
PTR->thread_alive,PTR->thread_busy,PTR->thread_alive-PTR->thread_busy,PTR->cur,(double)PTR->thread_busy/PTR->thread_alive*100,(double)PTR->thread_alive/PTR->thread_max*100);
} 

Producer_add_task.c

#include "../include/server.h"int Producer_add_task(pool_t *p,bs_t bs)
{if(p->thread_shutdown){//上锁pthread_mutex_lock(&lock);while(p->cur==p->max){pthread_cond_wait(&p->Not_Full,&lock);if(!p->thread_shutdown){pthread_mutex_unlock(&lock);printf("thread shutdown 0,exiting...\n");pthread_exit(NULL);}}//添加一个业务p->queue[p->front].business=bs.business;p->queue[p->front].arg=bs.arg;++p->cur;p->front=(p->front+1)%p->max;//解锁pthread_mutex_unlock(&lock);pthread_kill(ptid,SIGUSR1);//唤醒一个消费者pthread_cond_signal(&p->Not_Empty);}else{printf("thread shutdown 0,exiting...\n");pthread_exit(NULL);}printf("Producer Thread [0x%x] Add Task Successfully,business_addr=%p\n",(unsigned int)pthread_self(),bs.business);return 0;
}

Net_init.c

#include "../include/server.h"int Net_init(void)
{int sockfd;struct sockaddr_in sockAddr;bzero(&sockAddr,sizeof(sockAddr));sockAddr.sin_family=AF_INET;sockAddr.sin_port=htons(8080);sockAddr.sin_addr.s_addr=htonl(INADDR_ANY);if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1){perror("socket call failed");exit(0);}if(bind(sockfd,(struct sockaddr*)&sockAddr,sizeof(sockAddr))==-1){perror("bind call failed");exit(0);}listen(sockfd,BACKLOG);printf("Epoll_thread Server Net init Success...\n");return sockfd;
}

is_thread_alive.c

#include "../include/server.h"int is_thread_alive(pthread_t tid)
{pthread_kill(tid,0);if(errno==ESRCH)return 0;return 1;}

main.c

#include "../include/server.h"int main(void)
{//主线程设置对SIGUSR1信号的屏蔽,继承给所有线程sigset_t set,oset;sigemptyset(&set);sigaddset(&set,SIGUSR1);sigprocmask(SIG_SETMASK,&set,&oset);//启动接口startint sockfd=Net_init();epfd=Epoll_Create(sockfd);pool_t *ptr=thread_pool_create(300,10,1000);Epoll_Listen(sockfd,ptr);if(!ptr->thread_shutdown) thread_pool_destroy(ptr);printf("Epoll Server Closing...\n");return 0;
}

运行结果

在服务端刚启动后,有13个线程,一个生产者,十个消费者,一个管理者,一个输出线程

这篇关于epoll+线程池模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106875

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

秋招最新大模型算法面试,熬夜都要肝完它

💥大家在面试大模型LLM这个板块的时候,不知道面试完会不会复盘、总结,做笔记的习惯,这份大模型算法岗面试八股笔记也帮助不少人拿到过offer ✨对于面试大模型算法工程师会有一定的帮助,都附有完整答案,熬夜也要看完,祝大家一臂之力 这份《大模型算法工程师面试题》已经上传CSDN,还有完整版的大模型 AI 学习资料,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

AI Toolkit + H100 GPU,一小时内微调最新热门文生图模型 FLUX

上个月,FLUX 席卷了互联网,这并非没有原因。他们声称优于 DALLE 3、Ideogram 和 Stable Diffusion 3 等模型,而这一点已被证明是有依据的。随着越来越多的流行图像生成工具(如 Stable Diffusion Web UI Forge 和 ComyUI)开始支持这些模型,FLUX 在 Stable Diffusion 领域的扩展将会持续下去。 自 FLU

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号