DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析

本文主要是介绍DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

事件和事件队列

DAOS API 函数可以在阻塞或非阻塞模式下使用。 这是通过传递给每个 API 调用的指向 DAOS 事件的指针来确定的:如果 NULL 表示操作将被阻塞。 操作完成后会返回。 所有失败情况的错误码都将通过API函数本身的返回码返回。 如果使用有效的事件,则该操作将以非阻塞模式运行,并在内部调度程序中调度该操作以及将 RPC 提交到底层堆栈后立即返回。 如果调度成功,则操作的返回值为success,但并不表示实际操作成功。 返回时可以捕获的错误要么是无效参数,要么是调度问题。 当事件完成时,操作的实际返回代码将在事件错误代码 (event.ev_error) 中提供。 必须首先通过单独的 API 调用创建要使用的有效事件。 为了允许用户一次跟踪多个事件,可以将事件创建为事件队列的一部分,事件队列基本上是可以一起进行和轮询的事件的集合。 事件队列还在内部为所有 DAOS 任务创建一个单独的任务调度程序以及一个新的网络上下文。 在某些网络提供商上,网络上下文创建是一项昂贵的操作,因此用户应尝试限制在 DAOS 之上的应用程序或 IO 中间件库中创建的事件队列的数量。 或者,可以在没有事件队列的情况下创建事件,并单独跟踪。 在这种情况下,对于阻塞操作,将使用内部全局任务调度程序和网络上下文来代替为事件队列创建的独立任务调度程序和网络上下文。 事件完成后,它可以重新用于另一个 DAOS API 调用,以最大限度地减少 DAOS 库内事件创建和分配的需要

DAOS Task API 提供了一种以非阻塞方式使用 DAOS API 的替代方法,同时在 DAOS API 操作之间构建任务依赖树。 这对于使用 DAOS 并需要构建彼此之间具有依赖关系(N-1、1-N、N-N)的 DAOS 操作计划的应用程序和中间件库非常有用

要利用任务 API,用户需要创建一个调度程序,其中可以创建 DAOS 任务作为其中的一部分。 任务 API 足够通用,允许用户混合 DAOS 特定任务(通过 DAOS 任务 API)和其他用户定义的任务,并在这些任务之间添加依赖关系

有关如何在客户端库中使用 TSE 的更多详细信息,请参阅 TSE 内部文档(https://github.com/ssbandjl/daos/blob/master/src/common/README.md)以获取更多详细信息

事件与事件队列及任务调度引擎流程图

在这里插入图片描述

流程说明(dfuse为例)

以DAOS用户态文件系统dfuse为例

  • 在初始化客户端库中初始化事件队列, 关联全局网络上下文, 设置调度器

  • 启动文件系统中注册了SLAB, 绑定事件队列和事件, 参考: daos_event_init

  • 开启轮训线程dfuse_progress_thread, 参考daos_eq_poll

  • 文件系统执行写

    客户端写数据:xb/write.c -> write(fd, direct_write_buf, BUF_SIZE)
    write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
    
  • 封装ev, 并将ev传下去: dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev)

    DAOS用户态文件系统, 写流程
    master -> src/client/dfuse/ops/write.c -> dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t position, struct fuse_file_info *fi)
    struct dfuse_projection_info *fs_handle = fuse_req_userdata(req)
    eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,每次返回+1前的值, 比如: eqt_idx=7
    eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
    ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
    ev->de_complete_cb = dfuse_cb_write_complete
    dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和tasksched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器return dc_task_schedule(task, true)
    sem_post(&eqt->de_sem) -> 唤醒EQ
    d_slab_restock(eqt->de_write_slab) -> 重用slab
    
  • 与tse结合构造task, 调度task

  • 网络回复后, 在轮训线程中trigger到, 拿到ev和task, 逐层向上级执行回调函数, 最终执行业务回调

源码分析

客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动
dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb
dfuse_main.c -> maindaos_debug_init(DAOS_LOG_DEFAULT)d_log_init_adv 高级日志初始化, 客户端日志文件log_file = getenv(D_LOG_FILE_ENV) export D_LOG_FILE=/tmp/daos_client.logdebug_prio_err_load_envd_log_openfreopen(mst.log_file 重新关联标准输出或错误输出setlinebuf(stderr) 设置错误输出为行缓冲d_log_sync_maskdfuse_info->di_eq_count = 1daos_init -> 初始化客户端库daos_debug_initdaos_hhash_init_featsdc_agent_initdc_job_initdc_mgmt_net_cfgdaos_eq_lib_init -> 初始化事件队列库 -> static tse_sched_t daos_sched_g -> 指向不属于 EQ 一部分的事件的全局调度程序的指针。 作为 EQ 一部分初始化的事件将在该 EQ 调度程序中进行跟踪crt_init_optcrt_context_create(&daos_eq_ctx) -> 全局共享网络上下文, 所有事件队列(eq)共享使用这个上下文tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) -> 初始化调度器(无事件队列), 为无eq事件设置调度器dc_mgmt_net_cfg_checkpl_initdc_mgmt_initdc_pool_initdc_cont_initdc_obj_initdfuse_fs_init(dfuse_info) -> daos用户态文件信息=文件系统控制器D_ALLOC_ARRAY(fs_handle->di_eqt, fs_handle->di_eq_count) -> eq数组d_hash_table_create_inplace dpi_pool_table 打开的池表, 创建hash表 大小=power2(n次方), 操作方法dpi_iet open inodesfor (i = 0; i < fs_handle->di_eq_count; i++)struct dfuse_eq *eqt = &fs_handle->di_eqt[i] -> 根据传入的EQ数量, 将eq与文件系统句柄中的eq表绑定eqt->de_handle = fs_handle -> 互存指针,双向绑定sem_init(&eqt->de_sem, 0, 0) -> 在 eq 之前创建信号量,因为无法检查 sem_init() 是否已被调用,如果没有调用 sem_destroy 也是无效的。 这样我们就可以避免添加额外的内存来跟踪信号量的状态daos_eq_create(&eqt->de_eq) -> 一个事件队列关联一个网络上下文, 跟踪池的多个事件 -> 创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络(cart)上下文。 网络上下文创建是一项昂贵的操作,并且在某些系统上网络上下文的数量可能受到限制。 因此,建议不要在用户应用程序或中间件中创建大量事件队列eq = daos_eq_alloc() -> 分配eqD_INIT_LIST_HEAD(&eq->eq_running)D_INIT_LIST_HEAD(&eq->eq_comp)daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops)return eqcrt_context_create(&eqx->eqx_ctx)crt_contpext_provider_createcrt_context_initdaos_eq_insert(eqx)daos_hhash_link_insert(&eqx->eqx_hlink, DAOS_HTYPE_EQ) -> 插入全局hash表(struct daos_hhash_table	daos_ht)daos_eq_handle(eqx, eqh)daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联keytse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx) -> 初始化调度器 -> struct tse_sched_private -> 调度器及队列,属性等struct tse_sched_private	*dsp = tse_sched2priv(sched) -> 设置调度器私有指针dspD_INIT_LIST_HEAD(&dsp->dsp_init_list); -> 初始队列D_INIT_LIST_HEAD(&dsp->dsp_running_list); -> 运行队列D_INIT_LIST_HEAD(&dsp->dsp_complete_list) -> 完成队列D_INIT_LIST_HEAD(&dsp->dsp_sleeping_list) -> 睡眠队列D_INIT_LIST_HEAD(&dsp->dsp_comp_cb_list); -> 完成回调队列tse_sched_register_comp_cb(sched, comp_cb, udata) -> 初始回调为空dsc->dsc_comp_cb = comp_cb -> 设置调度器的完成回调和回调参数(udata)d_list_add(&dsc->dsc_list, &dsp->dsp_comp_cb_list) -> 将调度器的完成回调 -> 调度器的完成回调队列sched->ds_udata = udata -> 将网络上下文 daos_eq_ctx 设置到调度器的用户数据指针上(也可以是回调数据等)daos_eq_putref(eqx) -> 减一次引用计数(ch_rec_decref)duns_resolve_pathdfuse_pool_connectdfuse_cont_opendfuse_fs_start 启动文件系统d_hash_rec_insert(&fs_handle->dpi_iet 将根插入hash表, 在 dfuse_reply_entry 中也会插入: d_hash_rec_find_insert(&fs_handle->dpi_ietd_slab_init(&fs_handle->di_slab, fs_handle)for (i = 0; i < fs_handle->di_eq_count; i++)d_slab_register(&fs_handle->di_slab, &read_slab, eqt, &eqt->de_read_slab)create_many(type)ptr   = create(type) -> create(struct d_slab_type *type)type->st_reg.sr_init(ptr, type->st_arg) -> dfuse_event_init -> ev->de_eqt = handle -> 为ev绑定daos_event_tif (!type->st_reg.sr_reset(ptr)) -> dfuse_read_event_reset(void *arg) -> 重置读事件evD_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ) -> 读最大1MBev->de_sgl.sg_nr       = 1daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -> 父事件为空, 也支持父事件,: daos_event_init(child_events[i], DAOS_HDL_INVAL, &event)evx->evx_status	= DAOS_EVS_READYif (daos_handle_is_valid(eqh)) -> 句柄有效eqx = daos_eq_lookup(eqh)evx->evx_ctx = eqx->eqx_ctxevx->evx_sched = &eqx->eqx_sched -> 继承EQ的网络和调度器entry = ptr + type->st_reg.sr_offsetd_list_add_tail(entry, &type->st_free_list) -> 将对象加入空闲列表,计数器+1type->st_free_count++d_list_add_tail(&type->st_type_list, &slab->slab_list) -> 将slab放入列表备用for (i = 0; i < fs_handle->di_eq_count; i++) dfuse_progress_thread pthread_create(&fs_handle->dpi_thread, NULL, dfuse_progress_thread, fs_handle) 异步进度线程,该线程在启动时使用事件队列启动,并阻塞在信号量上,直到创建异步事件,此时线程唤醒并在 daos_eq_poll() 中忙于轮询直到完成sem_waitdaos_eq_poll  从 EQ 中检索完成事件daos_eq_lookup 查找私有事件队列daos_hhash_link_lookupcrt_progress_cond(epa.eqx->eqx_ctx, timeout, eq_progress_cb, &epa)eq_progress_cbdfuse_launch_fuse(fs_handle, &args) 创建fuse文件系统fuse_session_new(args, &dfuse_ops, sizeof(dfuse_ops), fs_handle)fuse_session_mountdfuse_send_to_fgdfuse_loopdfuse_fs_fini

总结

  • DAOS的任务调度引擎结合事件队列和事件, 与网络上下文绑定完成抽象封装, 可作为项目第三方组件引入, 结合业务, 完成同步和异步任务调度(依赖任务处理,如多副本写, EC), 事件, 事件队列, 任务, 调度器, HASH表, SLAB, 各种运行队列, 完成队列, 完成回调队列, 延迟队列…, 可应对复杂的业务调度和管理需求
  • 一个文件系统绑定多个事件队列, IO打散到每个事件队列, 负载均衡
  • 全局HASH表结合cookie作为key, 快速捞回事件队列

参考

DAOS客户端API_事件和事件队列及任务调度引擎

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

weixin: ssbandjl

公众号: 云原生云

DAOS IO全路径详解(视频)
DAOS 项目简介(视频)
欢迎对DAOS, SPDK, RDMA等高性能技术感兴趣的朋友加入[DAOS技术交流(群)]

这篇关于DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/190281

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号