青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

woaidongmao

文章均收錄自他人博客,但不喜標(biāo)題前加-[轉(zhuǎn)貼],因其丑陋,見(jiàn)諒!~
隨筆 - 1469, 文章 - 0, 評(píng)論 - 661, 引用 - 0
數(shù)據(jù)加載中……

Intel TBB::Pipeline,按序處理數(shù)據(jù)

在上一篇文章(TBB:pipeline,軟件流水線的威力)最后提出了幾個(gè)問(wèn)題,我們逐個(gè)來(lái)看看TBB::Pipeline是怎么解決的。

 

 

為什么Pipeline可以保證數(shù)據(jù)執(zhí)行的順序?既然TBB歸根到底是通過(guò)多線程執(zhí)行任務(wù),為什么不會(huì)在讀入先后兩個(gè)字符串后,后讀入的字符串先被下一個(gè)task處理?Pipeline里是不是有一個(gè)類似于FIFO 先進(jìn)先出隊(duì)列之類的東西?

 

 

之前曾經(jīng)質(zhì)疑過(guò)Pipeline的性能,甚至想自己用MultiThreading來(lái)模擬一個(gè)流水線,但很快就發(fā)現(xiàn)其中實(shí)現(xiàn)的難點(diǎn)。數(shù)據(jù)執(zhí)行的順序性就是其中之一。

 

假設(shè)以一個(gè)thread代表流水線上的一個(gè)節(jié)點(diǎn),如果某節(jié)點(diǎn)是并發(fā)執(zhí)行的,那么就需要2個(gè)以上的thread(AB),上一節(jié)點(diǎn)處理完畢的順序數(shù)據(jù)到底是先送給A還是B呢?處理完畢后后又該先將A還是B中的數(shù)據(jù)送到下一節(jié)點(diǎn)呢?即使可以人為的指定AB之間的優(yōu)先規(guī)則,由于thread本身被調(diào)度的不確定性,實(shí)際運(yùn)行中還是有很多不可預(yù)知的困難。

 

流水線的一個(gè)顯著特性就是保證每個(gè)數(shù)據(jù)均以相同的順序流過(guò)每個(gè)節(jié)點(diǎn)。因此,TBB::Pipeline中的一個(gè)首要任務(wù)就是在節(jié)點(diǎn)被并發(fā)執(zhí)行的同時(shí),仍能夠保證所處理的數(shù)據(jù)的次序而不需額外的處理代碼。此外,在要求串行處理的節(jié)點(diǎn),要保證即使排在前面的數(shù)據(jù)先被處理,即使排在后面的數(shù)據(jù)先到達(dá)。

 

Pipeline的中心思想就是以token來(lái)控制數(shù)據(jù)的處理順序和流水線的深度。Pipeline::run函數(shù)中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一個(gè)數(shù)據(jù)在進(jìn)入Pipeline的時(shí)候都會(huì)按照先后順序依次分配一個(gè)token,如line1處:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果當(dāng)前流水線中的token全部用完了,那么暫時(shí)就不會(huì)處理新的數(shù)據(jù),直到已進(jìn)入Pipeline的數(shù)據(jù)被處理完畢有空閑的token(line2)

 

仍然以TBB中的例子text_filter為例考慮,流水線為 MyInputFilter->MyTransformFilter->MyOutputFilerMyInputFilter從磁盤(pán)上讀取數(shù)據(jù),MyTransformFilter轉(zhuǎn)換成大寫(xiě)字母,MyOutputFilter將轉(zhuǎn)換好的數(shù)據(jù)寫(xiě)入磁盤(pán)。因此,MyInputFilter節(jié)點(diǎn)和MyOutputFiler節(jié)點(diǎn)必須是串行執(zhí)行,而MyTransformFilter可以并發(fā)執(zhí)行。對(duì)于MyInputFilter讀入的一串順序數(shù)據(jù),token依次為1->2->3,如何保證經(jīng)過(guò)轉(zhuǎn)換后數(shù)據(jù)也是以相同的順序?qū)懭氪疟P(pán)?

 

秘密在于TBB中的一個(gè)類tbb::internal::ordered_bufferMyOutputFilter用它來(lái)保證按照token的順序執(zhí)行其隊(duì)列中的數(shù)據(jù),而不管數(shù)據(jù)進(jìn)入隊(duì)列的先后次序,換句話說(shuō),即使排在后面的數(shù)據(jù)token 2先被某個(gè)MyTransformFilter節(jié)點(diǎn)處理完畢送往MyOutputFilter,只要數(shù)據(jù)token 1沒(méi)到達(dá)沒(méi)被MyOutputFilter執(zhí)行,數(shù)據(jù)2就不會(huì)在數(shù)據(jù)1之前先寫(xiě)入磁盤(pán)。每一個(gè)需要被串行處理的節(jié)點(diǎn),都會(huì)有一個(gè)ordered_buffer類型的成員變量。

 

先看看ordered_buffer的定義:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //數(shù)組,以順序方式保存所有待處理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //數(shù)組的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //當(dāng)前正在處理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保護(hù)array并發(fā)訪問(wèn)的鎖

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token號(hào)

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //數(shù)據(jù)

            next = input_buffer->put_token(clone);//task放入隊(duì)列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

對(duì)于需要被串行處理的節(jié)點(diǎn),使用ordered_bufferput_token函數(shù)將相關(guān)的數(shù)據(jù)和task引用放入隊(duì)列。put_token的實(shí)現(xiàn)是關(guān)鍵:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

這個(gè)函數(shù)的實(shí)質(zhì)是,首先取得下一個(gè)要處理的token,然后把待執(zhí)行的task放到ordered_buffer的任務(wù)隊(duì)列中的"合適位置",而low_token指向當(dāng)前需要處理的token編號(hào)。

 

例如low_token=0,當(dāng)前需要處理0號(hào)token,下一個(gè)token1,因此task保存在array[1]處并處于阻塞狀態(tài),待0號(hào)token處理完畢后,low_token增加1,再?gòu)?/span>array數(shù)組中取出1號(hào)token對(duì)應(yīng)的task進(jìn)行處理。

 

Pipeline中是這樣通知串行節(jié)點(diǎn)以處理好一條數(shù)據(jù)的:

還是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的實(shí)現(xiàn)會(huì)有一種大徹大悟的感覺(jué)!如果剛完成的token就是次序最優(yōu)先的token(low_token),那取出下一個(gè)要執(zhí)行的task,以spawn的方式讓TBBtask scheduler來(lái)調(diào)度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一個(gè)非常有趣的實(shí)現(xiàn),相比于常見(jiàn)的用FIFO queue來(lái)實(shí)現(xiàn)線程間的數(shù)據(jù)傳遞,ordered_buffer可謂精巧。我們可以好好利用ordered_buffer的原理來(lái)進(jìn)一步改進(jìn)我們的代碼。

 

posted on 2009-05-02 01:53 肥仔 閱讀(2536) 評(píng)論(0)  編輯 收藏 引用 所屬分類: 庫(kù) & 代碼段

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            亚洲人成久久| 亚洲精品裸体| 狠狠狠色丁香婷婷综合激情| 国产精品外国| 国产欧美亚洲视频| 国产日韩欧美不卡| 狠狠色狠狠色综合人人| 亚洲日韩欧美视频| 亚洲一区在线直播| 久久精品99国产精品| 欧美成人精品| 在线日韩视频| 亚洲精品资源美女情侣酒店| 亚洲影院在线| 蜜桃久久精品一区二区| 亚洲美女一区| 久久久久国产免费免费| 欧美日韩精品一区二区在线播放| 国产欧美精品| 亚洲麻豆av| 久久精品国亚洲| 最近中文字幕mv在线一区二区三区四区 | 一区二区三区欧美在线| 欧美一区二区三区另类| 欧美激情第1页| 国产亚洲一二三区| 99视频精品全部免费在线| 久久精品国产久精国产一老狼| 欧美激情亚洲另类| 欧美伊人久久久久久午夜久久久久 | 欧美大片va欧美在线播放| 亚洲精品网址在线观看| 久久爱www| 欧美日韩中文另类| 亚洲欧洲日产国产网站| 久久精品国产清高在天天线| 一本色道久久| 欧美精品久久久久久久久老牛影院| 国产精品永久免费观看| 99re热精品| 欧美成人免费观看| 久久成人一区二区| 国产精品影音先锋| 亚洲午夜久久久久久久久电影网| 欧美成人日本| 狂野欧美一区| 亚洲第一网站免费视频| 久久躁日日躁aaaaxxxx| 欧美一区二区在线免费观看| 国产精品毛片在线看| 亚洲在线免费观看| 亚洲视频免费在线观看| 欧美午夜不卡视频| 亚洲一区二区精品| 日韩午夜av| 免费观看亚洲视频大全| 午夜国产精品视频免费体验区| 欧美午夜电影在线观看| 亚洲在线视频| 亚洲在线国产日韩欧美| 国产欧美日韩在线播放| 久久成人免费网| 欧美亚洲自偷自偷| 伊人成人在线视频| 欧美成人性网| 免费黄网站欧美| 亚洲七七久久综合桃花剧情介绍| 欧美激情网友自拍| 欧美激情综合五月色丁香| 99精品99久久久久久宅男| 亚洲三级影院| 欧美视频免费在线观看| 午夜欧美精品久久久久久久| 亚洲专区在线| 国产一区二区日韩| 欧美3dxxxxhd| 欧美三级视频在线| 久久国产精品99久久久久久老狼| 午夜精品久久久久久久| 极品日韩久久| 亚洲电影观看| 欧美性做爰毛片| 久久深夜福利免费观看| 欧美精品久久一区二区| 久久精品二区| 欧美成人综合一区| 久久久久久久激情视频| 欧美剧在线免费观看网站| 一区二区三区在线看| 欧美激情区在线播放| 欧美日本在线看| 欧美一级电影久久| 免费视频亚洲| 亚洲综合视频1区| 久久九九国产精品| 一区二区欧美视频| 久久天堂国产精品| 亚洲综合丁香| 老司机一区二区| 亚洲欧美一区二区视频| 嫩草国产精品入口| 久久精品毛片| 欧美日韩亚洲一区二区三区四区| 久久九九免费视频| 欧美性猛交xxxx乱大交蜜桃| 欧美成人免费在线视频| 国产精品区免费视频| 欧美激情亚洲另类| 一本色道久久99精品综合| 好看的av在线不卡观看| 亚洲少妇最新在线视频| 亚洲国产一区在线| 久久激情一区| 久久精品在线观看| 国产精品视频一区二区三区| 亚洲毛片在线免费观看| 最近看过的日韩成人| 久久久亚洲国产天美传媒修理工| 午夜在线一区| 国产精品久99| 99日韩精品| 在线视频免费在线观看一区二区| 免费观看亚洲视频大全| 免费成人黄色片| 国内成人精品视频| 久久精品在这里| 久久影视三级福利片| 国产有码在线一区二区视频| 亚洲欧美区自拍先锋| 亚欧美中日韩视频| 国产欧美日韩在线视频| 亚洲一级网站| 欧美专区18| 国产一区二区成人| 亚洲欧美国产另类| 久久福利资源站| 国产日韩欧美a| 欧美一级淫片播放口| 久久爱www| 激情综合自拍| 欧美激情国产日韩| 9i看片成人免费高清| 亚洲一区二区三区四区五区黄| 欧美日韩在线免费| 这里只有视频精品| 欧美一区二区女人| 国产精品自在线| 久久精品国产欧美激情| 免费在线观看成人av| 亚洲日本中文字幕区| 欧美日韩精品福利| 亚洲视频在线观看免费| 欧美在线亚洲在线| 激情91久久| 一区二区三区|亚洲午夜| 伊人久久大香线蕉综合热线| 久久精品国产一区二区三区免费看| 欧美ed2k| 亚洲一区免费| 国产一区二区三区在线观看免费视频 | 亚洲第一中文字幕在线观看| 国产精品入口麻豆原神| 国产精品成人一区二区三区吃奶 | 一区二区视频欧美| 久久精品国产视频| 欧美国产1区2区| 一区二区三区免费看| 国产免费观看久久| 欧美大片在线观看一区二区| 亚洲综合精品一区二区| 女女同性女同一区二区三区91| 99综合精品| 国产日韩欧美三区| 欧美精品免费播放| 午夜日韩在线| 亚洲美女91| 欧美成人精品在线| 欧美综合国产| 夜色激情一区二区| 一区在线视频观看| 欧美视频在线一区二区三区| 久久aⅴ国产欧美74aaa| 99视频在线观看一区三区| 久久综合色8888| 性感少妇一区| 亚洲一区二区三区欧美| 亚洲日本欧美天堂| 一区二区在线免费观看| 国产精品久久久久久久久久免费 | 欧美视频中文一区二区三区在线观看| 在线亚洲免费| 欧美电影免费网站| 欧美诱惑福利视频| 亚洲宅男天堂在线观看无病毒| 亚洲国产毛片完整版 | 亚洲视频一二区| 一区二区激情| 亚洲日本免费| 欧美激情成人在线| 国产精品久久一区主播|