Intel TBB::Pipeline,按序处理数据

2023-12-06 10:38

本文主要是介绍Intel TBB::Pipeline,按序处理数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章(TBB:pipeline,软件流水线的威力)最后提出了几个问题,我们逐个来看看TBB::Pipeline是怎么解决的。

 

 

为什么Pipeline可以保证数据执行的顺序?既然TBB归根到底是通过多线程执行任务,为什么不会在读入先后两个字符串后,后读入的字符串先被下一个task处理?Pipeline里是不是有一个类似于FIFO 先进先出队列之类的东西?

 

 

之前曾经质疑过Pipeline的性能,甚至想自己用MultiThreading来模拟一个流水线,但很快就发现其中实现的难点。数据执行的顺序性就是其中之一。

 

假设以一个thread代表流水线上的一个节点,如果某节点是并发执行的,那么就需要2个以上的thread(A和B),上一节点处理完毕的顺序数据到底是先送给A还是B呢?处理完毕后后又该先将A还是B中的数据送到下一节点呢?即使可以人为的指定A和B之间的优先规则,由于thread本身被调度的不确定性,实际运行中还是有很多不可预知的困难。

 

流水线的一个显著特性就是保证每个数据均以相同的顺序流过每个节点。因此,TBB::Pipeline中的一个首要任务就是在节点被并发执行的同时,仍能够保证所处理的数据的次序而不需额外的处理代码。此外,在要求串行处理的节点,要保证即使排在前面的数据先被处理,即使排在后面的数据先到达。

 

Pipeline的中心思想就是以token来控制数据的处理顺序和流水线的深度。Pipeline::run函数中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一个数据在进入Pipeline的时候都会按照先后顺序依次分配一个token,如line1处:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果当前流水线中的token全部用完了,那么暂时就不会处理新的数据,直到已进入Pipeline的数据被处理完毕有空闲的token(line2处)

 

仍然以TBB中的例子text_filter为例考虑,流水线为 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter从磁盘上读取数据,MyTransformFilter转换成大写字母,MyOutputFilter将转换好的数据写入磁盘。因此,MyInputFilter节点和MyOutputFiler节点必须是串行执行,而MyTransformFilter可以并发执行。对于MyInputFilter读入的一串顺序数据,token依次为1->2->3,如何保证经过转换后数据也是以相同的顺序写入磁盘?

 

秘密在于TBB中的一个类tbb::internal::ordered_buffer,MyOutputFilter用它来保证按照token的顺序执行其队列中的数据,而不管数据进入队列的先后次序,换句话说,即使排在后面的数据token 2先被某个MyTransformFilter节点处理完毕送往MyOutputFilter,只要数据token 1没到达没被MyOutputFilter执行,数据2就不会在数据1之前先写入磁盘。每一个需要被串行处理的节点,都会有一个ordered_buffer类型的成员变量。

 

先看看ordered_buffer的定义:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //数组,以顺序方式保存所有待处理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //数组的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //当前正在处理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保护array并发访问的锁

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token号

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //数据

            next = input_buffer->put_token(clone);//将task放入队列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

对于需要被串行处理的节点,使用ordered_buffer的put_token函数将相关的数据和task引用放入队列。put_token的实现是关键:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

这个函数的实质是,首先取得下一个要处理的token,然后把待执行的task放到ordered_buffer的任务队列中的"合适位置",而low_token指向当前需要处理的token编号。

 

例如low_token=0,当前需要处理0号token,下一个token为1,因此task保存在array[1]处并处于阻塞状态,待0号token处理完毕后,low_token增加1,再从array数组中取出1号token对应的task进行处理。

 

Pipeline中是这样通知串行节点以处理好一条数据的:

还是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的实现会有一种大彻大悟的感觉!如果刚完成的token就是次序最优先的token(low_token),那取出下一个要执行的task,以spawn的方式让TBB的task scheduler来调度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一个非常有趣的实现,相比于常见的用FIFO queue来实现线程间的数据传递,ordered_buffer可谓精巧。我们可以好好利用ordered_buffer的原理来进一步改进我们的代码。


关于作者:

softarts,曾任职于阿尔卡特-朗讯,Nokia,从事电信系统软件研发工作。研究兴趣:C++,多核计算,Linux


转载请注明出处

 

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/softarts/archive/2009/04/28/4134806.aspx

这篇关于Intel TBB::Pipeline,按序处理数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/461522

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下