Linux基础 -- pthread之线程池任务调度

2024-09-04 22:04

本文主要是介绍Linux基础 -- pthread之线程池任务调度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

线程池任务依赖设计方案

1. 设计目标

为了在多线程环境中支持任务间的依赖关系,我们设计了一个基于 pthread_create 封装的线程池,任务之间可以设置依赖,只有在依赖的任务完成后,依赖任务才会被执行。该设计目标是简化任务调度的逻辑,让开发者可以专注于任务的编写,而不必关注复杂的线程管理和任务依赖的执行顺序。

2. 核心概念

2.1 任务(Task)

任务是线程池中执行的最小单位。每个任务包含以下信息:

  • 任务函数:待执行的工作函数。
  • 参数:传递给任务函数的参数。
  • 依赖任务:任务可以依赖其他任务,只有依赖的任务执行完成,当前任务才能执行。
  • 依赖计数:记录任务依赖的其他任务数量,每当一个依赖任务完成,该计数器减 1,直到依赖计数为 0 时,任务才会被执行。

2.2 任务队列

任务队列是一个先进先出的队列,用于存储待执行的任务。线程池中的线程会从任务队列中取出任务并执行。为了支持任务依赖,只有当任务的依赖任务全部完成后,任务才会被放入任务队列中。

2.3 线程池(Thread Pool)

线程池是线程管理的核心组件,它包含以下部分:

  • 线程数组:线程池中的线程,负责从任务队列中获取任务并执行。
  • 任务队列:用于存储待执行任务的队列。
  • 互斥锁和条件变量:保证线程安全地访问任务队列,并通过条件变量实现任务的调度。

3. 线程池工作原理

3.1 任务的添加与依赖处理

当开发者向线程池提交一个任务时,可以同时提交该任务所依赖的其他任务。如果任务没有依赖任务,任务会被立即加入任务队列中,等待线程执行。如果任务有依赖任务,任务会暂时挂起,直到其所有依赖任务完成。

每当一个任务执行完成后,线程池会检查是否有其他任务依赖该任务。如果有,线程池会减少依赖任务的依赖计数器,当计数器为 0 时,将该依赖任务放入任务队列中。

3.2 线程的工作流程

线程池中的每个线程都会执行以下操作:

  1. 从任务队列中取出一个任务。
  2. 执行任务函数。
  3. 任务执行完成后,通知所有依赖该任务的其他任务,减少这些任务的依赖计数。
  4. 如果依赖计数为 0,则将依赖任务放入任务队列中。

3.3 线程池的关闭与销毁

在销毁线程池时,线程池会通知所有线程停止工作,等待所有线程结束后,释放所有分配的内存资源,确保程序无资源泄漏。

4. 主要流程

4.1 任务提交流程

  1. 开发者调用线程池的 add_task 接口提交任务。
  2. 如果任务没有依赖,立即将任务加入任务队列中。
  3. 如果任务有依赖,记录依赖关系,并等待依赖任务完成。

4.2 任务执行流程

  1. 线程从任务队列中获取一个任务。
  2. 执行任务。
  3. 检查是否有其他任务依赖于该任务。
  4. 如果有依赖任务,减少依赖任务的计数器。
  5. 如果依赖计数器为 0,将依赖任务放入任务队列。

4.3 任务依赖完成通知

  1. 任务完成后,通知所有依赖它的任务。
  2. 对每个依赖任务,减少依赖计数器。
  3. 当依赖计数器为 0 时,将该任务加入任务队列,等待执行。

5. 锁与同步机制

为了保证多线程环境下任务队列和任务依赖关系的安全性,线程池使用互斥锁和条件变量:

  • 互斥锁:保护任务队列和任务依赖数据的并发访问。
  • 条件变量:用于通知线程任务队列中有新任务需要处理,避免线程一直忙等待。

6. 优势与适用场景

6.1 优势

  • 简化任务依赖管理:通过自动管理任务依赖,减少开发者手动处理依赖逻辑的负担。
  • 高效的线程复用:线程池中的线程复用,避免频繁创建和销毁线程带来的性能开销。
  • 支持任务间复杂依赖:通过依赖计数和依赖队列,可以支持复杂的任务依赖关系,比如任务链和任务图。

6.2 适用场景

  • 多任务的复杂调度:多个任务之间存在依赖关系的场景,例如图像处理中的多个滤波器操作、数据处理管道等。
  • 任务并行化:需要将一系列任务并行化执行,且任务之间存在顺序依赖。
  • 性能优化:通过复用线程池减少频繁线程创建和销毁的开销,适用于对性能要求较高的应用场景。

7. 扩展功能

  • 动态调整线程池大小:根据系统的负载情况,动态调整线程池中的线程数量,以适应不同的任务需求。
  • 任务优先级支持:为任务设置优先级,高优先级的任务可以优先执行。
  • 任务超时处理:增加任务超时功能,确保长时间未完成的任务可以被正确处理,避免任务卡住。

8. 线程池的设计细节

8.1 任务结构设计

每个任务封装了任务函数及其参数,并且通过依赖计数器来追踪当前任务的依赖任务。任务的主要结构包括以下字段:

  • func:任务函数指针,表示具体的工作逻辑。
  • arg:任务函数的参数。
  • dependencies:一个指向依赖任务数组的指针,用于保存该任务所依赖的其他任务。
  • dep_count:依赖任务的总数量,用于初始化 remaining_deps 计数器。
  • remaining_deps:当前任务未完成的依赖任务计数器。当其值为 0 时,任务将可以被执行。

8.2 线程池结构设计

线程池通过维护一个任务队列来管理任务执行,同时通过线程数组来管理线程的执行。主要结构包括:

  • threads:线程数组,用于保存所有创建的线程。
  • task_queue:任务队列,采用链表的形式,存储待执行的任务。
  • lock:互斥锁,保证任务队列在多线程环境下的安全访问。
  • notify:条件变量,用于在线程空闲时唤醒它们执行新的任务。
  • thread_count:线程池中线程的数量。
  • stop:线程池是否已经停止的标志,线程池销毁时设为 true,通知所有线程退出。

8.3 锁与条件变量的使用

  • 锁(Mutex):任务队列的操作需要线程安全,因此我们在对任务队列进行增删操作时,必须加锁保护,确保只有一个线程可以修改任务队列。
  • 条件变量(Condition Variable):当任务队列为空时,线程会进入等待状态,避免忙等待浪费 CPU 资源。当有新任务加入队列时,条件变量会通知空闲线程执行任务。

9. 线程池中的任务调度

9.1 任务提交与依赖关系处理

  1. 无依赖任务:如果任务没有依赖任务,则直接将其加入任务队列,等待线程执行。
  2. 有依赖任务:如果任务有依赖任务,则该任务的 remaining_deps 被设置为依赖任务的数量。当依赖任务完成时,remaining_deps 计数器减少,直到所有依赖任务完成,remaining_deps 为 0,该任务才会被放入任务队列。

9.2 任务执行与依赖任务通知

  1. 当线程从任务队列中取出任务并执行时,首先完成当前任务的工作。
  2. 任务执行完成后,线程检查是否有其他任务依赖于该任务。对于每个依赖任务,减少其 remaining_deps 计数。
  3. 如果依赖任务的 remaining_deps 变为 0,则该任务被放入任务队列,准备执行。

9.3 任务的优先执行

默认情况下,任务是按提交顺序被执行的。如果需要支持优先级,可以为每个任务增加优先级字段,并修改任务队列的插入逻辑,使高优先级任务排在前面。这样可以确保高优先级任务优先执行。

10. 销毁线程池的安全性

10.1 停止标志

当用户请求销毁线程池时,线程池需要安全地通知所有工作线程退出。我们使用 stop 标志来表示线程池是否处于停止状态。当该标志被设置为 true 时,所有线程会在任务执行完毕后退出。

10.2 资源清理

线程池销毁时,需要确保以下资源被正确释放:

  • 线程数组:所有线程在退出前需要 pthread_join,以确保线程正常结束。
  • 任务队列:在销毁线程池前,所有未执行的任务需要被清理,以避免内存泄漏。
  • 锁与条件变量:在释放线程池资源时,需要销毁互斥锁和条件变量。

11. 性能优化建议

11.1 任务批处理

如果任务依赖性较小,线程池可以批量处理任务,减少频繁的锁开销。例如,多个无依赖的任务可以一次性加入任务队列,线程从队列中批量取出任务执行,减少锁的竞争。

11.2 线程池动态扩展

根据系统负载情况,线程池的线程数量可以动态调整。例如,在任务过多时,临时增加线程处理任务;当任务量减少时,减少线程的数量,以避免线程资源浪费。

11.3 任务超时处理

为每个任务设定超时时间,避免任务长时间占用线程。超时未完成的任务可以被取消或重新调度,这对于高实时性要求的系统尤其重要。

12. 常见问题与处理

12.1 任务依赖死锁

在设计任务依赖时,必须确保任务的依赖关系是无环的(即不能出现循环依赖)。如果任务形成了环形依赖,所有任务都将等待依赖任务完成,导致死锁。因此,在任务依赖设计中,必须保证任务依赖关系构成一个有向无环图(DAG)。

12.2 任务队列溢出

如果任务提交过于频繁,任务队列可能会堆积大量任务,导致内存消耗过大。解决方法包括限制任务队列的大小,或动态调整队列大小,并且在队列满时,可以选择拒绝新任务或阻塞提交线程。

12.3 线程饥饿

如果某些任务始终得不到执行(比如低优先级任务),会导致线程饥饿问题。可以通过任务优先级策略和公平调度算法,保证每个任务都能在合理时间内得到执行。

13. 未来扩展

13.1 支持分布式任务调度

线程池的任务调度机制可以扩展到分布式系统中,在多台服务器之间共享任务队列,实现跨节点的任务分配与执行。

13.2 支持更多的任务调度策略

当前设计仅支持简单的任务依赖和优先级调度。未来可以支持更多复杂的调度策略,例如基于任务执行时间的调度、负载均衡调度等。

13.3 与异步I/O的结合

将线程池与异步 I/O 机制结合,允许线程池中的线程在等待 I/O 操作时不阻塞,从而提高任务执行效率,特别适合高并发的网络应用。

代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdbool.h>// 任务函数类型
typedef void (*task_func)(void*);// 任务结构
typedef struct task {task_func func;            // 任务函数void* arg;                 // 任务参数struct task** dependencies; // 依赖的任务int dep_count;             // 依赖任务的数量int remaining_deps;        // 剩余未完成的依赖任务数量struct task* next;         // 下一个任务
} task_t;// 线程池结构
typedef struct thread_pool {pthread_mutex_t lock;      // 互斥锁保护任务队列pthread_cond_t notify;     // 条件变量,用于通知线程pthread_t* threads;        // 线程数组task_t* task_queue;        // 任务队列int thread_count;          // 线程数量bool stop;                 // 标志是否停止线程池
} thread_pool_t;// 创建线程池
thread_pool_t* thread_pool_create(int thread_count);// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count);// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool);// 线程的工作函数
void* thread_work(void* arg);// 初始化线程池
thread_pool_t* thread_pool_create(int thread_count) {thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));if (pool == NULL) {perror("创建线程池失败");return NULL;}pool->thread_count = thread_count;pool->task_queue = NULL;pool->stop = false;pthread_mutex_init(&pool->lock, NULL);pthread_cond_init(&pool->notify, NULL);pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_count);for (int i = 0; i < thread_count; i++) {if (pthread_create(&pool->threads[i], NULL, thread_work, (void*)pool) != 0) {perror("线程创建失败");thread_pool_destroy(pool);return NULL;}}return pool;
}// 线程的工作函数,不断从任务队列中取任务并执行
void* thread_work(void* arg) {thread_pool_t* pool = (thread_pool_t*)arg;while (true) {pthread_mutex_lock(&pool->lock);while (pool->task_queue == NULL && !pool->stop) {pthread_cond_wait(&pool->notify, &pool->lock);}if (pool->stop) {pthread_mutex_unlock(&pool->lock);break;}// 获取任务task_t* task = pool->task_queue;if (task != NULL && task->remaining_deps == 0) {pool->task_queue = task->next;} else {task = NULL;}pthread_mutex_unlock(&pool->lock);// 执行任务if (task != NULL) {task->func(task->arg);// 依赖的任务完成后,通知依赖它的任务pthread_mutex_lock(&pool->lock);for (int i = 0; i < task->dep_count; i++) {task->dependencies[i]->remaining_deps--;if (task->dependencies[i]->remaining_deps == 0) {// 将依赖任务加入任务队列中task->dependencies[i]->next = pool->task_queue;pool->task_queue = task->dependencies[i];pthread_cond_signal(&pool->notify);}}pthread_mutex_unlock(&pool->lock);free(task);}}return NULL;
}// 添加任务到线程池
int thread_pool_add(thread_pool_t* pool, task_func func, void* arg, task_t** dependencies, int dep_count) {task_t* task = (task_t*)malloc(sizeof(task_t));if (task == NULL) {perror("任务分配失败");return -1;}task->func = func;task->arg = arg;task->dependencies = dependencies;task->dep_count = dep_count;task->remaining_deps = dep_count;task->next = NULL;pthread_mutex_lock(&pool->lock);// 如果没有依赖任务,立即执行if (task->remaining_deps == 0) {task->next = pool->task_queue;pool->task_queue = task;pthread_cond_signal(&pool->notify);} else {// 否则等待依赖任务完成for (int i = 0; i < dep_count; i++) {if (dependencies[i] == NULL) {perror("依赖任务无效");free(task);pthread_mutex_unlock(&pool->lock);return -1;}}}pthread_mutex_unlock(&pool->lock);return 0;
}// 销毁线程池
void thread_pool_destroy(thread_pool_t* pool) {if (pool == NULL) return;pthread_mutex_lock(&pool->lock);pool->stop = true;pthread_cond_broadcast(&pool->notify);pthread_mutex_unlock(&pool->lock);for (int i = 0; i < pool->thread_count; i++) {pthread_join(pool->threads[i], NULL);}free(pool->threads);task_t* current = pool->task_queue;while (current != NULL) {task_t* temp = current;current = current->next;free(temp);}pthread_mutex_destroy(&pool->lock);pthread_cond_destroy(&pool->notify);free(pool);
}// 测试任务函数
void print_message(void* arg) {char* message = (char*)arg;printf("执行任务: %s\n", message);sleep(1);
}// 测试代码
int main() {// 创建线程池,包含 4 个线程thread_pool_t* pool = thread_pool_create(4);// 创建任务结构体task_t* task1 = (task_t*)malloc(sizeof(task_t));task_t* task2 = (task_t*)malloc(sizeof(task_t));task_t* task3 = (task_t*)malloc(sizeof(task_t));// 提交任务1thread_pool_add(pool, print_message, "任务1", NULL, 0);// 任务2依赖任务1task_t* dependencies1[] = {task1};thread_pool_add(pool, print_message, "任务2", dependencies1, 1);// 任务3依赖任务2task_t* dependencies2[] = {task2};thread_pool_add(pool, print_message, "任务3", dependencies2, 1);// 模拟主线程等待任务完成sleep(5);// 销毁线程池thread_pool_destroy(pool);return 0;
}

这篇关于Linux基础 -- pthread之线程池任务调度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1137149

相关文章

Linux中shell解析脚本的通配符、元字符、转义符说明

《Linux中shell解析脚本的通配符、元字符、转义符说明》:本文主要介绍shell通配符、元字符、转义符以及shell解析脚本的过程,通配符用于路径扩展,元字符用于多命令分割,转义符用于将特殊... 目录一、linux shell通配符(wildcard)二、shell元字符(特殊字符 Meta)三、s

Linux之软件包管理器yum详解

《Linux之软件包管理器yum详解》文章介绍了现代类Unix操作系统中软件包管理和包存储库的工作原理,以及如何使用包管理器如yum来安装、更新和卸载软件,文章还介绍了如何配置yum源,更新系统软件包... 目录软件包yumyum语法yum常用命令yum源配置文件介绍更新yum源查看已经安装软件的方法总结软

linux报错INFO:task xxxxxx:634 blocked for more than 120 seconds.三种解决方式

《linux报错INFO:taskxxxxxx:634blockedformorethan120seconds.三种解决方式》文章描述了一个Linux最小系统运行时出现的“hung_ta... 目录1.问题描述2.解决办法2.1 缩小文件系统缓存大小2.2 修改系统IO调度策略2.3 取消120秒时间限制3

Linux alias的三种使用场景方式

《Linuxalias的三种使用场景方式》文章介绍了Linux中`alias`命令的三种使用场景:临时别名、用户级别别名和系统级别别名,临时别名仅在当前终端有效,用户级别别名在当前用户下所有终端有效... 目录linux alias三种使用场景一次性适用于当前用户全局生效,所有用户都可调用删除总结Linux

Linux:alias如何设置永久生效

《Linux:alias如何设置永久生效》在Linux中设置别名永久生效的步骤包括:在/root/.bashrc文件中配置别名,保存并退出,然后使用source命令(或点命令)使配置立即生效,这样,别... 目录linux:alias设置永久生效步骤保存退出后功能总结Linux:alias设置永久生效步骤

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

高效管理你的Linux系统: Debian操作系统常用命令指南

《高效管理你的Linux系统:Debian操作系统常用命令指南》在Debian操作系统中,了解和掌握常用命令对于提高工作效率和系统管理至关重要,本文将详细介绍Debian的常用命令,帮助读者更好地使... Debian是一个流行的linux发行版,它以其稳定性、强大的软件包管理和丰富的社区资源而闻名。在使用

Linux Mint Xia 22.1重磅发布: 重要更新一览

《LinuxMintXia22.1重磅发布:重要更新一览》Beta版LinuxMint“Xia”22.1发布,新版本基于Ubuntu24.04,内核版本为Linux6.8,这... linux Mint 22.1「Xia」正式发布啦!这次更新带来了诸多优化和改进,进一步巩固了 Mint 在 Linux 桌面

LinuxMint怎么安装? Linux Mint22下载安装图文教程

《LinuxMint怎么安装?LinuxMint22下载安装图文教程》LinuxMint22发布以后,有很多新功能,很多朋友想要下载并安装,该怎么操作呢?下面我们就来看看详细安装指南... linux Mint 是一款基于 Ubuntu 的流行发行版,凭借其现代、精致、易于使用的特性,深受小伙伴们所喜爱。对