FATE Board 执行流程探索

2024-08-30 10:36
文章标签 board 流程 探索 执行 fate

本文主要是介绍FATE Board 执行流程探索,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景介绍

FATE Board 是 FATE 提供的一个工程,用于给 FATE 提供可视化能力,方便在联邦学习训练中实时查看执行状态,更好地定位执行中遇到的问题。

查看 FATE 架构可以看到 FATE Board 是建立在 MySQL 和 FATE Flow Server 的基础上的,看起来数据来源是来自于这两者。FATE Flow Server 在之前的文章 中已经介绍过,FATE 中隐私计算的主要调度流程都是实现在这个服务中。

请添加图片描述

FATE Board 代码仓库地址 https://github.com/FederatedAI/FATE-Board, 本文的探索基于 v1.11.1,后续版本可能有所不同

FATE Board 实现探索

FATE Board 工程中包含前端与后端的实现,前端是基于 Vue 实现的,后端则是基于 Java 实现。本文在探索时主要基于两个场景串联了一下完整的流程,分别是主页面的 job 列表页,以及 job 日志详情,通过查看完整的调用链路,对 FATE Board 建立基础的认识。

Job 列表页

请添加图片描述

通过 Chrome 调试模式查看对应的请求,即可比较容易发现获取 job 列表数据对应的请求为 /job/query/page/new , 通过对应的接口路径全局搜索可以发现后端的实现为 src/main/java/com/webank/ai/fate/board/controller/JobManagerController.java 中的 queryPagedJob() 方法,对应的代码实现如下:

public PageBean<Map<String, Object>> queryPagedJobs(PagedJobQO pagedJobQO) {String jobId = pagedJobQO.getJobId();FlowJobQO flowJobQO = new FlowJobQO();if (jobId != null && 0 != jobId.trim().length()) {flowJobQO.setJob_id(pagedJobQO.getJobId());}// 构造请求参数 ...// 实际获取数据Map<String, Object> jobMap = getJobMap(flowJobQO);// ... 冗长的业务处理
}

可以看到的真正的数据获取部分基本就是直接调用 getJobMap() ,对应的实现如下所示:

private Map<String, Object> getJobMap(Object query) {result = flowFeign.post(Dict.URL_JOB_QUERY, JSON.toJSONString(query));// ... 冗长的结果转换
}

实际的获取是通过一次 HTTP 请求获取到,对应的请求地址为 /v1/job/list/job,看情况应该是调用 FATE Flow Server 获取的,在 FATE-Flow 中看到的对应的接口,处于路径 FATE-Flow/python/fate_flow/apps/job_app.py 中的 list_job(),实际的实现就是一次简单的数据库查询,不再进一步展开。

Job 日志

请添加图片描述

通过 chrome 调试模式看到实际获取 Job 日志是通过 websocket 获取的,请求的地址为 /log/new/202307260855242117390/host/8889/default,目前来看日志的获取和 job 列表的获取存在一些差异

依旧利用请求地址搜索对应的代码实现,可以确认后端对应的实现路径为 src/main/java/com/webank/ai/fate/board/websocket/LogWebSocketController.java 中的 LogWebSocketController 类实现,对于 websocket 的服务端,消息处理都是在 onMessage 实现的,我们可以看到对应的代码实现如下:

@OnMessage
public void onMessage(String message,Session session,@PathParam("jobId") String jobId,@PathParam("role") String role,@PathParam("partyId") String partyId,@PathParam("componentId") String componentId) throws Exception {synchronized (session) {LogQuery logQuery = JSON.parseObject(message, LogQuery.class);// 根据类型主要包含 logSize 和 logCat,其中 logSize 用于获取日志行数,logCat 获取日志内容if (logQuery.getType().equals(LogTypeEnum.LOG_SIZE.boardValue)) {logSize(session, jobId, role, partyId, componentId, logQuery);} else {logCat(session, jobId, role, partyId, componentId, logQuery);}}
}

可以看到的通过路径获取 jobId, role, partyId, componentId 的参数,然后调用 logSize()logCat() 执行实际的处理,我们主要关注日志内容的获取,可以看到 logCat() 对应的实现如下所示:

private void logCat(Session session, String jobId, String role, String partyId, String componentId, LogQuery logQuery) {// 构造请求FlowLogCatReq flowLogCatReq = new FlowLogCatReq();flowLogCatReq.setJob_id(jobId);flowLogCatReq.setLog_type(Dict.logTypeMap.get(logQuery.getType()));flowLogCatReq.setRole(role);flowLogCatReq.setParty_id(Integer.valueOf(partyId));flowLogCatReq.setComponent_name(componentId);flowLogCatReq.setInstance_id(logQuery.getInstanceId());flowLogCatReq.setBegin(logQuery.getBegin());flowLogCatReq.setEnd(logQuery.getEnd());// 实际获取数据FlowResponse<List<FlowLogCatResp>> resultFlow = flowLogFeign.logCat(flowLogCatReq);// 构造响应数据LogContentResponse logContentResponse = new LogContentResponse();logContentResponse.setType(logQuery.getType());logContentResponse.setData(resultFlow.getData().stream().map(LogContentResponse.LogContent::fromFlowContent).collect(Collectors.toList()));try {session.getBasicRemote().sendText(JSON.toJSONString(logContentResponse));} catch (IOException e) {e.printStackTrace();logger.error("websocket send error: {}", logContentResponse);}
}

根据最核心的数据获取是调用 flowLogFeign.logCat() ,对应的实现:

@FeignClient(url = RouteTargeter.URL_PLACE_HOLDER + "/v1/log", name = "flowLogFeign", configuration = FeignRequestInterceptor.class)
public interface FlowLogFeign {// 构造 http 请求@RequestMapping(value = "/cat", method = RequestMethod.POST)FlowResponse<List<FlowLogCatResp>> logCat(FlowLogCatReq request);@RequestMapping(value = "/size", method = RequestMethod.POST)FlowResponse<FlowLogSizeResp> logSize(FlowLogSizeReq request);
}

最后兜了一圈,看起来还是转换了一次网络请求,看起来还是发送给了 FATE Flow Server,追踪 FATE-Flow 工程中的对应实现,可以看到对应的网络请求位于 FATE-Flow/python/fate_flow/apps/log_app.py 路径下,具体的实现位于 FATE-Flow/python/fate_flow/utils/log_sharing_utils.py 中的 cat_log() 方法中,实现如下:

def cat_log(self, begin, end):line_list = []log_path = self.get_log_file_path()if begin and end:cmd = f"cat {log_path} | tail -n +{begin}| head -n {end-begin+1}"elif begin:cmd = f"cat {log_path} | tail -n +{begin}"elif end:cmd = f"cat {log_path} | head -n {end}"else:cmd = f"cat {log_path}"lines = self.execute(cmd)if lines:line_list = []line_num = begin if begin else 1for line in lines.split("\n"):line = replace_ip(line)line_list.append({"line_num": line_num, "content": line})line_num += 1return line_list

可以看到最终就是调用系统的 cat 命令,最终文件对应的内容,整体实现简单直接。

总结

通过对 FATE-Board 两个请求的调用链路的跟踪,可以对 FATE-Board 工程有了一些了解,看起来 FATE-Board 是建立在 FATE-Flow 基础上的一个简单可视化,使用的能力基本都是通过 FATE-Flow 提供,而 FATE-Board 仅仅提供必要的数据包装与前端的展示呈现,过程简单清晰。后续如果希望了解 FATE-Board 对应的可视化的能力范围,直接查看 FATE-Flow 对应提供的接口即可

这篇关于FATE Board 执行流程探索的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120576

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

maven 编译构建可以执行的jar包

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~ 专栏导航 Python系列: Python面试题合集,剑指大厂Git系列: Git操作技巧GO

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

轻松录制每一刻:探索2024年免费高清录屏应用

你不会还在用一些社交工具来录屏吧?现在的市面上有不少免费录屏的软件了。别看如软件是免费的,它的功能比起社交工具的录屏功能来说全面的多。这次我就分享几款我用过的录屏工具。 1.福晰录屏大师 链接直达:https://www.foxitsoftware.cn/REC/  这个软件的操作方式非常简单,打开软件之后从界面设计就能看出来这个软件操作的便捷性。界面的设计简单明了基本一打眼你就会轻松驾驭啦

火语言RPA流程组件介绍--浏览网页

🚩【组件功能】:浏览器打开指定网址或本地html文件 配置预览 配置说明 网址URL 支持T或# 默认FLOW输入项 输入需要打开的网址URL 超时时间 支持T或# 打开网页超时时间 执行后后等待时间(ms) 支持T或# 当前组件执行完成后继续等待的时间 UserAgent 支持T或# User Agent中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因