workflow源码解析:ThreadTask

2024-01-17 16:20

本文主要是介绍workflow源码解析:ThreadTask,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

这篇关于workflow源码解析:ThreadTask的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/616584

相关文章

使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)

《使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)》在现代软件开发中,处理JSON数据是一项非常常见的任务,无论是从API接口获取数据,还是将数据存储为JSON格式,解析... 目录1. 背景介绍1.1 jsON简介1.2 实际案例2. 准备工作2.1 环境搭建2.1.1 添加

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

python解析HTML并提取span标签中的文本

《python解析HTML并提取span标签中的文本》在网页开发和数据抓取过程中,我们经常需要从HTML页面中提取信息,尤其是span元素中的文本,span标签是一个行内元素,通常用于包装一小段文本或... 目录一、安装相关依赖二、html 页面结构三、使用 BeautifulSoup javascript

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL