本文主要是介绍Intel TBB 开发指南 4 Parallelizing Complex Loops,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
原文
你可以仅使用并行化简单循环部分中的构造成功地并行化许多应用程序。 但是,某些情况需要其他并行模式。 本节描述了对其中一些替代模式的支持。
Cook Until Done: parallel_for_each
对于某些循环,迭代空间的结束是事先不知道的,或者循环体可能会在循环退出之前添加更多的迭代要做。 你可以使用模板类 oneapi::tbb::parallel_for_each
处理这两种情况。
链表是事先未知的迭代空间的一个例子。 在并行编程中,通常最好使用动态数组而不是链表,因为访问链表中的项目本质上是串行的。 但是如果你仅限于链表,项目可以安全地并行处理,并且处理每个项目至少需要几千条指令,你可以使用parallel_for_each
来获得一些并行性。
例如,考虑以下串行代码:
void SerialApplyFooToList( const std::list<Item>& list ) {for( std::list<Item>::const_iterator i=list.begin() i!=list.end(); ++i )Foo(*i);
}
如果 Foo
至少需要运行几千条指令,你可以通过将循环转换为使用 parallel_for_each
来获得并行加速。 为此,请使用 const
限定的 operator()
定义一个对象。 这类似于来自 C++ 标准头文件 <functional>
的 C++ 函数对象,不同之处是 operator()
必须是 const
。
class ApplyFoo {
public:void operator()( Item& item ) const {Foo(item);}
};
SerialApplyFooToList
的并行形式如下:
void ParallelApplyFooToList( const std::list<Item>& list ) {parallel_for_each( list.begin(), list.end(), ApplyFoo() );
}
调用 parallel_for_each
永远不会导致两个线程同时作用于输入迭代器。 因此,顺序程序的输入迭代器的典型定义可以正常工作。 这种便利使得 parallel_for_each
不可扩展,因为工作内容的获取是串行的。 但是在许多情况下,与按顺序执行操作相比,你仍然可以获得有用的加速。
parallel_for_each
可以通过两种方式可扩展地获取工作。
- 迭代器可以是随机访问迭代器。
parallel_for_each
的body
参数,如果它需要一个parallel_for_each<Item>&
类型的第二个参数feeder
,可以通过调用feeder.add(item)
添加更多工作。 例如,假设处理树中的节点是处理其后代的先决条件。 使用parallel_for_each
,处理完一个节点后,你可以使用feeder.add
添加后代节点。 在处理完所有项目之前,parallel_for_each
的实例不会终止。
在装配线上工作:parallel_pipeline
流水线是模仿传统制造装配线的常见并行模式。 数据流经一系列管道过滤器,每个过滤器都以某种方式处理数据。 给定传入的数据流,其中一些过滤器可以并行运行,而另一些则不能。 例如,在视频处理中,对帧的某些操作不依赖于其他帧,因此可以同时对多个帧进行。 另一方面,对帧的一些操作需要先处理先前的帧。
oneTBB 类 parallel_pipeline
和过滤器实现了管道模式。 我们将使用一个简单的文本处理示例来演示 parallel_pipeline
和过滤器执行并行格式化的用法。 该示例读取一个文本文件,将文本中的每个十进制数字平方,然后将修改后的文本写入一个新文件。 下面是管道的图片。
警告:
由于提供给parallel_pipline
过滤器的主体对象可能会被复制,因此其operator()
不应修改主体。 否则,修改可能对调用parallel_pipeline
的线程可见,也可能不可见,具体取决于operator()
是作用于原始文件还是副本。 作为这种细微差别的提醒,parallel_pipeline
要求将主体对象的operator()
声明为const
。
从输入文件读取块 => 对块中数字求平方 => 将块写入输出文件
假设原始文件 I/O 是顺序的。 求平方的过滤器可以并行完成。 也就是说,如果你可以非常快速地串行读取 n 个块,则可以并行转换 n 个块中的每一个,只要它们以正确的顺序写入输出文件即可。 虽然原始 I/O 是顺序的,但输入和输出的格式可以移动到中间过滤器,从而是并行的。
为了分摊并行调度开销,过滤器对文本块进行操作。 每个输入块大约有 4000 个字符。 每个块由 TextSlice
类的一个实例表示:
// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the C++ declarationrepresents only the header of a much larger object in memory. */
class TextSlice {// Pointer to one past last character in sequencechar* logical_end;// Pointer to one past last available byte in sequence.char* physical_end;
public:// Allocate a TextSlice object that can hold up to max_size characters.static TextSlice* allocate( size_t max_size ) {// +1 leaves room for a terminating null character.TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate( sizeof(TextSlice)+max_size+1 );t->logical_end = t->begin();t->physical_end = t->begin()+max_size;return t;}// Free this TextSlice objectvoid free() {oneapi::tbb::tbb_allocator<char>().deallocate((char*)this, sizeof(TextSlice)+(physical_end-begin())+1);}// Pointer to beginning of sequencechar* begin() {return (char*)(this+1);}// Pointer to one past last character in sequencechar* end() {return logical_end;}// Length of sequencesize_t size() const {return logical_end-(char*)(this+1);}// Maximum number of characters that can be appended to sequencesize_t avail() const {return physical_end-logical_end;}// Append sequence [first,last) to this sequence.void append( char* first, char* last ) {memcpy( logical_end, first, last-first );logical_end += last-first;}// Set end() to given value.void set_end( char* p ) {logical_end=p;}
};
下面是用于构建和运行管道的顶级代码。 TextSlice
对象使用指针在过滤器之间传递,以避免复制 TextSlice
的开销。
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {oneapi::tbb::parallel_pipeline(ntoken,oneapi::tbb::make_filter<void,TextSlice*>(oneapi::tbb::filter::serial_in_order, MyInputFunc(input_file) )&oneapi::tbb::make_filter<TextSlice*,TextSlice*>(oneapi::tbb::filter::parallel, MyTransformFunc() )&oneapi::tbb::make_filter<TextSlice*,void>(oneapi::tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
}
方法parallel_pipeline
的参数ntoken
控制并行度。从概念上讲,令牌(token)流经管道。在串行有序过滤器中,每个令牌必须按顺序串行处理。在并行过滤器中,过滤器可以并行处理多个令牌。如果token数量没有限制,可能会出现中间的无序过滤器不断获取token的问题,因为输出过滤器跟不上。这种情况通常会导致中间过滤器产生不希望的资源消耗。方法 parallel_pipeline
的参数指定可以运行中的最大令牌数。一旦达到此限制,管道将永远不会在输入过滤器处创建新令牌,直到另一个令牌在输出过滤器处被销毁。
第二个参数指定过滤器的顺序。每个过滤器由函数 make_filter<inputType, outputType>(mode,functor)
构造。
inputType
指定过滤器输入的值的类型。对于输入过滤器,类型为void
。outputType
指定过滤器输出的值的类型。对于输出过滤器,类型为void
。mode
指定过滤器是并行处理、按顺序串行还是按顺序乱序处理项目。functor
指定如何从输入值产生输出值。
过滤器与 operator&
连接。连接两个过滤器时,第一个过滤器的 outputType
必须与第二个过滤器的 inputType
匹配。
过滤器可以提前构建和连接。执行此操作的前一个示例的等效版本如下:
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {oneapi::tbb::filter<void,TextSlice*> f1( oneapi::tbb::filter::serial_in_order,MyInputFunc(input_file) );oneapi::tbb::filter<TextSlice*,TextSlice*> f2(oneapi::tbb::filter::parallel,MyTransformFunc() );oneapi::tbb::filter<TextSlice*,void> f3(oneapi::tbb::filter::serial_in_order,MyOutputFunc(output_file) );oneapi::tbb::filter<void,void> f = f1 & f2 & f3;oneapi::tbb::parallel_pipeline(ntoken,f);
}
在此示例中,输入过滤器必须是 serial_in_order
,因为过滤器从顺序文件中读取块,而输出过滤器必须以相同的顺序写入块。 所有 serial_in_order
过滤器都以相同的顺序处理项目。 因此,如果一个项目按照 MyInputFunc
建立的顺序到达 MyOutputFunc
,管道会自动延迟对该项目调用 MyOutputFunc::operator()
直到它的前辈被处理。 还有另一种串行过滤器,serial_out_of_order
,它不保留顺序。
中间过滤器对纯本地数据进行操作。 因此,它的 functor 的任意数量的调用都可以同时运行。 因此,它被指定为并行过滤器。
现在详细解释每个过滤器的 functor。 输出 functor 是最简单的。 它所要做的就是将 TextSlice
写入文件并释放 TextSlice
。
// Functor that writes a TextSlice to a file.
class MyOutputFunc {FILE* my_output_file;
public:MyOutputFunc( FILE* output_file );void operator()( TextSlice* item ) const;
};MyOutputFunc::MyOutputFunc( FILE* output_file ) :my_output_file(output_file)
{
}void MyOutputFunc::operator()( TextSlice* out ) const {size_t n = fwrite( out->begin(), 1, out->size(), my_output_file );if( n!=out->size() ) {fprintf(stderr,"Can't write into file '%s'\n", OutputFileName);exit(1);}out->free();
}
方法 operator()
处理一个 TextSlice
。 参数out
指向要处理的TextSlice
。 由于它用于管道的最后一个过滤器,因此它返回 void
。
中间过滤器的 functor 类似,但稍微复杂一些。 它返回一个指向它产生的 TextSlice
的指针。
// Functor that changes each decimal number to its square.
class MyTransformFunc {
public:TextSlice* operator()( TextSlice* input ) const;
};TextSlice* MyTransformFunc::operator()( TextSlice* input ) const {// Add terminating null so that strtol works right even if number is at end of the input.*input->end() = '\0';char* p = input->begin();TextSlice* out = TextSlice::allocate( 2*MAX_CHAR_PER_INPUT_SLICE );char* q = out->begin();for(;;) {while( p<input->end() && !isdigit(*p) )*q++ = *p++;if( p==input->end() )break;long x = strtol( p, &p, 10 );// Note: no overflow checking is needed here, as we have twice the// input string length, but the square of a non-negative integer n// cannot have more than twice as many digits as n.long y = x*x;sprintf(q,"%ld",y);q = strchr(q,0);}out->set_end(q);input->free();return out;
}
输入 functor 是最复杂的,因为它必须确保没有数字跨越边界。 当它发现可能是进入下一个切片的数字时,它会将部分数字复制到下一个切片。 此外,它必须指示何时到达输入的结尾。 它通过对 flow_control
类型的特殊参数调用方法 stop()
来实现这一点。 用于管道的第一个过滤器的任何 functor 都需要此习语。
TextSlice* next_slice = NULL;class MyInputFunc {
public:MyInputFunc( FILE* input_file_ );MyInputFunc( const MyInputFunc& f ) : input_file(f.input_file) { }~MyInputFunc();TextSlice* operator()( oneapi::tbb::flow_control& fc ) const;
private:FILE* input_file;
};MyInputFunc::MyInputFunc( FILE* input_file_ ) :input_file(input_file_) { }MyInputFunc::~MyInputFunc() {
}TextSlice* MyInputFunc::operator()( oneapi::tbb::flow_control& fc ) const {// Read characters into space that is available in the next slice.if( !next_slice )next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );size_t m = next_slice->avail();size_t n = fread( next_slice->end(), 1, m, input_file );if( !n && next_slice->size()==0 ) {// No more characters to processfc.stop();return NULL;} else {// Have more characters to process.TextSlice* t = next_slice;next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );char* p = t->end()+n;if( n==m ) {// Might have read partial number.// If so, transfer characters of partial number to next slice.while( p>t->begin() && isdigit(p[-1]) )--p;assert(p>t->begin(),"Number too large to fit in buffer.\n");next_slice->append( p, t->end()+n );}t->set_end(p);return t;}
}
必须定义复制构造函数,因为在从 functor 构建底层 oneapi::tbb::filter_t
时会复制 functor,并在管道运行时再次复制。
Using Circular Buffers
循环缓冲区有时可用于最小化分配和释放管道过滤器之间传递的项目的开销。 如果创建项目的第一个过滤器和消费项目的最后一个过滤器都是 serial_in_order
,则可以通过大小至少为 ntoken
的循环缓冲区分配和释放项目,其中 ntoken
是 parallel_pipeline
的第一个参数。 在这些情况下,无需检查物品是否仍在使用中。
这样做的原因是最多 ntoken
项目在运行,并且项目将按照它们被分配的顺序被释放。 因此,当循环缓冲区环绕以重新分配项目时,该项目必须已从其先前在管道中的使用中释放出来。 如果第一个和最后一个过滤器不是 serial_in_order
,那么你必须跟踪当前正在使用的缓冲区,因为缓冲区可能不会按照分配的顺序被释放。
Throughput of pipeline
管道的吞吐量是令牌流过它的速率,并受两个约束条件的限制。首先,如果一个管道用 N 个令牌运行,那么显然并行运行的操作不能超过 N 个。选择正确的 N 值可能涉及一些实验。太低的值会限制并行性;太高的值可能需要太多的资源(例如,更多的缓冲区)。其次,流水线的吞吐量受限于最慢的顺序过滤器的吞吐量。即使对于没有并行过滤器的管道也是如此。无论其他过滤器有多快,最慢的顺序过滤器都是瓶颈。因此,一般而言,你应该尽量保持顺序过滤器的速度,并在可能的情况下将工作转移到并行过滤器上。
文本处理示例的加速相对较差,因为串行过滤器受到系统 I/O 速度的限制。事实上,即使文件位于本地磁盘上,你也不太可能看到超过 2 倍的加速。要真正从管道中受益,与串行过滤器相比,并行过滤器需要做一些繁重的工作。
每个令牌的窗口大小或子问题大小也可以限制吞吐量。使窗口太小可能会导致开销控制有用的工作。使窗口太大可能会导致它们溢出缓存。一个好的指导方针是尝试仍然适合缓存的大窗口大小。你可能需要进行一些试验才能找到合适的窗口大小。
非线性管道
模板函数 parallel_pipeline
仅支持线性管道。 它不直接处理更巴洛克式的管道(Baroque plumbing),如下图所示。
但是,你仍然可以为此使用管道。 只需将过滤器拓扑排序为线性顺序,如下所示:
浅灰色箭头是原始箭头,现在由其他箭头的传递闭包隐含。 通过在过滤器上强制执行线性顺序,似乎会丢失很多并行性,但实际上唯一的损失是管道的延迟,而不是吞吐量。 延迟是令牌从管道的开始流到结束所花费的时间。 给定足够数量的处理器,原始非线性流水线的延迟是三个过滤器。 这是因为过滤器 A 和 B 可以同时处理令牌,同样过滤器 D 和 E 可以同时处理令牌。
在线性流水线中,整个时延由五个过滤器构成。 上述过滤器 A、B、D 和 E 的行为可能需要修改,以便正确处理不需要过滤器操作的对象,而不是传递到管道中的下一个过滤器。
吞吐量保持不变,因为无论拓扑如何,吞吐量仍然受到最慢串行滤波器吞吐量的限制。 如果parallel_pipeline
支持非线性管道,会增加很多编程复杂度,并不会提高吞吐量。 parallel_pipeline
的线性限制是收益与痛苦的良好权衡。
Summary of Loops and Pipelines
oneTBB 中的高级循环和管道模板为你提供高效的可扩展方式来利用多核芯片的强大功能,而无需从头开始。 它们让你可以在高任务模式级别设计软件,而不必担心线程的低级别操作。 因为它们是通用的,所以你可以根据你的特定需求对其进行自定义。 使用这些模板来释放多核的力量,玩得开心。
这篇关于Intel TBB 开发指南 4 Parallelizing Complex Loops的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!