workflow源码解析:ThreadTask

2024-01-17 16:20

本文主要是介绍workflow源码解析:ThreadTask,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。// 定义INPUT
struct AddInput
{int x;int y;
};// 定义OUTPUT
struct AddOutput
{int res;
};// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{output->res = input->x + input->y;
}using AddTask = WFThreadTask<AddInput, AddOutput>;void callback(AddTask *task)
{auto *input = task->get_input();auto *output = task->get_output();assert(task->get_state() == WFT_STATE_SUCCESS);fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}int main()
{using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;AddTask *task = AddFactory::create_thread_task("add_task",add_routine,callback);AddInput *input = task->get_input();input->x = 1;input->y = 2;task->start();getchar();return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:using T = WFThreadTask<INPUT, OUTPUT>;...
public:static T *create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (T *)> callback);...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,std::function<void (INPUT *, OUTPUT *)> routine,std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),WFGlobal::get_compute_executor(),std::move(routine),std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:virtual void execute()  //实现ExecSession的纯虚函数{this->routine(&this->input, &this->output); //执行用户程序的routine}protected:std::function<void (INPUT *, OUTPUT *)> routine;public:__WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (INPUT *, OUTPUT *)>&& rt,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),routine(std::move(rt)){}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:void start();void dismiss();INPUT *get_input() { return &this->input; }OUTPUT *get_output() { return &this->output; }void *user_data;int get_state() const { return this->state; }int get_error() const { return this->error; }void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:virtual SubTask *done();protected:INPUT input;OUTPUT output;std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;public:WFThreadTask(ExecQueue *queue, Executor *executor,std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :ExecRequest(queue, executor),callback(std::move(cb)){// 初始化}protected:virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:ExecRequest(ExecQueue *queue, Executor *executor);ExecQueue *get_request_queue() const { return this->queue; }void set_request_queue(ExecQueue *queue) { this->queue = queue; }virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口{this->executor->request(this, this->queue);...}protected:int state;int error;ExecQueue *queue;Executor *executor;protected:virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{// 子任务被调起的时机virtual void dispatch() = 0;// 子任务执行完成的时机virtual SubTask *done() = 0;// 内部实现,决定了任务流走向void subtask_done();...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:virtual void execute() = 0;virtual void handle(int state, int error) = 0;protected:ExecQueue *get_queue() { return this->queue; }private:ExecQueue *queue;...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{...
private:struct list_head task_list;pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中int request(ExecSession *session, ExecQueue *queue);private:// 执行器和系统资源,是一个包含关系thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{ExecSessionEntry *entry = new ExecSessionEntry;session->queue = queue;entry->session = session;entry->thrdpool = this->thrdpool;queue->mutex.lock();list_add_tail(&entry->list, &queue->session_list);if (queue->session_list.next == &entry->list){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/if (thrdpool_schedule(&task, this->thrdpool) < 0){list_del(&entry->list);delete entry;entry = NULL;}}queue->mutex.unlock();return -!entry;
}
struct ExecSessionEntry
{struct list_head list;ExecSession *session;thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{ExecQueue *queue = (ExecQueue *)context;ExecSessionEntry *entry;ExecSession *session;queue->mutex.lock();entry = list_entry(queue->session_list.next, ExecSessionEntry, list);list_del(&entry->list);session = entry->session;if (!list_empty(&queue->session_list)){struct thrdpool_task task = {Executor::executor_thread_routine, queue};/*{.routine	=	Executor::executor_thread_routine,.context	=	queue};*/__thrdpool_schedule(&task, entry, entry->thrdpool);}elsedelete entry;queue->mutex.unlock();session->execute(); //这里会执行到用户routinesession->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

这篇关于workflow源码解析:ThreadTask的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/616584

相关文章

C语言中自动与强制转换全解析

《C语言中自动与强制转换全解析》在编写C程序时,类型转换是确保数据正确性和一致性的关键环节,无论是隐式转换还是显式转换,都各有特点和应用场景,本文将详细探讨C语言中的类型转换机制,帮助您更好地理解并在... 目录类型转换的重要性自动类型转换(隐式转换)强制类型转换(显式转换)常见错误与注意事项总结与建议类型

MySQL 缓存机制与架构解析(最新推荐)

《MySQL缓存机制与架构解析(最新推荐)》本文详细介绍了MySQL的缓存机制和整体架构,包括一级缓存(InnoDBBufferPool)和二级缓存(QueryCache),文章还探讨了SQL... 目录一、mysql缓存机制概述二、MySQL整体架构三、SQL查询执行全流程四、MySQL 8.0为何移除查

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

使用Java实现一个解析CURL脚本小工具

《使用Java实现一个解析CURL脚本小工具》文章介绍了如何使用Java实现一个解析CURL脚本的工具,该工具可以将CURL脚本中的Header解析为KVMap结构,获取URL路径、请求类型,解析UR... 目录使用示例实现原理具体实现CurlParserUtilCurlEntityICurlHandler

深入解析Spring TransactionTemplate 高级用法(示例代码)

《深入解析SpringTransactionTemplate高级用法(示例代码)》TransactionTemplate是Spring框架中一个强大的工具,它允许开发者以编程方式控制事务,通过... 目录1. TransactionTemplate 的核心概念2. 核心接口和类3. TransactionT

数据库使用之union、union all、各种join的用法区别解析

《数据库使用之union、unionall、各种join的用法区别解析》:本文主要介绍SQL中的Union和UnionAll的区别,包括去重与否以及使用时的注意事项,还详细解释了Join关键字,... 目录一、Union 和Union All1、区别:2、注意点:3、具体举例二、Join关键字的区别&php

Spring IOC控制反转的实现解析

《SpringIOC控制反转的实现解析》:本文主要介绍SpringIOC控制反转的实现,IOC是Spring的核心思想之一,它通过将对象的创建、依赖注入和生命周期管理交给容器来实现解耦,使开发者... 目录1. IOC的基本概念1.1 什么是IOC1.2 IOC与DI的关系2. IOC的设计目标3. IOC

java中的HashSet与 == 和 equals的区别示例解析

《java中的HashSet与==和equals的区别示例解析》HashSet是Java中基于哈希表实现的集合类,特点包括:元素唯一、无序和可包含null,本文给大家介绍java中的HashSe... 目录什么是HashSetHashSet 的主要特点是HashSet 的常用方法hasSet存储为啥是无序的

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Linux中shell解析脚本的通配符、元字符、转义符说明

《Linux中shell解析脚本的通配符、元字符、转义符说明》:本文主要介绍shell通配符、元字符、转义符以及shell解析脚本的过程,通配符用于路径扩展,元字符用于多命令分割,转义符用于将特殊... 目录一、linux shell通配符(wildcard)二、shell元字符(特殊字符 Meta)三、s