基于多反应堆的高并发服务器【C/C++/Reactor】(中)子线程 WorkerThread的实现 和 线程池ThreadPool的初始化

本文主要是介绍基于多反应堆的高并发服务器【C/C++/Reactor】(中)子线程 WorkerThread的实现 和 线程池ThreadPool的初始化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、子线程 WorkerThread的实现

(1)工作线程

  • 线程ID:每个线程都有一个唯一的ID,用于标识
  • 线程的名字:非必需,主要用于识别线程
  • 互斥锁:线程同步
  • 条件变量:线程阻塞
  • EventLoop:在每个子线程里边都有一个反应堆模型

// 定义子线程对应的结构体
struct WokerThread {pthread_t threadID;// 线程IDchar name[24];// 线程名字pthread_mutex_t mutex;// 互斥锁(线程同步)pthread_cond_t cond;// 条件变量(线程阻塞)struct EventLoop* evLoop;// 事件循环(反应堆模型)// 在每个子线程里边都有一个反应堆模型
};

(2)工作线程初始化

// 初始化
int workerThreadInit(struct WokerThread* thread, int index);
// 初始化
int workerThreadInit(struct WokerThread* thread, int index) {thread->threadID = 0;// 线程IDsprintf(thread->name, "SubThread-%d", index);// 线程名字// 指定为NULL,表示使用默认属性pthread_mutex_init(&thread->mutex, NULL);// 互斥锁pthread_cond_init(&thread->cond, NULL);// 条件变量thread->evLoop = NULL;// 事件循环(反应堆模型)return 0;
}

(3)启动线程

// 启动线程
void workerThreadRun(struct WokerThread* thread);
// 子线程的回调函数
void* subThreadRunning(void* arg) {struct WokerThread* thread = (struct WokerThread*)arg;// 还有子线程里边的evLoop是共享资源,需要添加互斥锁pthread_mutex_lock(&thread->mutex);// 加锁thread->evLoop = eventLoopInitEx(thread->name);pthread_mutex_unlock(&thread->mutex);// 解锁pthread_cond_signal(&thread->cond);// 发送信号(唤醒主线程,通知主线程解除阻塞)eventLoopRun(thread->evLoop);// 启动反应堆模型return NULL;
}// 启动线程
void workerThreadRun(struct WokerThread* thread) {// 创建子线程pthread_create(&thread->threadID, NULL, subThreadRunning, thread);/*在这里阻塞主线程的原因是:在于子线程的反应堆模型是否被真正的创建出来了?因此,可以判断一下thread->evLoop是否为NULL,如果等于NULL,说明子线程反应堆模型还没有被初始化完毕,没有初始化完毕,我们就阻塞主线程*/// 阻塞主线程,让当前函数不会直接结束pthread_mutex_lock(&thread->mutex);while(thread->evLoop == NULL) { // 多次判断pthread_cond_wait(&thread->cond, &thread->mutex);// 子线程的回调函数(subThreadRunning)里调用pthread_cond_signal(&thread->cond)可以解除这里的阻塞}pthread_mutex_unlock(&thread->mutex);
}

二、线程池ThreadPool的初始化 

  • ThreadPool.h 
// 定义线程池
struct ThreadPool {/*在线程池里边的这个mainLoop主要是做备份用的,主线程里边的mainLoop主要负责和客户端建立连接,只负责这一件事情,只有当线程池没有子线程这种情况下,mainLoop才负责处理和客户端的连接,否则的话它是不管其他事情的。*/struct EventLoop* mainLoop; // 主线程的反应堆模型int index; bool isStart;int threadNum; // 子线程总个数struct WorkerThread* workerThreads;
};

在线程池里边其实管理一个WorkerThreads数组,在这个数组里边有若干个元素,每个元素里边都有一个WorkerThread对象,把这若干个子线程对象初始化出来。之后,每个子线程里边都有一个EventLoop(反应堆模型),子线程处理的任务其实就是EventLoop里边的任务。

除此之外,在线程池里边还可以添加一个变量用来标记当前线程池是否启动:isStart默认情况下,肯定是不启动的状态的,即isStart = false;

ThreadNum用来记录当前线程池里边的子线程的个数,就是WorkerThreads数组的元素个数,它们有对应关系。

index记录线程池里边子线程的编号,通过这个index就能够访问线程池里边某个线程。

>>思考:为什么我们要访问线程池里边的某一个线程?

场景:主线程和客户端建立了连接,之后,就需要和客户端通信,那么这个通信的文件描述符和客户端通信流程需要交给子线程去处理,每个子线程里边都有一个EventLoop(反应堆模型),就需要把这个通信的文件描述符交给这个反应堆模型去管理,检测这个文件描述符的事件,如果有读事件,说明客户端有数据发送过来,那么我们就需要接收数据,然后再给客户端回复数据,这些都是在子线程的反应堆模型里边做的。所以当这个连接建立之后,主线程就要从线程池里边找出一个子线程,并且把这个任务给到子线程去处理。

但是存在一个问题:假设只有三个子线程,在找子线程的时候,不可能每次都找workerThread1,如果每次都是workerThread1,那么workerThread2, workerThread3就空闲下来了,为了雨露均沾,可以用index表示当前访问的线程到底是谁,如果这次访问线程1,那么下次就是线程2。如果这次访问线程2,那么下次就是线程3。如果这次访问线程3,那么下次就是线程1。

还有另外一个反应堆模型,这个是属于主线程的,并不用在ThreadPool线程池里边再去实例化一个EventLoop。其实是把主线程的这个EventLoop给到线程池,目的是:假设说这个线程池的ThreadNum=0,此时这个线程池里边就没有子线程了,也就没有反应堆模型了,也就没有办法工作,在这种情况下,我们就可以让线程池使用主线程的反应堆模型,这样就能够保证线程池还能够继续工作,只不过在当前服务器框架里边,它的反应堆模型就从多个变成了一个,所有的任务都变成了全部由主线程的反应堆模型来处理了。

在线程池里边的这个mainLoop主要是做备份用的,主线程里边的mainLoop主要负责和客户端建立连接,只负责这一件事情,只有当线程池没有子线程这种情况下,mainLoop才负责处理和客户端的连接,否则的话它是不管其他事情的。

总结:

(1)定义与作用:线程池是若干个线程的集合,主要用于管理复用线程,减少线程的创建和销毁的开销

(2)结构组成

  • WorkerThreads数组:存储子线程对象,每个元素代表一个workerThread对象

  • ThreadNumber:记录线程池中线程的个数,与WorkerThreads数组的元素个数对应

  • index:记录线程池中子线程的编号,用于访问特定线程

  • 主线程的EventLoop:当线程池中无子线程时,主线程的EventLoop可作为备选方案,确保线程池仍能处理任务

(3)工作原理:当连接建立后,主线程从线程池中选择一个子线程来处理任务。每个子线程都有一个内部的反应堆模型(EventLoop)来处理任务。 如果线程池中没有子线程,主线程的反应堆模型将被使用,确保线程池仍能工作

(4)启动与状态标记:通过一个布尔类型的变量来标记线程池是否已启动

  • ThreadPool.h  
// 初始化线程池
struct ThreadPool* threadPoolInit(struct EventLoop* mainLoop, int threadNum);
  • ThreadPool.c
// 初始化线程池
struct ThreadPool* threadPoolInit(struct EventLoop* mainLoop, int threadNum) {struct ThreadPool* pool = (struct ThreadPool*)malloc(sizeof(struct ThreadPool));pool->mainLoop = mainLoop; // 主线程的反应堆模型pool->index = 0;pool->isStart = false;pool->threadNum = threadNum; // 子线程总个数pool->workerThreads = (struct WokerThread*)malloc(sizeof(struct WokerThread) * threadNum); // 子线程数组return pool;
}

>>内容概要 :本文主要介绍了线程池的概念、结构组成和工作原理

>>核心观点 :

1. 线程池是若干个线程的集合,用于管理和复用线程,减少线程的创建和销毁开销

2. 线程池包括WorkerThreads数组、ThreadNumber、index等结构组成

3. 工作原理是主线程从线程池中选择一个子线程来处理任务,每个子线程都有一个反应堆模型来处理任务

这篇关于基于多反应堆的高并发服务器【C/C++/Reactor】(中)子线程 WorkerThread的实现 和 线程池ThreadPool的初始化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/569182

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

【C++ Primer Plus习题】13.4

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream>#include "port.h"int main() {Port p1;Port p2("Abc", "Bcc", 30);std::cout <<

C++包装器

包装器 在 C++ 中,“包装器”通常指的是一种设计模式或编程技巧,用于封装其他代码或对象,使其更易于使用、管理或扩展。包装器的概念在编程中非常普遍,可以用于函数、类、库等多个方面。下面是几个常见的 “包装器” 类型: 1. 函数包装器 函数包装器用于封装一个或多个函数,使其接口更统一或更便于调用。例如,std::function 是一个通用的函数包装器,它可以存储任意可调用对象(函数、函数

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo