Taskflow:子流任务(Subflow Tasking)

2024-03-30 18:20

本文主要是介绍Taskflow:子流任务(Subflow Tasking),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建Subflow

DAG任务中,有一种常见的场景,一个任务可能在执行期间产生新的任务,然后紧接着执行新任务。 之前提到的静态图就没有办法实现这样一个功能了,所以Taskflow提供了另一种流的节点:Subflow,Subflow的API与Taskflow无异,但又可以作为Taskflow的一个节点。

比如描述如下依赖图:
在这里插入图片描述

#include <memory>
#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task D// 通过lambda创建subflow// 开始执行的时候,会创建一个subflow,然后通过引用传给lambda// 只有当本subflow执行完成之后,才会执行taskflowtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // subflow task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // subflow task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // subflow task B3B1.precede(B3);  // B1 runs bofore B3B2.precede(B3);  // B2 runs before B3}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Ctaskflow.dump(std::cout);      // 在执行前,subflow无法展开,subflow只会显示节点Bexecutor.run(taskflow).get();  // execute the graph to spawn the subflowtaskflow.dump(std::cout);      // 执行完毕后,才可以完全展开return 0;
}

在run之前dump,subflow只会被当作普通节点:

在这里插入图片描述

在run之后调用,subflow被展开,得到真正的依赖图:
在这里插入图片描述

Join a Subflow

Subflow 在离开其上下文时默认调用join,表示需要把subflow中的task执行完,才完成subflow的执行。同时,还可以在上下文中显式调用join,来完成递归模式:

#include <memory>
#include <taskflow/taskflow.hpp>// 递归计算斐波那契数列
int spswm(int n, tf::Subflow& sbf) {if(n < 2) return n;int res1 = 0, res2 = 0;// 生成两个递归子任务.sbf.emplace([&res1, n](tf::Subflow& sbf_inner){res1 = spswm(n-1, sbf_inner);}).name("sub Task:_"+std::to_string(n-1));sbf.emplace([&res2, n](tf::Subflow& sbf_inner){res2 = spswm(n-2, sbf_inner);}).name("sub Task:_"+std::to_string(n-2));// 显式调用join,得到两个子任务的返回值sbf.join();return res1 + res2; 
}
int main() {tf::Executor executor; tf::Taskflow taskflow;int res = 0; // 用于存放最后的结果taskflow.emplace([&res](tf::Subflow& sbf){res = spswm(5, sbf); // 计算5的斐波那契数}).name("main Task");executor.run(taskflow).wait();std::cout << "5的斐波那契数:" << res << std::endl;taskflow.dump(std::cout);    return 0;
}

调用图如下:

在这里插入图片描述

Detach a Subflow

和线程一样,Subflow 可以Detach出去,单独执行(并最后被主Taskflow Join)

#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // static task B3B1.precede(B3);    // B1 runs bofore B3B2.precede(B3);    // B2 runs before B3subflow.detach();  // 分离出Taskflow,单独执行}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Cexecutor.run(taskflow).wait();taskflow.dump(std::cout);    return 0;
}

最终结构如下:
在这里插入图片描述

detach出去的Subflow是临时的,所以,如果执行的是run_n, ABCD四个节点只会构造一次,但是subflow会被构造多次:

#include <taskflow/taskflow.hpp>int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] () {}).name("A");  // static task Atf::Task C = taskflow.emplace([] () {}).name("C");  // static task Ctf::Task D = taskflow.emplace([] () {}).name("D");  // static task Dtf::Task B = taskflow.emplace([] (tf::Subflow& subflow) { tf::Task B1 = subflow.emplace([] () {}).name("B1");  // static task B1tf::Task B2 = subflow.emplace([] () {}).name("B2");  // static task B2tf::Task B3 = subflow.emplace([] () {}).name("B3");  // static task B3B1.precede(B3);    // B1 runs bofore B3B2.precede(B3);    // B2 runs before B3subflow.detach();  // 分离出Taskflow,单独执行}).name("B");A.precede(B);  // B runs after AA.precede(C);  // C runs after AB.precede(D);  // D runs after BC.precede(D);  // D runs after Cexecutor.run_n(taskflow, 5).wait();assert(taskflow.num_tasks() == 19);taskflow.dump(std::cout);return 0;
}

在这里插入图片描述

嵌套子图

Subflow 支持递归,也支持嵌套:

#include <taskflow/taskflow.hpp>int main() {tf::Taskflow taskflow;tf::Task A = taskflow.emplace([] (tf::Subflow& sbf){std::cout << "A spawns A1 & subflow A2\n";tf::Task A1 = sbf.emplace([] () {std::cout << "subtask A1\n";}).name("A1");tf::Task A2 = sbf.emplace([] (tf::Subflow& sbf2){std::cout << "A2 spawns A2_1 & A2_2\n";tf::Task A2_1 = sbf2.emplace([] () {std::cout << "subtask A2_1\n";}).name("A2_1");tf::Task A2_2 = sbf2.emplace([] () {std::cout << "subtask A2_2\n";}).name("A2_2");A2_1.precede(A2_2);}).name("A2");A1.precede(A2);}).name("A");// execute the graph to spawn the subflowtf::Executor().run(taskflow).get();taskflow.dump(std::cout);
}

在这里插入图片描述

同样,也可以detach 子图的子图,独立执行,最终都会被master Taskflow 统一Join(类似进程与子进程的关系)

这篇关于Taskflow:子流任务(Subflow Tasking)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/862229

相关文章

Linux中的计划任务(crontab)使用方式

《Linux中的计划任务(crontab)使用方式》:本文主要介绍Linux中的计划任务(crontab)使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言1、linux的起源与发展2、什么是计划任务(crontab)二、crontab基础1、cro

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

如何使用Python实现一个简单的window任务管理器

《如何使用Python实现一个简单的window任务管理器》这篇文章主要为大家详细介绍了如何使用Python实现一个简单的window任务管理器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 任务管理器效果图完整代码import tkinter as tkfrom tkinter i

Spring Boot 集成 Quartz 使用Cron 表达式实现定时任务

《SpringBoot集成Quartz使用Cron表达式实现定时任务》本文介绍了如何在SpringBoot项目中集成Quartz并使用Cron表达式进行任务调度,通过添加Quartz依赖、创... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

Spring Boot中定时任务Cron表达式的终极指南最佳实践记录

《SpringBoot中定时任务Cron表达式的终极指南最佳实践记录》本文详细介绍了SpringBoot中定时任务的实现方法,特别是Cron表达式的使用技巧和高级用法,从基础语法到复杂场景,从快速启... 目录一、Cron表达式基础1.1 Cron表达式结构1.2 核心语法规则二、Spring Boot中定

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

Python Invoke自动化任务库的使用

《PythonInvoke自动化任务库的使用》Invoke是一个强大的Python库,用于编写自动化脚本,本文就来介绍一下PythonInvoke自动化任务库的使用,具有一定的参考价值,感兴趣的可以... 目录什么是 Invoke?如何安装 Invoke?Invoke 基础1. 运行测试2. 构建文档3.

解决Cron定时任务中Pytest脚本无法发送邮件的问题

《解决Cron定时任务中Pytest脚本无法发送邮件的问题》文章探讨解决在Cron定时任务中运行Pytest脚本时邮件发送失败的问题,先优化环境变量,再检查Pytest邮件配置,接着配置文件确保SMT... 目录引言1. 环境变量优化:确保Cron任务可以正确执行解决方案:1.1. 创建一个脚本1.2. 修

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五