DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析

本文主要是介绍DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

事件和事件队列

DAOS API 函数可以在阻塞或非阻塞模式下使用。 这是通过传递给每个 API 调用的指向 DAOS 事件的指针来确定的:如果 NULL 表示操作将被阻塞。 操作完成后会返回。 所有失败情况的错误码都将通过API函数本身的返回码返回。 如果使用有效的事件,则该操作将以非阻塞模式运行,并在内部调度程序中调度该操作以及将 RPC 提交到底层堆栈后立即返回。 如果调度成功,则操作的返回值为success,但并不表示实际操作成功。 返回时可以捕获的错误要么是无效参数,要么是调度问题。 当事件完成时,操作的实际返回代码将在事件错误代码 (event.ev_error) 中提供。 必须首先通过单独的 API 调用创建要使用的有效事件。 为了允许用户一次跟踪多个事件,可以将事件创建为事件队列的一部分,事件队列基本上是可以一起进行和轮询的事件的集合。 事件队列还在内部为所有 DAOS 任务创建一个单独的任务调度程序以及一个新的网络上下文。 在某些网络提供商上,网络上下文创建是一项昂贵的操作,因此用户应尝试限制在 DAOS 之上的应用程序或 IO 中间件库中创建的事件队列的数量。 或者,可以在没有事件队列的情况下创建事件,并单独跟踪。 在这种情况下,对于阻塞操作,将使用内部全局任务调度程序和网络上下文来代替为事件队列创建的独立任务调度程序和网络上下文。 事件完成后,它可以重新用于另一个 DAOS API 调用,以最大限度地减少 DAOS 库内事件创建和分配的需要

DAOS Task API 提供了一种以非阻塞方式使用 DAOS API 的替代方法,同时在 DAOS API 操作之间构建任务依赖树。 这对于使用 DAOS 并需要构建彼此之间具有依赖关系(N-1、1-N、N-N)的 DAOS 操作计划的应用程序和中间件库非常有用

要利用任务 API,用户需要创建一个调度程序,其中可以创建 DAOS 任务作为其中的一部分。 任务 API 足够通用,允许用户混合 DAOS 特定任务(通过 DAOS 任务 API)和其他用户定义的任务,并在这些任务之间添加依赖关系

有关如何在客户端库中使用 TSE 的更多详细信息,请参阅 TSE 内部文档(https://github.com/ssbandjl/daos/blob/master/src/common/README.md)以获取更多详细信息

事件与事件队列及任务调度引擎流程图

在这里插入图片描述

流程说明(dfuse为例)

以DAOS用户态文件系统dfuse为例

  • 在初始化客户端库中初始化事件队列, 关联全局网络上下文, 设置调度器

  • 启动文件系统中注册了SLAB, 绑定事件队列和事件, 参考: daos_event_init

  • 开启轮训线程dfuse_progress_thread, 参考daos_eq_poll

  • 文件系统执行写

    客户端写数据:xb/write.c -> write(fd, direct_write_buf, BUF_SIZE)
    write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
    
  • 封装ev, 并将ev传下去: dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev)

    DAOS用户态文件系统, 写流程
    master -> src/client/dfuse/ops/write.c -> dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t position, struct fuse_file_info *fi)
    struct dfuse_projection_info *fs_handle = fuse_req_userdata(req)
    eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,每次返回+1前的值, 比如: eqt_idx=7
    eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
    ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
    ev->de_complete_cb = dfuse_cb_write_complete
    dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和tasksched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器return dc_task_schedule(task, true)
    sem_post(&eqt->de_sem) -> 唤醒EQ
    d_slab_restock(eqt->de_write_slab) -> 重用slab
    
  • 与tse结合构造task, 调度task

  • 网络回复后, 在轮训线程中trigger到, 拿到ev和task, 逐层向上级执行回调函数, 最终执行业务回调

源码分析

客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动
dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb
dfuse_main.c -> maindaos_debug_init(DAOS_LOG_DEFAULT)d_log_init_adv 高级日志初始化, 客户端日志文件log_file = getenv(D_LOG_FILE_ENV) export D_LOG_FILE=/tmp/daos_client.logdebug_prio_err_load_envd_log_openfreopen(mst.log_file 重新关联标准输出或错误输出setlinebuf(stderr) 设置错误输出为行缓冲d_log_sync_maskdfuse_info->di_eq_count = 1daos_init -> 初始化客户端库daos_debug_initdaos_hhash_init_featsdc_agent_initdc_job_initdc_mgmt_net_cfgdaos_eq_lib_init -> 初始化事件队列库 -> static tse_sched_t daos_sched_g -> 指向不属于 EQ 一部分的事件的全局调度程序的指针。 作为 EQ 一部分初始化的事件将在该 EQ 调度程序中进行跟踪crt_init_optcrt_context_create(&daos_eq_ctx) -> 全局共享网络上下文, 所有事件队列(eq)共享使用这个上下文tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) -> 初始化调度器(无事件队列), 为无eq事件设置调度器dc_mgmt_net_cfg_checkpl_initdc_mgmt_initdc_pool_initdc_cont_initdc_obj_initdfuse_fs_init(dfuse_info) -> daos用户态文件信息=文件系统控制器D_ALLOC_ARRAY(fs_handle->di_eqt, fs_handle->di_eq_count) -> eq数组d_hash_table_create_inplace dpi_pool_table 打开的池表, 创建hash表 大小=power2(n次方), 操作方法dpi_iet open inodesfor (i = 0; i < fs_handle->di_eq_count; i++)struct dfuse_eq *eqt = &fs_handle->di_eqt[i] -> 根据传入的EQ数量, 将eq与文件系统句柄中的eq表绑定eqt->de_handle = fs_handle -> 互存指针,双向绑定sem_init(&eqt->de_sem, 0, 0) -> 在 eq 之前创建信号量,因为无法检查 sem_init() 是否已被调用,如果没有调用 sem_destroy 也是无效的。 这样我们就可以避免添加额外的内存来跟踪信号量的状态daos_eq_create(&eqt->de_eq) -> 一个事件队列关联一个网络上下文, 跟踪池的多个事件 -> 创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络(cart)上下文。 网络上下文创建是一项昂贵的操作,并且在某些系统上网络上下文的数量可能受到限制。 因此,建议不要在用户应用程序或中间件中创建大量事件队列eq = daos_eq_alloc() -> 分配eqD_INIT_LIST_HEAD(&eq->eq_running)D_INIT_LIST_HEAD(&eq->eq_comp)daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops)return eqcrt_context_create(&eqx->eqx_ctx)crt_contpext_provider_createcrt_context_initdaos_eq_insert(eqx)daos_hhash_link_insert(&eqx->eqx_hlink, DAOS_HTYPE_EQ) -> 插入全局hash表(struct daos_hhash_table	daos_ht)daos_eq_handle(eqx, eqh)daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联keytse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx) -> 初始化调度器 -> struct tse_sched_private -> 调度器及队列,属性等struct tse_sched_private	*dsp = tse_sched2priv(sched) -> 设置调度器私有指针dspD_INIT_LIST_HEAD(&dsp->dsp_init_list); -> 初始队列D_INIT_LIST_HEAD(&dsp->dsp_running_list); -> 运行队列D_INIT_LIST_HEAD(&dsp->dsp_complete_list) -> 完成队列D_INIT_LIST_HEAD(&dsp->dsp_sleeping_list) -> 睡眠队列D_INIT_LIST_HEAD(&dsp->dsp_comp_cb_list); -> 完成回调队列tse_sched_register_comp_cb(sched, comp_cb, udata) -> 初始回调为空dsc->dsc_comp_cb = comp_cb -> 设置调度器的完成回调和回调参数(udata)d_list_add(&dsc->dsc_list, &dsp->dsp_comp_cb_list) -> 将调度器的完成回调 -> 调度器的完成回调队列sched->ds_udata = udata -> 将网络上下文 daos_eq_ctx 设置到调度器的用户数据指针上(也可以是回调数据等)daos_eq_putref(eqx) -> 减一次引用计数(ch_rec_decref)duns_resolve_pathdfuse_pool_connectdfuse_cont_opendfuse_fs_start 启动文件系统d_hash_rec_insert(&fs_handle->dpi_iet 将根插入hash表, 在 dfuse_reply_entry 中也会插入: d_hash_rec_find_insert(&fs_handle->dpi_ietd_slab_init(&fs_handle->di_slab, fs_handle)for (i = 0; i < fs_handle->di_eq_count; i++)d_slab_register(&fs_handle->di_slab, &read_slab, eqt, &eqt->de_read_slab)create_many(type)ptr   = create(type) -> create(struct d_slab_type *type)type->st_reg.sr_init(ptr, type->st_arg) -> dfuse_event_init -> ev->de_eqt = handle -> 为ev绑定daos_event_tif (!type->st_reg.sr_reset(ptr)) -> dfuse_read_event_reset(void *arg) -> 重置读事件evD_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ) -> 读最大1MBev->de_sgl.sg_nr       = 1daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -> 父事件为空, 也支持父事件,: daos_event_init(child_events[i], DAOS_HDL_INVAL, &event)evx->evx_status	= DAOS_EVS_READYif (daos_handle_is_valid(eqh)) -> 句柄有效eqx = daos_eq_lookup(eqh)evx->evx_ctx = eqx->eqx_ctxevx->evx_sched = &eqx->eqx_sched -> 继承EQ的网络和调度器entry = ptr + type->st_reg.sr_offsetd_list_add_tail(entry, &type->st_free_list) -> 将对象加入空闲列表,计数器+1type->st_free_count++d_list_add_tail(&type->st_type_list, &slab->slab_list) -> 将slab放入列表备用for (i = 0; i < fs_handle->di_eq_count; i++) dfuse_progress_thread pthread_create(&fs_handle->dpi_thread, NULL, dfuse_progress_thread, fs_handle) 异步进度线程,该线程在启动时使用事件队列启动,并阻塞在信号量上,直到创建异步事件,此时线程唤醒并在 daos_eq_poll() 中忙于轮询直到完成sem_waitdaos_eq_poll  从 EQ 中检索完成事件daos_eq_lookup 查找私有事件队列daos_hhash_link_lookupcrt_progress_cond(epa.eqx->eqx_ctx, timeout, eq_progress_cb, &epa)eq_progress_cbdfuse_launch_fuse(fs_handle, &args) 创建fuse文件系统fuse_session_new(args, &dfuse_ops, sizeof(dfuse_ops), fs_handle)fuse_session_mountdfuse_send_to_fgdfuse_loopdfuse_fs_fini

总结

  • DAOS的任务调度引擎结合事件队列和事件, 与网络上下文绑定完成抽象封装, 可作为项目第三方组件引入, 结合业务, 完成同步和异步任务调度(依赖任务处理,如多副本写, EC), 事件, 事件队列, 任务, 调度器, HASH表, SLAB, 各种运行队列, 完成队列, 完成回调队列, 延迟队列…, 可应对复杂的业务调度和管理需求
  • 一个文件系统绑定多个事件队列, IO打散到每个事件队列, 负载均衡
  • 全局HASH表结合cookie作为key, 快速捞回事件队列

参考

DAOS客户端API_事件和事件队列及任务调度引擎

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

weixin: ssbandjl

公众号: 云原生云

DAOS IO全路径详解(视频)
DAOS 项目简介(视频)
欢迎对DAOS, SPDK, RDMA等高性能技术感兴趣的朋友加入[DAOS技术交流(群)]

这篇关于DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/190281

相关文章

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)

《Python基于火山引擎豆包大模型搭建QQ机器人详细教程(2024年最新)》:本文主要介绍Python基于火山引擎豆包大模型搭建QQ机器人详细的相关资料,包括开通模型、配置APIKEY鉴权和SD... 目录豆包大模型概述开通模型付费安装 SDK 环境配置 API KEY 鉴权Ark 模型接口Prompt

Spring中Bean有关NullPointerException异常的原因分析

《Spring中Bean有关NullPointerException异常的原因分析》在Spring中使用@Autowired注解注入的bean不能在静态上下文中访问,否则会导致NullPointerE... 目录Spring中Bean有关NullPointerException异常的原因问题描述解决方案总结

python中的与时间相关的模块应用场景分析

《python中的与时间相关的模块应用场景分析》本文介绍了Python中与时间相关的几个重要模块:`time`、`datetime`、`calendar`、`timeit`、`pytz`和`dateu... 目录1. time 模块2. datetime 模块3. calendar 模块4. timeit

python-nmap实现python利用nmap进行扫描分析

《python-nmap实现python利用nmap进行扫描分析》Nmap是一个非常用的网络/端口扫描工具,如果想将nmap集成进你的工具里,可以使用python-nmap这个python库,它提供了... 目录前言python-nmap的基本使用PortScanner扫描PortScannerAsync异

Oracle数据库执行计划的查看与分析技巧

《Oracle数据库执行计划的查看与分析技巧》在Oracle数据库中,执行计划能够帮助我们深入了解SQL语句在数据库内部的执行细节,进而优化查询性能、提升系统效率,执行计划是Oracle数据库优化器为... 目录一、什么是执行计划二、查看执行计划的方法(一)使用 EXPLAIN PLAN 命令(二)通过 S

Python中的异步:async 和 await以及操作中的事件循环、回调和异常

《Python中的异步:async和await以及操作中的事件循环、回调和异常》在现代编程中,异步操作在处理I/O密集型任务时,可以显著提高程序的性能和响应速度,Python提供了asyn... 目录引言什么是异步操作?python 中的异步编程基础async 和 await 关键字asyncio 模块理论

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置