在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個問題,我們逐個來看看TBB::Pipeline是怎么解決的。
為什么Pipeline可以保證數據執行的順序?既然TBB歸根到底是通過多線程執行任務,為什么不會在讀入先后兩個字符串后,后讀入的字符串先被下一個task處理?Pipeline里是不是有一個類似于FIFO 先進先出隊列之類的東西?
之前曾經質疑過Pipeline的性能,甚至想自己用MultiThreading來模擬一個流水線,但很快就發現其中實現的難點。數據執行的順序性就是其中之一。
假設以一個thread代表流水線上的一個節點,如果某節點是并發執行的,那么就需要2個以上的thread(A和B),上一節點處理完畢的順序數據到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數據送到下一節點呢?即使可以人為的指定A和B之間的優先規則,由于thread本身被調度的不確定性,實際運行中還是有很多不可預知的困難。
流水線的一個顯著特性就是保證每個數據均以相同的順序流過每個節點。因此,TBB::Pipeline中的一個首要任務就是在節點被并發執行的同時,仍能夠保證所處理的數據的次序而不需額外的處理代碼。此外,在要求串行處理的節點,要保證即使排在前面的數據先被處理,即使排在后面的數據先到達。
Pipeline的中心思想就是以token來控制數據的處理順序和流水線的深度。Pipeline::run函數中指定了token的最大值:
void pipeline::run( size_t max_number_of_live_tokens ) {}
每一個數據在進入Pipeline的時候都會按照先后順序依次分配一個token,如line1處:
task* stage_task::execute() {
__TBB_ASSERT( !my_at_start || !my_object, NULL );
if( my_at_start ) {
if( my_filter->is_serial() ) {
if( (my_object = (*my_filter)(my_object)) ) {
my_token = my_pipeline.token_counter++; //line1
my_token_ready = true;
ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
if( --my_pipeline.input_tokens>0 )
spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );
} else {
my_pipeline.end_of_input = true; //line2
return NULL;
}
...
}
如果當前流水線中的token全部用完了,那么暫時就不會處理新的數據,直到已進入Pipeline的數據被處理完畢有空閑的token(line2處)
仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter從磁盤上讀取數據,MyTransformFilter轉換成大寫字母,MyOutputFilter將轉換好的數據寫入磁盤。因此,MyInputFilter節點和MyOutputFiler節點必須是串行執行,而MyTransformFilter可以并發執行。對于MyInputFilter讀入的一串順序數據,token依次為1->2->3,如何保證經過轉換后數據也是以相同的順序寫入磁盤?
秘密在于TBB中的一個類tbb::internal::ordered_buffer,MyOutputFilter用它來保證按照token的順序執行其隊列中的數據,而不管數據進入隊列的先后次序,換句話說,即使排在后面的數據token 2先被某個MyTransformFilter節點處理完畢送往MyOutputFilter,只要數據token 1沒到達沒被MyOutputFilter執行,數據2就不會在數據1之前先寫入磁盤。每一個需要被串行處理的節點,都會有一個ordered_buffer類型的成員變量。
先看看ordered_buffer的定義:
//! A buffer of ordered items.
/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */
class ordered_buffer {
typedef Token size_type;
//! Array of deferred tasks that cannot yet start executing.
/** Element is NULL if unused. */
task** array; //數組,以順序方式保存所有待處理的task
//! Size of array
/** Always 0 or a power of 2 */
size_type array_size; //數組的尺寸
//! Lowest token that can start executing.
/** All prior Token have already been seen. */
Token low_token; //當前正在處理的token,
//! Serializes updates.
spin_mutex array_mutex; //用于保護array并發訪問的鎖
};
仍然是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer ) {
// The next filter must execute tokens in order.
stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );
clone.my_token = my_token; //token號
clone.my_token_ready = my_token_ready;
clone.my_object = my_object; //數據
next = input_buffer->put_token(clone);//將task放入隊列
} else {
/* A semi-hackish way to reexecute the same task object immediately without spawning.
recycle_as_continuation marks the task for future execution,
and then 'this' pointer is returned to bypass spawning. */
recycle_as_continuation();
next = this;
}
} else {
...
}
對于需要被串行處理的節點,使用ordered_buffer的put_token函數將相關的數據和task引用放入隊列。put_token的實現是關鍵:
template<typename StageTask>
task* put_token( StageTask& putter ) {
task* result = &putter;
{
spin_mutex::scoped_lock lock( array_mutex );
Token token = putter.next_token_number();
if( token!=low_token ) {
// Trying to put token that is beyond low_token.
// Need to wait until low_token catches up before dispatching.
result = NULL;
__TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );
if( token-low_token>=array_size )
grow( token-low_token+1 );
ITT_NOTIFY( sync_releasing, this );
array[token&array_size-1] = &putter;
}
}
return result;
}
這個函數的實質是,首先取得下一個要處理的token,然后把待執行的task放到ordered_buffer的任務隊列中的"合適位置",而low_token指向當前需要處理的token編號。
例如low_token=0,當前需要處理0號token,下一個token為1,因此task保存在array[1]處并處于阻塞狀態,待0號token處理完畢后,low_token增加1,再從array數組中取出1號token對應的task進行處理。
Pipeline中是這樣通知串行節點以處理好一條數據的:
還是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer )
input_buffer->note_done(my_token,*this);
...
}
看看note_done的實現會有一種大徹大悟的感覺!如果剛完成的token就是次序最優先的token(low_token),那取出下一個要執行的task,以spawn的方式讓TBB的task scheduler來調度:
//! Note that processing of a token is finished.
/** Fires up processing of the next token, if processing was deferred. */
void note_done( Token token, task& spawner ) {
task* wakee=NULL;
{
spin_mutex::scoped_lock lock( array_mutex );
if( token==low_token ) {
// Wake the next task
task*& item = array[++low_token & array_size-1];
ITT_NOTIFY( sync_acquired, this );
wakee = item;
item = NULL;
}
}
if( wakee ) {
spawner.spawn(*wakee);
}
}
ordered_buffer是一個非常有趣的實現,相比于常見的用FIFO queue來實現線程間的數據傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來進一步改進我們的代碼。