Taskflow:限制最大并发度( Limit the Maximum Concurrency)

2024-03-31 21:36

本文主要是介绍Taskflow:限制最大并发度( Limit the Maximum Concurrency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

定义信号量Semaphore

Taskflow提供了一个机制,tf::Semaphore,用于限制任务部分中的最大并发。您可以让任务在执行工作之前/之后获取/释放一个或多个信号量。一项任务可以获取和释放信号量,或者只是获取或只是释放它。tf::Semaphore对象以初始计数开始。只要该计数高于0,Task就可以获得信号量并完成其工作。如果计数为0或更少,试图获取信号量的Task将不会运行,而是会进入该信号量的等待列表。当另一个Task释放信号量时,它会重新安排该等待列表中的所有任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;std::vector<tf::Task> tasks {taskflow.emplace([](){ std::cout << "A" << std::endl; }),taskflow.emplace([](){ std::cout << "B" << std::endl; }),taskflow.emplace([](){ std::cout << "C" << std::endl; }),taskflow.emplace([](){ std::cout << "D" << std::endl; }),taskflow.emplace([](){ std::cout << "E" << std::endl; })};tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1// 每个task在执行前尝试获取semaphore,在执行结束后释放for(auto& task : tasks) {task.acquire(semaphore);task.release(semaphore); }executor.run(taskflow).wait();
}

在这里插入图片描述
可以发现,在同一时刻,只会有一个任务被获取到semaphore并执行。

注意,用户有责任确保信号量在执行获取和释放信号量的任务时保持活力。executor和taskflow都不管理任何semaphore的生命周期。

信号量不仅能限制Task部分的最大并发性,而且限制Task不同部分的最大并发性。具体来说,您可以让一个Task获得信号量,并有另一个释放该信号量。以下示例使用信号量而不是使用显式依赖关系执行五对任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1int n = 5, counter = 0;for(int i = 0; i < n; i++) {tf::Task f = taskflow.emplace([&](){ counter++; }).name("from-" + std::to_string(i));tf::Task t = taskflow.emplace([&](){ counter--; }).name("to-" + std::to_string(i));f.precede(t);// 隐含偏序关系:f -> tf.acquire(semaphore);t.release(semaphore);}executor.run(taskflow).wait();taskflow.dump(std::cout);
}

在这里插入图片描述
同时,因为信号量的count为1,所以同一时刻仅有一个任务在执行,且顺序一定是from-x --> to-x;
在这里插入图片描述

定义 Critical Section

tf::CriticalSection是tf::Semaphore的包装, 当Task添加到Critical Section时,该Task获取并释放Critical Section内部的信号量。此方法tf::CriticalSection::add为添加到Critical Section的每个Task自动调用tf::Task::acquire和tf::Task::release

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(8);   // create an executor of 8 workerstf::Taskflow taskflow;// create a critical section of two workerstf::CriticalSection critical_section(2); tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });critical_section.add(A, B, C, D, E);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

tf::Semaphore 的一种重要应用是,有时因为任务的设计问题,任务间和任务间虽然没有显式的依赖关系,但是他们并不能在同一时刻执行(比如共享了临界区?!),如下图所示:

在这里插入图片描述

这三个Task均没有相互依赖关系,但是A和B之间同一时刻只能执行一个(谁先无所谓), 同样,A和C之间只能执行一个,而B和C没有竞争关系,可以并发执行,那么可以为这两个竞争关系各自设置一个信号量,来实现上述约束:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::Semaphore conflict_AB(1);tf::Semaphore conflict_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and BA.acquire(conflict_AB).release(conflict_AB);B.acquire(conflict_AB).release(conflict_AB);// describe the conflict between A and CA.acquire(conflict_AC).release(conflict_AC);C.acquire(conflict_AC).release(conflict_AC);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

可以看到,B和C并发执行了一段,但是A和B,A和C均不会并发执行。

在这里插入图片描述

当然,也可以使用tf::CriticalSection,简化上述逻辑:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::CriticalSection cs_AB(1); // tf::CriticalSection cs_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and Bcs_AB.add(A,B);cs_AC.add(A,C);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

这篇关于Taskflow:限制最大并发度( Limit the Maximum Concurrency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/865404

相关文章

SpringBoot实现基于URL和IP的访问频率限制

《SpringBoot实现基于URL和IP的访问频率限制》在现代Web应用中,接口被恶意刷新或暴力请求是一种常见的攻击手段,为了保护系统资源,需要对接口的访问频率进行限制,下面我们就来看看如何使用... 目录1. 引言2. 项目依赖3. 配置 Redis4. 创建拦截器5. 注册拦截器6. 创建控制器8.

Linux限制ip访问的解决方案

《Linux限制ip访问的解决方案》为了修复安全扫描中发现的漏洞,我们需要对某些服务设置访问限制,具体来说,就是要确保只有指定的内部IP地址能够访问这些服务,所以本文给大家介绍了Linux限制ip访问... 目录背景:解决方案:使用Firewalld防火墙规则验证方法深度了解防火墙逻辑应用场景与扩展背景:

如何提高Redis服务器的最大打开文件数限制

《如何提高Redis服务器的最大打开文件数限制》文章讨论了如何提高Redis服务器的最大打开文件数限制,以支持高并发服务,本文给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录如何提高Redis服务器的最大打开文件数限制问题诊断解决步骤1. 修改系统级别的限制2. 为Redis进程特别设置限制

poj 3723 kruscal,反边取最大生成树。

题意: 需要征募女兵N人,男兵M人。 每征募一个人需要花费10000美元,但是如果已经招募的人中有一些关系亲密的人,那么可以少花一些钱。 给出若干的男女之间的1~9999之间的亲密关系度,征募某个人的费用是10000 - (已经征募的人中和自己的亲密度的最大值)。 要求通过适当的招募顺序使得征募所有人的费用最小。 解析: 先设想无向图,在征募某个人a时,如果使用了a和b之间的关系

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

poj 3258 二分最小值最大

题意: 有一些石头排成一条线,第一个和最后一个不能去掉。 其余的共可以去掉m块,要使去掉后石头间距的最小值最大。 解析: 二分石头,最小值最大。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <c

poj 2175 最小费用最大流TLE

题意: 一条街上有n个大楼,坐标为xi,yi,bi个人在里面工作。 然后防空洞的坐标为pj,qj,可以容纳cj个人。 从大楼i中的人到防空洞j去避难所需的时间为 abs(xi - pi) + (yi - qi) + 1。 现在设计了一个避难计划,指定从大楼i到防空洞j避难的人数 eij。 判断如果按照原计划进行,所有人避难所用的时间总和是不是最小的。 若是,输出“OPETIMAL",若

poj 2135 有流量限制的最小费用最大流

题意: 农场里有n块地,其中约翰的家在1号地,二n号地有个很大的仓库。 农场有M条道路(双向),道路i连接着ai号地和bi号地,长度为ci。 约翰希望按照从家里出发,经过若干块地后到达仓库,然后再返回家中的顺序带朋友参观。 如果要求往返不能经过同一条路两次,求参观路线总长度的最小值。 解析: 如果只考虑去或者回的情况,问题只不过是无向图中两点之间的最短路问题。 但是现在要去要回

poj 2594 二分图最大独立集

题意: 求一张图的最大独立集,这题不同的地方在于,间接相邻的点也可以有一条边,所以用floyd来把间接相邻的边也连起来。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#include <sta

poj 3422 有流量限制的最小费用流 反用求最大 + 拆点

题意: 给一个n*n(50 * 50) 的数字迷宫,从左上点开始走,走到右下点。 每次只能往右移一格,或者往下移一格。 每个格子,第一次到达时可以获得格子对应的数字作为奖励,再次到达则没有奖励。 问走k次这个迷宫,最大能获得多少奖励。 解析: 拆点,拿样例来说明: 3 2 1 2 3 0 2 1 1 4 2 3*3的数字迷宫,走两次最大能获得多少奖励。 将每个点拆成两个