• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            woaidongmao

            文章均收錄自他人博客,但不喜標(biāo)題前加-[轉(zhuǎn)貼],因其丑陋,見(jiàn)諒!~
            隨筆 - 1469, 文章 - 0, 評(píng)論 - 661, 引用 - 0
            數(shù)據(jù)加載中……

            Intel TBB::Pipeline,按序處理數(shù)據(jù)

            在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個(gè)問(wèn)題,我們逐個(gè)來(lái)看看TBB::Pipeline是怎么解決的。

             

             

            為什么Pipeline可以保證數(shù)據(jù)執(zhí)行的順序?既然TBB歸根到底是通過(guò)多線程執(zhí)行任務(wù),為什么不會(huì)在讀入先后兩個(gè)字符串后,后讀入的字符串先被下一個(gè)task處理?Pipeline里是不是有一個(gè)類(lèi)似于FIFO 先進(jìn)先出隊(duì)列之類(lèi)的東西?

             

             

            之前曾經(jīng)質(zhì)疑過(guò)Pipeline的性能,甚至想自己用MultiThreading來(lái)模擬一個(gè)流水線,但很快就發(fā)現(xiàn)其中實(shí)現(xiàn)的難點(diǎn)。數(shù)據(jù)執(zhí)行的順序性就是其中之一。

             

            假設(shè)以一個(gè)thread代表流水線上的一個(gè)節(jié)點(diǎn),如果某節(jié)點(diǎn)是并發(fā)執(zhí)行的,那么就需要2個(gè)以上的thread(AB),上一節(jié)點(diǎn)處理完畢的順序數(shù)據(jù)到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數(shù)據(jù)送到下一節(jié)點(diǎn)呢?即使可以人為的指定AB之間的優(yōu)先規(guī)則,由于thread本身被調(diào)度的不確定性,實(shí)際運(yùn)行中還是有很多不可預(yù)知的困難。

             

            流水線的一個(gè)顯著特性就是保證每個(gè)數(shù)據(jù)均以相同的順序流過(guò)每個(gè)節(jié)點(diǎn)。因此,TBB::Pipeline中的一個(gè)首要任務(wù)就是在節(jié)點(diǎn)被并發(fā)執(zhí)行的同時(shí),仍能夠保證所處理的數(shù)據(jù)的次序而不需額外的處理代碼。此外,在要求串行處理的節(jié)點(diǎn),要保證即使排在前面的數(shù)據(jù)先被處理,即使排在后面的數(shù)據(jù)先到達(dá)。

             

            Pipeline的中心思想就是以token來(lái)控制數(shù)據(jù)的處理順序和流水線的深度。Pipeline::run函數(shù)中指定了token的最大值:

             

            void pipeline::run( size_t max_number_of_live_tokens ) {}

             

             

            每一個(gè)數(shù)據(jù)在進(jìn)入Pipeline的時(shí)候都會(huì)按照先后順序依次分配一個(gè)token,如line1處:

             

            task* stage_task::execute() {

                __TBB_ASSERT( !my_at_start || !my_object, NULL );

                if( my_at_start ) {

                    if( my_filter->is_serial() ) {

                        if( (my_object = (*my_filter)(my_object)) ) {

                            my_token = my_pipeline.token_counter++; //line1

                            my_token_ready = true;

                            ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                            if( --my_pipeline.input_tokens>0 )

                                spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

                        } else {

                            my_pipeline.end_of_input = true; //line2

                            return NULL;

                        }

            ...

            }

             

            如果當(dāng)前流水線中的token全部用完了,那么暫時(shí)就不會(huì)處理新的數(shù)據(jù),直到已進(jìn)入Pipeline的數(shù)據(jù)被處理完畢有空閑的token(line2)

             

            仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFilerMyInputFilter從磁盤(pán)上讀取數(shù)據(jù),MyTransformFilter轉(zhuǎn)換成大寫(xiě)字母,MyOutputFilter將轉(zhuǎn)換好的數(shù)據(jù)寫(xiě)入磁盤(pán)。因此,MyInputFilter節(jié)點(diǎn)和MyOutputFiler節(jié)點(diǎn)必須是串行執(zhí)行,而MyTransformFilter可以并發(fā)執(zhí)行。對(duì)于MyInputFilter讀入的一串順序數(shù)據(jù),token依次為1->2->3,如何保證經(jīng)過(guò)轉(zhuǎn)換后數(shù)據(jù)也是以相同的順序?qū)懭氪疟P(pán)?

             

            秘密在于TBB中的一個(gè)類(lèi)tbb::internal::ordered_bufferMyOutputFilter用它來(lái)保證按照token的順序執(zhí)行其隊(duì)列中的數(shù)據(jù),而不管數(shù)據(jù)進(jìn)入隊(duì)列的先后次序,換句話說(shuō),即使排在后面的數(shù)據(jù)token 2先被某個(gè)MyTransformFilter節(jié)點(diǎn)處理完畢送往MyOutputFilter,只要數(shù)據(jù)token 1沒(méi)到達(dá)沒(méi)被MyOutputFilter執(zhí)行,數(shù)據(jù)2就不會(huì)在數(shù)據(jù)1之前先寫(xiě)入磁盤(pán)。每一個(gè)需要被串行處理的節(jié)點(diǎn),都會(huì)有一個(gè)ordered_buffer類(lèi)型的成員變量。

             

            先看看ordered_buffer的定義:

             

            //! A buffer of ordered items.

            /** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

            class ordered_buffer {

                typedef  Token  size_type;

             

                //! Array of deferred tasks that cannot yet start executing.

                /** Element is NULL if unused. */

                task** array; //數(shù)組,以順序方式保存所有待處理的task

             

                //! Size of array

                /** Always 0 or a power of 2 */

                size_type array_size; //數(shù)組的尺寸

             

                //! Lowest token that can start executing.

                /** All prior Token have already been seen. */

                Token low_token; //當(dāng)前正在處理的token,

             

                //! Serializes updates.

                spin_mutex array_mutex; //用于保護(hù)array并發(fā)訪問(wèn)的鎖

            };

             

            仍然是在task* stage_task::execute() {

            ...

             if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

                        // The next filter must execute tokens in order.

                        stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

                        clone.my_token = my_token;                       //token號(hào)

                        clone.my_token_ready = my_token_ready;

                        clone.my_object = my_object;                    //數(shù)據(jù)

                        next = input_buffer->put_token(clone);//task放入隊(duì)列

                    } else {

                        /* A semi-hackish way to reexecute the same task object immediately without spawning.

                           recycle_as_continuation marks the task for future execution,

                           and then 'this' pointer is returned to bypass spawning. */

                        recycle_as_continuation();

                        next = this;

                    }

                } else {

            ...

            }

             

            對(duì)于需要被串行處理的節(jié)點(diǎn),使用ordered_bufferput_token函數(shù)將相關(guān)的數(shù)據(jù)和task引用放入隊(duì)列。put_token的實(shí)現(xiàn)是關(guān)鍵:

             

                template<typename StageTask>

                task* put_token( StageTask& putter ) {

                    task* result = &putter;

                    {

                        spin_mutex::scoped_lock lock( array_mutex );

                        Token token = putter.next_token_number();

                        if( token!=low_token ) {

                            // Trying to put token that is beyond low_token.

                            // Need to wait until low_token catches up before dispatching.

                            result = NULL;

                            __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                            if( token-low_token>=array_size )

                                grow( token-low_token+1 );

                            ITT_NOTIFY( sync_releasing, this );

                            array[token&array_size-1] = &putter;

                        }

                    }

                    return result;

                }

            這個(gè)函數(shù)的實(shí)質(zhì)是,首先取得下一個(gè)要處理的token,然后把待執(zhí)行的task放到ordered_buffer的任務(wù)隊(duì)列中的"合適位置",而low_token指向當(dāng)前需要處理的token編號(hào)。

             

            例如low_token=0,當(dāng)前需要處理0號(hào)token,下一個(gè)token1,因此task保存在array[1]處并處于阻塞狀態(tài),待0號(hào)token處理完畢后,low_token增加1,再?gòu)?/span>array數(shù)組中取出1號(hào)token對(duì)應(yīng)的task進(jìn)行處理。

             

            Pipeline中是這樣通知串行節(jié)點(diǎn)以處理好一條數(shù)據(jù)的:

            還是在task* stage_task::execute() {

            ...

            if( ordered_buffer* input_buffer = my_filter->input_buffer )

                        input_buffer->note_done(my_token,*this);

            ...

            }

             

            看看note_done的實(shí)現(xiàn)會(huì)有一種大徹大悟的感覺(jué)!如果剛完成的token就是次序最優(yōu)先的token(low_token),那取出下一個(gè)要執(zhí)行的task,以spawn的方式讓TBBtask scheduler來(lái)調(diào)度:

             

             

            //! Note that processing of a token is finished.

                /** Fires up processing of the next token, if processing was deferred. */

                void note_done( Token token, task& spawner ) {

                    task* wakee=NULL;

                    {

                        spin_mutex::scoped_lock lock( array_mutex );

                        if( token==low_token ) {

                            // Wake the next task

                            task*& item = array[++low_token & array_size-1];

                            ITT_NOTIFY( sync_acquired, this );

                            wakee = item;

                            item = NULL;

                        }

                    }

                    if( wakee ) {

                        spawner.spawn(*wakee);

                    }

                }

             

             

             

             

            ordered_buffer是一個(gè)非常有趣的實(shí)現(xiàn),相比于常見(jiàn)的用FIFO queue來(lái)實(shí)現(xiàn)線程間的數(shù)據(jù)傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來(lái)進(jìn)一步改進(jìn)我們的代碼。

             

            posted on 2009-05-02 01:53 肥仔 閱讀(2490) 評(píng)論(0)  編輯 收藏 引用 所屬分類(lèi): 庫(kù) & 代碼段

            人妻无码久久精品| 久久―日本道色综合久久| 久久久久久久久久久久中文字幕| 久久伊人五月丁香狠狠色| 久久婷婷五月综合国产尤物app| 老司机国内精品久久久久| 91麻豆精品国产91久久久久久 | 伊人久久国产免费观看视频| 久久亚洲国产成人精品性色| 91久久精品国产成人久久| 久久精品国产99国产精品导航| 色综合合久久天天综合绕视看| 日本久久中文字幕| 热re99久久6国产精品免费| 国产 亚洲 欧美 另类 久久| 久久综合88熟人妻| 久久最新免费视频| 久久精品www| av国内精品久久久久影院| 伊人久久大香线蕉综合影院首页| 久久影视国产亚洲| 97久久综合精品久久久综合| 色综合久久久久久久久五月| 精品熟女少妇AV免费久久| 狠狠色综合久久久久尤物| 久久久久亚洲精品无码网址 | 青草国产精品久久久久久| 久久国产热这里只有精品| 国产精品无码久久四虎| 精品久久8x国产免费观看| 99久久无色码中文字幕人妻| 欧美激情精品久久久久久| 久久精品国产亚洲5555| 久久综合狠狠综合久久激情 | 久久九九全国免费| 国产精品九九九久久九九| 日本久久久久亚洲中字幕| 久久久久久午夜成人影院| 亚洲狠狠婷婷综合久久久久| 免费无码国产欧美久久18| 亚洲精品高清国产一线久久|