DAG计算框架:实现业务编排

2024-08-24 04:44

本文主要是介绍DAG计算框架:实现业务编排,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {Ctx *BizCtx // 自定义的上下文Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)g *Graph  //整个DAG的控制对象ID int64 // 保证唯一的IDDeps []string // 所有的父节点Nexts []*Node // 所有的子节点 Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{e *Engine // Graph 和Engine互相包含id int64taskChannel chan int64 // 需要执行的节点的IDackChanel   chan int64   // 异步回调的确认chandoneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道NameTable   map[string]*Node // 节点名与节点的映射IDTable     map[int64]*Node // 节点id与节点的映射
}// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{g.NameTable[node.Name()] = node// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了for nodeName <- node.Deps(){preNode := g.NameTable[nodeName]node.Mask ^= preNode.IDpreNode.Nexts = append(preNode.Nexts, node)}
}func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{select{case taskID <-g.taskChannel:// 遇到了终止节点,当前图可以终止执行了if taskID == -2 {close(g.doneChannel)}node := g.IDTable[taskID]if node.Enable(){// 使用Engine管理协程执行g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})}//这里也可以基于协程池做异步控制,ackcase taskID <- g.ackChanel:node := g.IDTable[taskID]// 当前节点致辞哪个成功后,通知所有子节点for nextNode <- node.Nexts(){nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态// 该子节点可以放入可执行的channel中了if nextNode.Mask == nextNode.ID { gg.taskChannel <- nextNode.ID}case <-g.doneChannel:g.Close()return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{*Node // 继承节点的能力.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}func NewRecoveryNode(/**这里也可以传参数**/)*Node{return &RecoveryNode{Node:NewNode()}
}func (n *RecoveryNode)Enable()bool{// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值// 这里表示分流节点中的通过路径不是1时,可以执行当前节点return n.Node.GetCtx().GetString("shunt.path","") != "1"
}func (n *Node)Run(){count := n.Node.GetCtx().GetInt64("reward.count", 20)list:=Recovery(count)//示意而已,不用在乎业务具体逻辑n.Node.GetCtx().SetInt64List("recovery.success"", true)
}// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){e = NewEngine{}
}func handler(){g := e.BuildGraph().ADD(NewShuntNode()).ADD(NewRewardNode()).ADD(NewRecoveryNode()).ADD(NewPackDataNode())err := g.Run()print(err)

这篇关于DAG计算框架:实现业务编排的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101485

相关文章

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一

Python xmltodict实现简化XML数据处理

《Pythonxmltodict实现简化XML数据处理》Python社区为提供了xmltodict库,它专为简化XML与Python数据结构的转换而设计,本文主要来为大家介绍一下如何使用xmltod... 目录一、引言二、XMLtodict介绍设计理念适用场景三、功能参数与属性1、parse函数2、unpa

C#实现获得某个枚举的所有名称

《C#实现获得某个枚举的所有名称》这篇文章主要为大家详细介绍了C#如何实现获得某个枚举的所有名称,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下... C#中获得某个枚举的所有名称using System;using System.Collections.Generic;usi

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

C# 读写ini文件操作实现

《C#读写ini文件操作实现》本文主要介绍了C#读写ini文件操作实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、INI文件结构二、读取INI文件中的数据在C#应用程序中,常将INI文件作为配置文件,用于存储应用程序的

C#实现获取电脑中的端口号和硬件信息

《C#实现获取电脑中的端口号和硬件信息》这篇文章主要为大家详细介绍了C#实现获取电脑中的端口号和硬件信息的相关方法,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 我们经常在使用一个串口软件的时候,发现软件中的端口号并不是普通的COM1,而是带有硬件信息的。那么如果我们使用C#编写软件时候,如