Intel TBB::Pipeline,按序处理数据

2023-12-06 10:38

本文主要是介绍Intel TBB::Pipeline,按序处理数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章(TBB:pipeline,软件流水线的威力)最后提出了几个问题,我们逐个来看看TBB::Pipeline是怎么解决的。

 

 

为什么Pipeline可以保证数据执行的顺序?既然TBB归根到底是通过多线程执行任务,为什么不会在读入先后两个字符串后,后读入的字符串先被下一个task处理?Pipeline里是不是有一个类似于FIFO 先进先出队列之类的东西?

 

 

之前曾经质疑过Pipeline的性能,甚至想自己用MultiThreading来模拟一个流水线,但很快就发现其中实现的难点。数据执行的顺序性就是其中之一。

 

假设以一个thread代表流水线上的一个节点,如果某节点是并发执行的,那么就需要2个以上的thread(A和B),上一节点处理完毕的顺序数据到底是先送给A还是B呢?处理完毕后后又该先将A还是B中的数据送到下一节点呢?即使可以人为的指定A和B之间的优先规则,由于thread本身被调度的不确定性,实际运行中还是有很多不可预知的困难。

 

流水线的一个显著特性就是保证每个数据均以相同的顺序流过每个节点。因此,TBB::Pipeline中的一个首要任务就是在节点被并发执行的同时,仍能够保证所处理的数据的次序而不需额外的处理代码。此外,在要求串行处理的节点,要保证即使排在前面的数据先被处理,即使排在后面的数据先到达。

 

Pipeline的中心思想就是以token来控制数据的处理顺序和流水线的深度。Pipeline::run函数中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一个数据在进入Pipeline的时候都会按照先后顺序依次分配一个token,如line1处:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果当前流水线中的token全部用完了,那么暂时就不会处理新的数据,直到已进入Pipeline的数据被处理完毕有空闲的token(line2处)

 

仍然以TBB中的例子text_filter为例考虑,流水线为 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter从磁盘上读取数据,MyTransformFilter转换成大写字母,MyOutputFilter将转换好的数据写入磁盘。因此,MyInputFilter节点和MyOutputFiler节点必须是串行执行,而MyTransformFilter可以并发执行。对于MyInputFilter读入的一串顺序数据,token依次为1->2->3,如何保证经过转换后数据也是以相同的顺序写入磁盘?

 

秘密在于TBB中的一个类tbb::internal::ordered_buffer,MyOutputFilter用它来保证按照token的顺序执行其队列中的数据,而不管数据进入队列的先后次序,换句话说,即使排在后面的数据token 2先被某个MyTransformFilter节点处理完毕送往MyOutputFilter,只要数据token 1没到达没被MyOutputFilter执行,数据2就不会在数据1之前先写入磁盘。每一个需要被串行处理的节点,都会有一个ordered_buffer类型的成员变量。

 

先看看ordered_buffer的定义:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //数组,以顺序方式保存所有待处理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //数组的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //当前正在处理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保护array并发访问的锁

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token号

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //数据

            next = input_buffer->put_token(clone);//将task放入队列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

对于需要被串行处理的节点,使用ordered_buffer的put_token函数将相关的数据和task引用放入队列。put_token的实现是关键:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

这个函数的实质是,首先取得下一个要处理的token,然后把待执行的task放到ordered_buffer的任务队列中的"合适位置",而low_token指向当前需要处理的token编号。

 

例如low_token=0,当前需要处理0号token,下一个token为1,因此task保存在array[1]处并处于阻塞状态,待0号token处理完毕后,low_token增加1,再从array数组中取出1号token对应的task进行处理。

 

Pipeline中是这样通知串行节点以处理好一条数据的:

还是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的实现会有一种大彻大悟的感觉!如果刚完成的token就是次序最优先的token(low_token),那取出下一个要执行的task,以spawn的方式让TBB的task scheduler来调度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一个非常有趣的实现,相比于常见的用FIFO queue来实现线程间的数据传递,ordered_buffer可谓精巧。我们可以好好利用ordered_buffer的原理来进一步改进我们的代码。


关于作者:

softarts,曾任职于阿尔卡特-朗讯,Nokia,从事电信系统软件研发工作。研究兴趣:C++,多核计算,Linux


转载请注明出处

 

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/softarts/archive/2009/04/28/4134806.aspx

这篇关于Intel TBB::Pipeline,按序处理数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/461522

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言