• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            woaidongmao

            文章均收錄自他人博客,但不喜標題前加-[轉貼],因其丑陋,見諒!~
            隨筆 - 1469, 文章 - 0, 評論 - 661, 引用 - 0
            數據加載中……

            Intel TBB::Pipeline,按序處理數據

            在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個問題,我們逐個來看看TBB::Pipeline是怎么解決的。

             

             

            為什么Pipeline可以保證數據執行的順序?既然TBB歸根到底是通過多線程執行任務,為什么不會在讀入先后兩個字符串后,后讀入的字符串先被下一個task處理?Pipeline里是不是有一個類似于FIFO 先進先出隊列之類的東西?

             

             

            之前曾經質疑過Pipeline的性能,甚至想自己用MultiThreading來模擬一個流水線,但很快就發現其中實現的難點。數據執行的順序性就是其中之一。

             

            假設以一個thread代表流水線上的一個節點,如果某節點是并發執行的,那么就需要2個以上的thread(AB),上一節點處理完畢的順序數據到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數據送到下一節點呢?即使可以人為的指定AB之間的優先規則,由于thread本身被調度的不確定性,實際運行中還是有很多不可預知的困難。

             

            流水線的一個顯著特性就是保證每個數據均以相同的順序流過每個節點。因此,TBB::Pipeline中的一個首要任務就是在節點被并發執行的同時,仍能夠保證所處理的數據的次序而不需額外的處理代碼。此外,在要求串行處理的節點,要保證即使排在前面的數據先被處理,即使排在后面的數據先到達。

             

            Pipeline的中心思想就是以token來控制數據的處理順序和流水線的深度。Pipeline::run函數中指定了token的最大值:

             

            void pipeline::run( size_t max_number_of_live_tokens ) {}

             

             

            每一個數據在進入Pipeline的時候都會按照先后順序依次分配一個token,如line1處:

             

            task* stage_task::execute() {

                __TBB_ASSERT( !my_at_start || !my_object, NULL );

                if( my_at_start ) {

                    if( my_filter->is_serial() ) {

                        if( (my_object = (*my_filter)(my_object)) ) {

                            my_token = my_pipeline.token_counter++; //line1

                            my_token_ready = true;

                            ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                            if( --my_pipeline.input_tokens>0 )

                                spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

                        } else {

                            my_pipeline.end_of_input = true; //line2

                            return NULL;

                        }

            ...

            }

             

            如果當前流水線中的token全部用完了,那么暫時就不會處理新的數據,直到已進入Pipeline的數據被處理完畢有空閑的token(line2)

             

            仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFilerMyInputFilter從磁盤上讀取數據,MyTransformFilter轉換成大寫字母,MyOutputFilter將轉換好的數據寫入磁盤。因此,MyInputFilter節點和MyOutputFiler節點必須是串行執行,而MyTransformFilter可以并發執行。對于MyInputFilter讀入的一串順序數據,token依次為1->2->3,如何保證經過轉換后數據也是以相同的順序寫入磁盤?

             

            秘密在于TBB中的一個類tbb::internal::ordered_buffer,MyOutputFilter用它來保證按照token的順序執行其隊列中的數據,而不管數據進入隊列的先后次序,換句話說,即使排在后面的數據token 2先被某個MyTransformFilter節點處理完畢送往MyOutputFilter,只要數據token 1沒到達沒被MyOutputFilter執行,數據2就不會在數據1之前先寫入磁盤。每一個需要被串行處理的節點,都會有一個ordered_buffer類型的成員變量。

             

            先看看ordered_buffer的定義:

             

            //! A buffer of ordered items.

            /** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

            class ordered_buffer {

                typedef  Token  size_type;

             

                //! Array of deferred tasks that cannot yet start executing.

                /** Element is NULL if unused. */

                task** array; //數組,以順序方式保存所有待處理的task

             

                //! Size of array

                /** Always 0 or a power of 2 */

                size_type array_size; //數組的尺寸

             

                //! Lowest token that can start executing.

                /** All prior Token have already been seen. */

                Token low_token; //當前正在處理的token,

             

                //! Serializes updates.

                spin_mutex array_mutex; //用于保護array并發訪問的鎖

            };

             

            仍然是在task* stage_task::execute() {

            ...

             if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

                        // The next filter must execute tokens in order.

                        stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

                        clone.my_token = my_token;                       //token

                        clone.my_token_ready = my_token_ready;

                        clone.my_object = my_object;                    //數據

                        next = input_buffer->put_token(clone);//task放入隊列

                    } else {

                        /* A semi-hackish way to reexecute the same task object immediately without spawning.

                           recycle_as_continuation marks the task for future execution,

                           and then 'this' pointer is returned to bypass spawning. */

                        recycle_as_continuation();

                        next = this;

                    }

                } else {

            ...

            }

             

            對于需要被串行處理的節點,使用ordered_bufferput_token函數將相關的數據和task引用放入隊列。put_token的實現是關鍵:

             

                template<typename StageTask>

                task* put_token( StageTask& putter ) {

                    task* result = &putter;

                    {

                        spin_mutex::scoped_lock lock( array_mutex );

                        Token token = putter.next_token_number();

                        if( token!=low_token ) {

                            // Trying to put token that is beyond low_token.

                            // Need to wait until low_token catches up before dispatching.

                            result = NULL;

                            __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                            if( token-low_token>=array_size )

                                grow( token-low_token+1 );

                            ITT_NOTIFY( sync_releasing, this );

                            array[token&array_size-1] = &putter;

                        }

                    }

                    return result;

                }

            這個函數的實質是,首先取得下一個要處理的token,然后把待執行的task放到ordered_buffer的任務隊列中的"合適位置",而low_token指向當前需要處理的token編號。

             

            例如low_token=0,當前需要處理0token,下一個token1,因此task保存在array[1]處并處于阻塞狀態,待0token處理完畢后,low_token增加1,再從array數組中取出1token對應的task進行處理。

             

            Pipeline中是這樣通知串行節點以處理好一條數據的:

            還是在task* stage_task::execute() {

            ...

            if( ordered_buffer* input_buffer = my_filter->input_buffer )

                        input_buffer->note_done(my_token,*this);

            ...

            }

             

            看看note_done的實現會有一種大徹大悟的感覺!如果剛完成的token就是次序最優先的token(low_token),那取出下一個要執行的task,以spawn的方式讓TBBtask scheduler來調度:

             

             

            //! Note that processing of a token is finished.

                /** Fires up processing of the next token, if processing was deferred. */

                void note_done( Token token, task& spawner ) {

                    task* wakee=NULL;

                    {

                        spin_mutex::scoped_lock lock( array_mutex );

                        if( token==low_token ) {

                            // Wake the next task

                            task*& item = array[++low_token & array_size-1];

                            ITT_NOTIFY( sync_acquired, this );

                            wakee = item;

                            item = NULL;

                        }

                    }

                    if( wakee ) {

                        spawner.spawn(*wakee);

                    }

                }

             

             

             

             

            ordered_buffer是一個非常有趣的實現,相比于常見的用FIFO queue來實現線程間的數據傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來進一步改進我們的代碼。

             

            posted on 2009-05-02 01:53 肥仔 閱讀(2490) 評論(0)  編輯 收藏 引用 所屬分類: 庫 & 代碼段

            欧美精品一区二区精品久久| 久久亚洲2019中文字幕| 狠狠狠色丁香婷婷综合久久五月 | 久久久精品久久久久特色影视| 久久毛片免费看一区二区三区| 国产成人精品综合久久久久| 日本精品久久久中文字幕| 久久国产精品无| 热久久国产精品| 日产精品久久久久久久性色| 久久久久九国产精品| 精品无码久久久久国产| 一本久久综合亚洲鲁鲁五月天| 久久线看观看精品香蕉国产| 97香蕉久久夜色精品国产 | 伊人精品久久久久7777| 久久99热国产这有精品| 伊人久久久AV老熟妇色| 日韩欧美亚洲国产精品字幕久久久| 69久久夜色精品国产69| 久久亚洲国产精品成人AV秋霞| 欧美日韩中文字幕久久伊人| 久久久久久毛片免费播放| 色天使久久综合网天天| 亚洲一级Av无码毛片久久精品| 国产午夜精品理论片久久| a高清免费毛片久久| 久久永久免费人妻精品下载| 人妻无码精品久久亚瑟影视| 久久久久久久亚洲精品| 精品久久综合1区2区3区激情| 久久精品国产99国产精品澳门| 久久国产精品一国产精品金尊| 中文无码久久精品| 久久久SS麻豆欧美国产日韩| 久久亚洲精品无码VA大香大香| 最新久久免费视频| 久久久久免费精品国产| 99精品国产99久久久久久97| 无遮挡粉嫩小泬久久久久久久 | 成人综合伊人五月婷久久|