本文主要是介绍Taskflow:限制最大并发度( Limit the Maximum Concurrency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
定义信号量Semaphore
Taskflow提供了一个机制,tf::Semaphore,用于限制任务部分中的最大并发。您可以让任务在执行工作之前/之后获取/释放一个或多个信号量。一项任务可以获取和释放信号量,或者只是获取或只是释放它。tf::Semaphore对象以初始计数开始。只要该计数高于0,Task就可以获得信号量并完成其工作。如果计数为0或更少,试图获取信号量的Task将不会运行,而是会进入该信号量的等待列表。当另一个Task释放信号量时,它会重新安排该等待列表中的所有任务。
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;std::vector<tf::Task> tasks {taskflow.emplace([](){ std::cout << "A" << std::endl; }),taskflow.emplace([](){ std::cout << "B" << std::endl; }),taskflow.emplace([](){ std::cout << "C" << std::endl; }),taskflow.emplace([](){ std::cout << "D" << std::endl; }),taskflow.emplace([](){ std::cout << "E" << std::endl; })};tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1// 每个task在执行前尝试获取semaphore,在执行结束后释放for(auto& task : tasks) {task.acquire(semaphore);task.release(semaphore); }executor.run(taskflow).wait();
}
可以发现,在同一时刻,只会有一个任务被获取到semaphore并执行。
注意,用户有责任确保信号量在执行获取和释放信号量的任务时保持活力。executor和taskflow都不管理任何semaphore的生命周期。
信号量不仅能限制Task部分的最大并发性,而且限制Task不同部分的最大并发性。具体来说,您可以让一个Task获得信号量,并有另一个释放该信号量。以下示例使用信号量而不是使用显式依赖关系执行五对任务。
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1int n = 5, counter = 0;for(int i = 0; i < n; i++) {tf::Task f = taskflow.emplace([&](){ counter++; }).name("from-" + std::to_string(i));tf::Task t = taskflow.emplace([&](){ counter--; }).name("to-" + std::to_string(i));f.precede(t);// 隐含偏序关系:f -> tf.acquire(semaphore);t.release(semaphore);}executor.run(taskflow).wait();taskflow.dump(std::cout);
}
同时,因为信号量的count为1,所以同一时刻仅有一个任务在执行,且顺序一定是from-x --> to-x;
定义 Critical Section
tf::CriticalSection是tf::Semaphore的包装, 当Task添加到Critical Section时,该Task获取并释放Critical Section内部的信号量。此方法tf::CriticalSection::add为添加到Critical Section的每个Task自动调用tf::Task::acquire和tf::Task::release
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(8); // create an executor of 8 workerstf::Taskflow taskflow;// create a critical section of two workerstf::CriticalSection critical_section(2); tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });critical_section.add(A, B, C, D, E);executor.run(taskflow).wait();taskflow.dump(std::cout);
}
tf::Semaphore 的一种重要应用是,有时因为任务的设计问题,任务间和任务间虽然没有显式的依赖关系,但是他们并不能在同一时刻执行(比如共享了临界区?!),如下图所示:
这三个Task均没有相互依赖关系,但是A和B之间同一时刻只能执行一个(谁先无所谓), 同样,A和C之间只能执行一个,而B和C没有竞争关系,可以并发执行,那么可以为这两个竞争关系各自设置一个信号量,来实现上述约束:
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::Semaphore conflict_AB(1);tf::Semaphore conflict_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and BA.acquire(conflict_AB).release(conflict_AB);B.acquire(conflict_AB).release(conflict_AB);// describe the conflict between A and CA.acquire(conflict_AC).release(conflict_AC);C.acquire(conflict_AC).release(conflict_AC);executor.run(taskflow).wait();taskflow.dump(std::cout);
}
可以看到,B和C并发执行了一段,但是A和B,A和C均不会并发执行。
当然,也可以使用tf::CriticalSection,简化上述逻辑:
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::CriticalSection cs_AB(1); // tf::CriticalSection cs_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and Bcs_AB.add(A,B);cs_AC.add(A,C);executor.run(taskflow).wait();taskflow.dump(std::cout);
}
这篇关于Taskflow:限制最大并发度( Limit the Maximum Concurrency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!