本文主要是介绍c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)
- 前言
- [ThreadPool 项目地址](https://github.com/progschj/ThreadPool)
- 项目源码:
- 基本用法
- 类成员变量
- 类成员函数
- 构造函数的签名
- 创建线程
- 线程默认的任务
- 向任务队列中添加一个任务
- 析构函数
- 总结
前言
维基百科上对线程池的简要介绍:
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
ThreadPool 项目地址
progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。
项目源码:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();
}#endif
基本用法
// create thread pool with 4 worker threads
ThreadPool pool(4);// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);// get result from future
std::cout << result.get() << std::endl;
类成员变量
std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
workers
:存储线程池中 std::thread 的容器tasks
:任务队列queue_mutex
:任务队列的互斥锁condition
:任务队列的条件变量stop
:线程池是否停止的标志位
类成员函数
构造函数的签名
inline ThreadPool::ThreadPool(size_t threads): stop(false)
- 构造函数传入一个
size_t
类型的参数,初始化线程池中线程的数量 - 初始化列表将 stop 标志位初始化为 false
创建线程
for (size_t i = 0; i < threads; ++i)
{workers.emplace_back([this]{for (;;){//...}});
}
- 使用 for 循环创建 threads 个线程,将线程加入 workers 容器
- lambda 表达式用于创建线程,捕获 this,lambda 表达式中包含一个无限循环
线程默认的任务
std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();
}task();
- 首先声明了一个
std::function<void()>
类型的变量 task - 在互斥锁保护任务队列后,调用
condition.wait()
等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行 - 如果标志位 stop 为 true,且任务队列为空,此任务将退出
- 以上条件都通过后,将从任务队列中取出一个任务 task,移动到局部变量 task 中(吐槽下:距离 c++11 标准的发布已经过去了十几年,现在还不明白这一条的,就很难评价了)
- 执行
task()
,也就是上一步从队列头部取出的任务
向任务队列中添加一个任务
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;
enqueue
函数模板,用于任务的入队列F&& f
,这里预期是一个任意的 callable 对象Args&&... args
,一个可变模板参数,会在编译期展开参数包- 返回值是一个
std::future
类型的对象,用于获取任务的执行结果,std::future
的模板参数使用std::result_of
萃取可调用对象的返回值类型 - 注意,c++17 后
std::result_of
就已经是 deprecated,可以使用std::invoke_result
类型萃取
继续往下看 enqueue 函数的实现:
using return_type = typename std::result_of<F(Args...)>::type;
- 使用
std::result_of
类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_type
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
这一段做了好几件事,一步一步拆解:
std::make_shared
构建一个std::shared_ptr
std::packaged_task
模板是用于包装 callable 对象,使用了前面推导出的return_type
类型来实例化模板- 而
std::make_shared
需要调用实例类型的构造函数,而std::packaged_task
的构造函数需要一个可调用对象,所以这里使用std::bind
将可变模板参数绑定给 f(对std::bind
不熟悉的建议先行查阅资料),std::forward
转发一下类型 - 简单来说,以上只是构建一个 callable 对象的包装器
std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
- 从 task 中获取
std::future
对象 - 使用大括号控制代码块,在这个代码块中上锁
- 如果线程池已经停止,抛出异常
- 否则正常执行,将 task 推入到队列尾部
- 条件变量通知一个等待的线程,这个时候,构造函数中
condition.wait()
会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行 - 最后返回
std::future
对象
析构函数
遵循 RAII 原则,释放所有资源
{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)worker.join();
- 上锁,将停止标志位置为 true
- 通知所有等待的线程
- 等待所有线程终止
总结
该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。
这篇关于c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!