在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個(gè)問(wèn)題,我們逐個(gè)來(lái)看看TBB::Pipeline是怎么解決的。
為什么Pipeline可以保證數(shù)據(jù)執(zhí)行的順序?既然TBB歸根到底是通過(guò)多線程執(zhí)行任務(wù),為什么不會(huì)在讀入先后兩個(gè)字符串后,后讀入的字符串先被下一個(gè)task處理?Pipeline里是不是有一個(gè)類(lèi)似于FIFO 先進(jìn)先出隊(duì)列之類(lèi)的東西?
之前曾經(jīng)質(zhì)疑過(guò)Pipeline的性能,甚至想自己用MultiThreading來(lái)模擬一個(gè)流水線,但很快就發(fā)現(xiàn)其中實(shí)現(xiàn)的難點(diǎn)。數(shù)據(jù)執(zhí)行的順序性就是其中之一。
假設(shè)以一個(gè)thread代表流水線上的一個(gè)節(jié)點(diǎn),如果某節(jié)點(diǎn)是并發(fā)執(zhí)行的,那么就需要2個(gè)以上的thread(A和B),上一節(jié)點(diǎn)處理完畢的順序數(shù)據(jù)到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數(shù)據(jù)送到下一節(jié)點(diǎn)呢?即使可以人為的指定A和B之間的優(yōu)先規(guī)則,由于thread本身被調(diào)度的不確定性,實(shí)際運(yùn)行中還是有很多不可預(yù)知的困難。
流水線的一個(gè)顯著特性就是保證每個(gè)數(shù)據(jù)均以相同的順序流過(guò)每個(gè)節(jié)點(diǎn)。因此,TBB::Pipeline中的一個(gè)首要任務(wù)就是在節(jié)點(diǎn)被并發(fā)執(zhí)行的同時(shí),仍能夠保證所處理的數(shù)據(jù)的次序而不需額外的處理代碼。此外,在要求串行處理的節(jié)點(diǎn),要保證即使排在前面的數(shù)據(jù)先被處理,即使排在后面的數(shù)據(jù)先到達(dá)。
Pipeline的中心思想就是以token來(lái)控制數(shù)據(jù)的處理順序和流水線的深度。Pipeline::run函數(shù)中指定了token的最大值:
void pipeline::run( size_t max_number_of_live_tokens ) {}
每一個(gè)數(shù)據(jù)在進(jìn)入Pipeline的時(shí)候都會(huì)按照先后順序依次分配一個(gè)token,如line1處:
task* stage_task::execute() {
__TBB_ASSERT( !my_at_start || !my_object, NULL );
if( my_at_start ) {
if( my_filter->is_serial() ) {
if( (my_object = (*my_filter)(my_object)) ) {
my_token = my_pipeline.token_counter++; //line1
my_token_ready = true;
ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
if( --my_pipeline.input_tokens>0 )
spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );
} else {
my_pipeline.end_of_input = true; //line2
return NULL;
}
...
}
如果當(dāng)前流水線中的token全部用完了,那么暫時(shí)就不會(huì)處理新的數(shù)據(jù),直到已進(jìn)入Pipeline的數(shù)據(jù)被處理完畢有空閑的token(line2處)
仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter從磁盤(pán)上讀取數(shù)據(jù),MyTransformFilter轉(zhuǎn)換成大寫(xiě)字母,MyOutputFilter將轉(zhuǎn)換好的數(shù)據(jù)寫(xiě)入磁盤(pán)。因此,MyInputFilter節(jié)點(diǎn)和MyOutputFiler節(jié)點(diǎn)必須是串行執(zhí)行,而MyTransformFilter可以并發(fā)執(zhí)行。對(duì)于MyInputFilter讀入的一串順序數(shù)據(jù),token依次為1->2->3,如何保證經(jīng)過(guò)轉(zhuǎn)換后數(shù)據(jù)也是以相同的順序?qū)懭氪疟P(pán)?
秘密在于TBB中的一個(gè)類(lèi)tbb::internal::ordered_buffer,MyOutputFilter用它來(lái)保證按照token的順序執(zhí)行其隊(duì)列中的數(shù)據(jù),而不管數(shù)據(jù)進(jìn)入隊(duì)列的先后次序,換句話說(shuō),即使排在后面的數(shù)據(jù)token 2先被某個(gè)MyTransformFilter節(jié)點(diǎn)處理完畢送往MyOutputFilter,只要數(shù)據(jù)token 1沒(méi)到達(dá)沒(méi)被MyOutputFilter執(zhí)行,數(shù)據(jù)2就不會(huì)在數(shù)據(jù)1之前先寫(xiě)入磁盤(pán)。每一個(gè)需要被串行處理的節(jié)點(diǎn),都會(huì)有一個(gè)ordered_buffer類(lèi)型的成員變量。
先看看ordered_buffer的定義:
//! A buffer of ordered items.
/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */
class ordered_buffer {
typedef Token size_type;
//! Array of deferred tasks that cannot yet start executing.
/** Element is NULL if unused. */
task** array; //數(shù)組,以順序方式保存所有待處理的task
//! Size of array
/** Always 0 or a power of 2 */
size_type array_size; //數(shù)組的尺寸
//! Lowest token that can start executing.
/** All prior Token have already been seen. */
Token low_token; //當(dāng)前正在處理的token,
//! Serializes updates.
spin_mutex array_mutex; //用于保護(hù)array并發(fā)訪問(wèn)的鎖
};
仍然是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer ) {
// The next filter must execute tokens in order.
stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );
clone.my_token = my_token; //token號(hào)
clone.my_token_ready = my_token_ready;
clone.my_object = my_object; //數(shù)據(jù)
next = input_buffer->put_token(clone);//將task放入隊(duì)列
} else {
/* A semi-hackish way to reexecute the same task object immediately without spawning.
recycle_as_continuation marks the task for future execution,
and then 'this' pointer is returned to bypass spawning. */
recycle_as_continuation();
next = this;
}
} else {
...
}
對(duì)于需要被串行處理的節(jié)點(diǎn),使用ordered_buffer的put_token函數(shù)將相關(guān)的數(shù)據(jù)和task引用放入隊(duì)列。put_token的實(shí)現(xiàn)是關(guān)鍵:
template<typename StageTask>
task* put_token( StageTask& putter ) {
task* result = &putter;
{
spin_mutex::scoped_lock lock( array_mutex );
Token token = putter.next_token_number();
if( token!=low_token ) {
// Trying to put token that is beyond low_token.
// Need to wait until low_token catches up before dispatching.
result = NULL;
__TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );
if( token-low_token>=array_size )
grow( token-low_token+1 );
ITT_NOTIFY( sync_releasing, this );
array[token&array_size-1] = &putter;
}
}
return result;
}
這個(gè)函數(shù)的實(shí)質(zhì)是,首先取得下一個(gè)要處理的token,然后把待執(zhí)行的task放到ordered_buffer的任務(wù)隊(duì)列中的"合適位置",而low_token指向當(dāng)前需要處理的token編號(hào)。
例如low_token=0,當(dāng)前需要處理0號(hào)token,下一個(gè)token為1,因此task保存在array[1]處并處于阻塞狀態(tài),待0號(hào)token處理完畢后,low_token增加1,再?gòu)?/span>array數(shù)組中取出1號(hào)token對(duì)應(yīng)的task進(jìn)行處理。
Pipeline中是這樣通知串行節(jié)點(diǎn)以處理好一條數(shù)據(jù)的:
還是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer )
input_buffer->note_done(my_token,*this);
...
}
看看note_done的實(shí)現(xiàn)會(huì)有一種大徹大悟的感覺(jué)!如果剛完成的token就是次序最優(yōu)先的token(low_token),那取出下一個(gè)要執(zhí)行的task,以spawn的方式讓TBB的task scheduler來(lái)調(diào)度:
//! Note that processing of a token is finished.
/** Fires up processing of the next token, if processing was deferred. */
void note_done( Token token, task& spawner ) {
task* wakee=NULL;
{
spin_mutex::scoped_lock lock( array_mutex );
if( token==low_token ) {
// Wake the next task
task*& item = array[++low_token & array_size-1];
ITT_NOTIFY( sync_acquired, this );
wakee = item;
item = NULL;
}
}
if( wakee ) {
spawner.spawn(*wakee);
}
}
ordered_buffer是一個(gè)非常有趣的實(shí)現(xiàn),相比于常見(jiàn)的用FIFO queue來(lái)實(shí)現(xiàn)線程間的數(shù)據(jù)傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來(lái)進(jìn)一步改進(jìn)我們的代碼。