Taskflow:子流任务(Subflow Tasking)

2024-03-30 18:20

本文主要是介绍Taskflow:子流任务(Subflow Tasking),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建Subflow

DAG任务中,有一种常见的场景,一个任务可能在执行期间产生新的任务,然后紧接着执行新任务。 之前提到的静态图就没有办法实现这样一个功能了,所以Taskflow提供了另一种流的节点:Subflow,Subflow的API与Taskflow无异,但又可以作为Taskflow的一个节点。

比如描述如下依赖图:
在这里插入图片描述

#include <memory>
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task D// 通过lambda创建subflow// 开始执行的时候,会创建一个subflow,然后通过引用传给lambda// 只有当本subflow执行完成之后,才会执行taskflowtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // subflow task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // subflow task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // subflow task B3B1.precede(B3);  // B1 runs bofore B3B2.precede(B3);  // B2 runs before B3}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Ctaskflow.dump(std::cout);      // 在执行前,subflow无法展开,subflow只会显示节点Bexecutor.run(taskflow).get();  // execute the graph to spawn the subflowtaskflow.dump(std::cout);      // 执行完毕后,才可以完全展开return 0;
}

在run之前dump,subflow只会被当作普通节点:

在这里插入图片描述

在run之后调用,subflow被展开,得到真正的依赖图:
在这里插入图片描述

Join a Subflow

Subflow 在离开其上下文时默认调用join,表示需要把subflow中的task执行完,才完成subflow的执行。同时,还可以在上下文中显式调用join,来完成递归模式:

#include <memory>
#include <taskflow/taskflow.hpp>// 递归计算斐波那契数列
int spswm(int n, tf::Subflow& sbf) {if(n < 2) return n;int res1 = 0, res2 = 0;// 生成两个递归子任务.sbf.emplace([&res1, n](tf::Subflow& sbf_inner){res1 = spswm(n-1, sbf_inner);}).name("sub Task:_"+std::to_string(n-1));sbf.emplace([&res2, n](tf::Subflow& sbf_inner){res2 = spswm(n-2, sbf_inner);}).name("sub Task:_"+std::to_string(n-2));// 显式调用join,得到两个子任务的返回值sbf.join();return res1 + res2; 
}
int main() {tf::Executor executor; tf::Taskflow taskflow;int res = 0; // 用于存放最后的结果taskflow.emplace([&res](tf::Subflow& sbf){res = spswm(5, sbf); // 计算5的斐波那契数}).name("main Task");executor.run(taskflow).wait();std::cout << "5的斐波那契数:" << res << std::endl;taskflow.dump(std::cout);    return 0;
}

调用图如下:

在这里插入图片描述

Detach a Subflow

和线程一样,Subflow 可以Detach出去,单独执行(并最后被主Taskflow Join)

#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // static task B3B1.precede(B3);    // B1 runs bofore B3B2.precede(B3);    // B2 runs before B3subflow.detach();  // 分离出Taskflow,单独执行}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Cexecutor.run(taskflow).wait();taskflow.dump(std::cout);    return 0;
}

最终结构如下:
在这里插入图片描述

detach出去的Subflow是临时的,所以,如果执行的是run_n, ABCD四个节点只会构造一次,但是subflow会被构造多次:

#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // static task B3B1.precede(B3);    // B1 runs bofore B3B2.precede(B3);    // B2 runs before B3subflow.detach();  // 分离出Taskflow,单独执行}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Cexecutor.run_n(taskflow, 5).wait();assert(taskflow.num_tasks() == 19);taskflow.dump(std::cout);return 0;
}

在这里插入图片描述

嵌套子图

Subflow 支持递归,也支持嵌套:

#include <taskflow/taskflow.hpp>int main() {tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] (tf::Subflow& sbf){std::cout << "A spawns A1 & subflow A2\n";tf::Task A1 = sbf.emplace([] () {std::cout << "subtask A1\n";}).name("A1");tf::Task A2 = sbf.emplace([] (tf::Subflow& sbf2){std::cout << "A2 spawns A2_1 & A2_2\n";tf::Task A2_1 = sbf2.emplace([] () {std::cout << "subtask A2_1\n";}).name("A2_1");tf::Task A2_2 = sbf2.emplace([] () {std::cout << "subtask A2_2\n";}).name("A2_2");A2_1.precede(A2_2);}).name("A2");A1.precede(A2);}).name("A");// execute the graph to spawn the subflowtf::Executor().run(taskflow).get();taskflow.dump(std::cout);
}

在这里插入图片描述

同样,也可以detach 子图的子图,独立执行,最终都会被master Taskflow 统一Join(类似进程与子进程的关系)

这篇关于Taskflow:子流任务(Subflow Tasking)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/862229

相关文章

FreeRTOS学习笔记(二)任务基础篇

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、 任务的基本内容1.1 任务的基本特点1.2 任务的状态1.3 任务控制块——任务的“身份证” 二、 任务的实现2.1 定义任务函数2.2 创建任务2.3 启动任务调度器2.4 任务的运行与切换2.4.1 利用延时函数2.4.2 利用中断 2.5 任务的通信与同步2.6 任务的删除2.7 任务的通知2

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

第49课 Scratch入门篇:骇客任务背景特效

骇客任务背景特效 故事背景:   骇客帝国特色背景在黑色中慢慢滚动着! 程序原理:  1 、 角色的设计技巧  2 、克隆体的应用及特效的使用 开始编程   1、使用 黑色的背景: ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/7d74c872f06b4d9fbc88aecee634b074.png#pic_center)   2

AsyncTask 异步任务解析

1:构建AsyncTask 子类的回调方法: A:doInBackground:   必须重写,所有的耗时操作都在这个里面进行; B: onPreExecute:     用户操作数据前的调用; 例如:显示一个进度条 等 ; C: onPostExecute:    当doInBackground 执行完成后;会自动把数据传给onPostExecute方法;也就是说:这个方法是处理返回的数据的方法

使用Node-API进行异步任务开发

一、Node-API异步任务机制概述         Node-API异步任务开发主要用于执行耗时操作的场景中使用,以避免阻塞主线程,确保应用程序的性能和响应效率。         1、应用场景: 文件操作:读取大型文件或执行复杂的文件操作时,可以使用异步工作项来避免阻塞主线程。网络请求:当需要进行网络请求并等待响应时,可以使用异步工作项来避免阻塞主线程,从而提高应用程序的响应性能。数据库操

探索Invoke:Python自动化任务的瑞士军刀

文章目录 探索Invoke:Python自动化任务的瑞士军刀背景:为何选择Invoke?`invoke`是什么?如何安装`invoke`?简单的`invoke`库函数使用方法场景应用:`invoke`在实际项目中的使用场景一:自动化测试场景二:代码格式化场景三:部署应用 常见问题与解决方案问题一:命令执行失败问题二:权限不足问题三:并发执行问题 总结 探索Invoke:P

RISC-V (十)任务同步和锁

并发与同步 并发:指多个控制流同时执行。         多处理器多任务。一般在多处理器架构下内存是共享的。           单处理器多任务,通过调度器,一会调度这个任务,一会调度下个任务。  共享一个处                                理器一个内存。                 单处理器任务+中断: 同步: 是为了保证在并发执行的环境中各个控制流可

145-Linux权限维持Rootkit后门Strace监控Alias别名Cron定时任务

参考 【权限维持】Linux&Rootkit后门&Strace监控&Alias别名&Cron定时任务_alias ls='alerts(){ ls $* --color=auto;python -c "-CSDN博客 参考 FlowUs 息流 - 新一代生产力工具 权限维持-Linux-定时任务-Cron后门 利用系统的定时任务功能进行反弹Shell 1、编辑后门反弹shell脚本

环形定时任务 原理

业务背景 在稍微复杂点业务系统中,不可避免会碰到做定时任务的需求,比如淘宝的交易超时自动关闭订单、超时自动确认收货等等。对于一些定时作业比较多的系统,通常都会搭建专门的调度平台来管理,通过创建定时器来周期性执行任务。如刚才所说的场景,我们可以给订单创建一个专门的任务来处理交易状态,每秒轮询一次订单表,找出那些符合超时条件的订单然后标记状态。这是最简单粗暴的做法,但明显也很low,自己都下不去手写

Spring 创建定时任务

我们在编写Spring Boot应用中经常会遇到这样的场景,比如:我需要定时地发送一些短信、邮件之类的操作,也可能会定时地检查和监控一些标志、参数等。 创建定时任务 在Spring Boot中编写定时任务是非常简单的事,下面通过实例介绍如何在Spring Boot中创建定时任务,实现每过5秒输出一下当前时间。 在Spring Boot的主类中加入@EnableScheduling注解,启用定