Linux基础 -- pthread之线程池任务调度

2024-09-04 22:04

本文主要是介绍Linux基础 -- pthread之线程池任务调度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

线程池任务依赖设计方案

1. 设计目标

为了在多线程环境中支持任务间的依赖关系,我们设计了一个基于 pthread_create 封装的线程池,任务之间可以设置依赖,只有在依赖的任务完成后,依赖任务才会被执行。该设计目标是简化任务调度的逻辑,让开发者可以专注于任务的编写,而不必关注复杂的线程管理和任务依赖的执行顺序。

2. 核心概念

2.1 任务(Task)

任务是线程池中执行的最小单位。每个任务包含以下信息:

  • 任务函数:待执行的工作函数。
  • 参数:传递给任务函数的参数。
  • 依赖任务:任务可以依赖其他任务,只有依赖的任务执行完成,当前任务才能执行。
  • 依赖计数:记录任务依赖的其他任务数量,每当一个依赖任务完成,该计数器减 1,直到依赖计数为 0 时,任务才会被执行。

2.2 任务队列

任务队列是一个先进先出的队列,用于存储待执行的任务。线程池中的线程会从任务队列中取出任务并执行。为了支持任务依赖,只有当任务的依赖任务全部完成后,任务才会被放入任务队列中。

2.3 线程池(Thread Pool)

线程池是线程管理的核心组件,它包含以下部分:

  • 线程数组:线程池中的线程,负责从任务队列中获取任务并执行。
  • 任务队列:用于存储待执行任务的队列。
  • 互斥锁和条件变量:保证线程安全地访问任务队列,并通过条件变量实现任务的调度。

3. 线程池工作原理

3.1 任务的添加与依赖处理

当开发者向线程池提交一个任务时,可以同时提交该任务所依赖的其他任务。如果任务没有依赖任务,任务会被立即加入任务队列中,等待线程执行。如果任务有依赖任务,任务会暂时挂起,直到其所有依赖任务完成。

每当一个任务执行完成后,线程池会检查是否有其他任务依赖该任务。如果有,线程池会减少依赖任务的依赖计数器,当计数器为 0 时,将该依赖任务放入任务队列中。

3.2 线程的工作流程

线程池中的每个线程都会执行以下操作:

  1. 从任务队列中取出一个任务。
  2. 执行任务函数。
  3. 任务执行完成后,通知所有依赖该任务的其他任务,减少这些任务的依赖计数。
  4. 如果依赖计数为 0,则将依赖任务放入任务队列中。

3.3 线程池的关闭与销毁

在销毁线程池时,线程池会通知所有线程停止工作,等待所有线程结束后,释放所有分配的内存资源,确保程序无资源泄漏。

4. 主要流程

4.1 任务提交流程

  1. 开发者调用线程池的 add_task 接口提交任务。
  2. 如果任务没有依赖,立即将任务加入任务队列中。
  3. 如果任务有依赖,记录依赖关系,并等待依赖任务完成。

4.2 任务执行流程

  1. 线程从任务队列中获取一个任务。
  2. 执行任务。
  3. 检查是否有其他任务依赖于该任务。
  4. 如果有依赖任务,减少依赖任务的计数器。
  5. 如果依赖计数器为 0,将依赖任务放入任务队列。

4.3 任务依赖完成通知

  1. 任务完成后,通知所有依赖它的任务。
  2. 对每个依赖任务,减少依赖计数器。
  3. 当依赖计数器为 0 时,将该任务加入任务队列,等待执行。

5. 锁与同步机制

为了保证多线程环境下任务队列和任务依赖关系的安全性,线程池使用互斥锁和条件变量:

  • 互斥锁:保护任务队列和任务依赖数据的并发访问。
  • 条件变量:用于通知线程任务队列中有新任务需要处理,避免线程一直忙等待。

6. 优势与适用场景

6.1 优势

  • 简化任务依赖管理:通过自动管理任务依赖,减少开发者手动处理依赖逻辑的负担。
  • 高效的线程复用:线程池中的线程复用,避免频繁创建和销毁线程带来的性能开销。
  • 支持任务间复杂依赖:通过依赖计数和依赖队列,可以支持复杂的任务依赖关系,比如任务链和任务图。

6.2 适用场景

  • 多任务的复杂调度:多个任务之间存在依赖关系的场景,例如图像处理中的多个滤波器操作、数据处理管道等。
  • 任务并行化:需要将一系列任务并行化执行,且任务之间存在顺序依赖。
  • 性能优化:通过复用线程池减少频繁线程创建和销毁的开销,适用于对性能要求较高的应用场景。

7. 扩展功能

  • 动态调整线程池大小:根据系统的负载情况,动态调整线程池中的线程数量,以适应不同的任务需求。
  • 任务优先级支持:为任务设置优先级,高优先级的任务可以优先执行。
  • 任务超时处理:增加任务超时功能,确保长时间未完成的任务可以被正确处理,避免任务卡住。

8. 线程池的设计细节

8.1 任务结构设计

每个任务封装了任务函数及其参数,并且通过依赖计数器来追踪当前任务的依赖任务。任务的主要结构包括以下字段:

  • func:任务函数指针,表示具体的工作逻辑。
  • arg:任务函数的参数。
  • dependencies:一个指向依赖任务数组的指针,用于保存该任务所依赖的其他任务。
  • dep_count:依赖任务的总数量,用于初始化 remaining_deps 计数器。
  • remaining_deps:当前任务未完成的依赖任务计数器。当其值为 0 时,任务将可以被执行。

8.2 线程池结构设计

线程池通过维护一个任务队列来管理任务执行,同时通过线程数组来管理线程的执行。主要结构包括:

  • threads:线程数组,用于保存所有创建的线程。
  • task_queue:任务队列,采用链表的形式,存储待执行的任务。
  • lock:互斥锁,保证任务队列在多线程环境下的安全访问。
  • notify:条件变量,用于在线程空闲时唤醒它们执行新的任务。
  • thread_count:线程池中线程的数量。
  • stop:线程池是否已经停止的标志,线程池销毁时设为 true,通知所有线程退出。

8.3 锁与条件变量的使用

  • 锁(Mutex):任务队列的操作需要线程安全,因此我们在对任务队列进行增删操作时,必须加锁保护,确保只有一个线程可以修改任务队列。
  • 条件变量(Condition Variable):当任务队列为空时,线程会进入等待状态,避免忙等待浪费 CPU 资源。当有新任务加入队列时,条件变量会通知空闲线程执行任务。

9. 线程池中的任务调度

9.1 任务提交与依赖关系处理

  1. 无依赖任务:如果任务没有依赖任务,则直接将其加入任务队列,等待线程执行。
  2. 有依赖任务:如果任务有依赖任务,则该任务的 remaining_deps 被设置为依赖任务的数量。当依赖任务完成时,remaining_deps 计数器减少,直到所有依赖任务完成,remaining_deps 为 0,该任务才会被放入任务队列。

9.2 任务执行与依赖任务通知

  1. 当线程从任务队列中取出任务并执行时,首先完成当前任务的工作。
  2. 任务执行完成后,线程检查是否有其他任务依赖于该任务。对于每个依赖任务,减少其 remaining_deps 计数。
  3. 如果依赖任务的 remaining_deps 变为 0,则该任务被放入任务队列,准备执行。

9.3 任务的优先执行

默认情况下,任务是按提交顺序被执行的。如果需要支持优先级,可以为每个任务增加优先级字段,并修改任务队列的插入逻辑,使高优先级任务排在前面。这样可以确保高优先级任务优先执行。

10. 销毁线程池的安全性

10.1 停止标志

当用户请求销毁线程池时,线程池需要安全地通知所有工作线程退出。我们使用 stop 标志来表示线程池是否处于停止状态。当该标志被设置为 true 时,所有线程会在任务执行完毕后退出。

10.2 资源清理

线程池销毁时,需要确保以下资源被正确释放:

  • 线程数组:所有线程在退出前需要 pthread_join,以确保线程正常结束。
  • 任务队列:在销毁线程池前,所有未执行的任务需要被清理,以避免内存泄漏。
  • 锁与条件变量:在释放线程池资源时,需要销毁互斥锁和条件变量。

11. 性能优化建议

11.1 任务批处理

如果任务依赖性较小,线程池可以批量处理任务,减少频繁的锁开销。例如,多个无依赖的任务可以一次性加入任务队列,线程从队列中批量取出任务执行,减少锁的竞争。

11.2 线程池动态扩展

根据系统负载情况,线程池的线程数量可以动态调整。例如,在任务过多时,临时增加线程处理任务;当任务量减少时,减少线程的数量,以避免线程资源浪费。

11.3 任务超时处理

为每个任务设定超时时间,避免任务长时间占用线程。超时未完成的任务可以被取消或重新调度,这对于高实时性要求的系统尤其重要。

12. 常见问题与处理

12.1 任务依赖死锁

在设计任务依赖时,必须确保任务的依赖关系是无环的(即不能出现循环依赖)。如果任务形成了环形依赖,所有任务都将等待依赖任务完成,导致死锁。因此,在任务依赖设计中,必须保证任务依赖关系构成一个有向无环图(DAG)。

12.2 任务队列溢出

如果任务提交过于频繁,任务队列可能会堆积大量任务,导致内存消耗过大。解决方法包括限制任务队列的大小,或动态调整队列大小,并且在队列满时,可以选择拒绝新任务或阻塞提交线程。

12.3 线程饥饿

如果某些任务始终得不到执行(比如低优先级任务),会导致线程饥饿问题。可以通过任务优先级策略和公平调度算法,保证每个任务都能在合理时间内得到执行。

13. 未来扩展

13.1 支持分布式任务调度

线程池的任务调度机制可以扩展到分布式系统中,在多台服务器之间共享任务队列,实现跨节点的任务分配与执行。

13.2 支持更多的任务调度策略

当前设计仅支持简单的任务依赖和优先级调度。未来可以支持更多复杂的调度策略,例如基于任务执行时间的调度、负载均衡调度等。

13.3 与异步I/O的结合

将线程池与异步 I/O 机制结合,允许线程池中的线程在等待 I/O 操作时不阻塞,从而提高任务执行效率,特别适合高并发的网络应用。

代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdbool.h>// 任务函数类型
typedef void (*task_func)(void*);// 任务结构
typedef struct task {task_func func;            // 任务函数void* arg;                 // 任务参数struct task** dependencies; // 依赖的任务int dep_count;             // 依赖任务的数量int remaining_deps;        // 剩余未完成的依赖任务数量struct task* next;         // 下一个任务
} task_t;// 线程池结构
typedef struct thread_pool {pthread_mutex_t lock;      // 互斥锁保护任务队列pthread_cond_t notify;     // 条件变量,用于通知线程pthread_t* threads;        // 线程数组task_t* task_queue;        // 任务队列int thread_count;          // 线程数量bool stop;                 // 标志是否停止线程池
} thread_pool_t;// 创建线程池
thread_pool_t* thread_pool_create(int thread_count);// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count);// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool);// 线程的工作函数
void* thread_work(void* arg);// 初始化线程池
thread_pool_t* thread_pool_create(int thread_count) {thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));if (pool == NULL) {perror("创建线程池失败");return NULL;}pool->thread_count = thread_count;pool->task_queue = NULL;pool->stop = false;pthread_mutex_init(&pool->lock, NULL);pthread_cond_init(&pool->notify, NULL);pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_count);for (int i = 0; i < thread_count; i++) {if (pthread_create(&pool->threads[i], NULL, thread_work, (void*)pool) != 0) {perror("线程创建失败");thread_pool_destroy(pool);return NULL;}}return pool;
}// 线程的工作函数,不断从任务队列中取任务并执行
void* thread_work(void* arg) {thread_pool_t* pool = (thread_pool_t*)arg;while (true) {pthread_mutex_lock(&pool->lock);while (pool->task_queue == NULL && !pool->stop) {pthread_cond_wait(&pool->notify, &pool->lock);}if (pool->stop) {pthread_mutex_unlock(&pool->lock);break;}// 获取任务task_t* task = pool->task_queue;if (task != NULL && task->remaining_deps == 0) {pool->task_queue = task->next;} else {task = NULL;}pthread_mutex_unlock(&pool->lock);// 执行任务if (task != NULL) {task->func(task->arg);// 依赖的任务完成后,通知依赖它的任务pthread_mutex_lock(&pool->lock);for (int i = 0; i < task->dep_count; i++) {task->dependencies[i]->remaining_deps--;if (task->dependencies[i]->remaining_deps == 0) {// 将依赖任务加入任务队列中task->dependencies[i]->next = pool->task_queue;pool->task_queue = task->dependencies[i];pthread_cond_signal(&pool->notify);}}pthread_mutex_unlock(&pool->lock);free(task);}}return NULL;
}// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count) {task_t* task = (task_t*)malloc(sizeof(task_t));if (task == NULL) {perror("任务分配失败");return -1;}task->func = func;task->arg = arg;task->dependencies = dependencies;task->dep_count = dep_count;task->remaining_deps = dep_count;task->next = NULL;pthread_mutex_lock(&pool->lock);// 如果没有依赖任务,立即执行if (task->remaining_deps == 0) {task->next = pool->task_queue;pool->task_queue = task;pthread_cond_signal(&pool->notify);} else {// 否则等待依赖任务完成for (int i = 0; i < dep_count; i++) {if (dependencies[i] == NULL) {perror("依赖任务无效");free(task);pthread_mutex_unlock(&pool->lock);return -1;}}}pthread_mutex_unlock(&pool->lock);return 0;
}// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool) {if (pool == NULL) return;pthread_mutex_lock(&pool->lock);pool->stop = true;pthread_cond_broadcast(&pool->notify);pthread_mutex_unlock(&pool->lock);for (int i = 0; i < pool->thread_count; i++) {pthread_join(pool->threads[i], NULL);}free(pool->threads);task_t* current = pool->task_queue;while (current != NULL) {task_t* temp = current;current = current->next;free(temp);}pthread_mutex_destroy(&pool->lock);pthread_cond_destroy(&pool->notify);free(pool);
}// 测试任务函数
void print_message(void* arg) {char* message = (char*)arg;printf("执行任务: %s\n", message);sleep(1);
}// 测试代码
int main() {// 创建线程池,包含 4 个线程thread_pool_t* pool = thread_pool_create(4);// 创建任务结构体task_t* task1 = (task_t*)malloc(sizeof(task_t));task_t* task2 = (task_t*)malloc(sizeof(task_t));task_t* task3 = (task_t*)malloc(sizeof(task_t));// 提交任务1thread_pool_add(pool, print_message, "任务1", NULL, 0);// 任务2依赖任务1task_t* dependencies1[] = {task1};thread_pool_add(pool, print_message, "任务2", dependencies1, 1);// 任务3依赖任务2task_t* dependencies2[] = {task2};thread_pool_add(pool, print_message, "任务3", dependencies2, 1);// 模拟主线程等待任务完成sleep(5);// 销毁线程池thread_pool_destroy(pool);return 0;
}

这篇关于Linux基础 -- pthread之线程池任务调度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1137149

相关文章

linux-基础知识3

打包和压缩 zip 安装zip软件包 yum -y install zip unzip 压缩打包命令: zip -q -r -d -u 压缩包文件名 目录和文件名列表 -q:不显示命令执行过程-r:递归处理,打包各级子目录和文件-u:把文件增加/替换到压缩包中-d:从压缩包中删除指定的文件 解压:unzip 压缩包名 打包文件 把压缩包从服务器下载到本地 把压缩包上传到服务器(zip

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n

[Linux]:进程(下)

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:Linux学习 贝蒂的主页:Betty’s blog 1. 进程终止 1.1 进程退出的场景 进程退出只有以下三种情况: 代码运行完毕,结果正确。代码运行完毕,结果不正确。代码异常终止(进程崩溃)。 1.2 进程退出码 在编程中,我们通常认为main函数是代码的入口,但实际上它只是用户级

AI基础 L9 Local Search II 局部搜索

Local Beam search 对于当前的所有k个状态,生成它们的所有可能后继状态。 检查生成的后继状态中是否有任何状态是解决方案。 如果所有后继状态都不是解决方案,则从所有后继状态中选择k个最佳状态。 当达到预设的迭代次数或满足某个终止条件时,算法停止。 — Choose k successors randomly, biased towards good ones — Close