來(lái)自 http://sites.google.com/site/polarisnotme/boost/asio/xue-xi-1

boost::asio::Io_service

構(gòu)造函數(shù)

構(gòu)造函數(shù)的主要?jiǎng)幼骶褪钦{(diào)用CreateIoCompletionPort創(chuàng)建了一個(gè)初始iocp。

Dispatch和post的區(qū)別

Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后執(zhí)行。

Dispatch會(huì)首先檢查當(dāng)前thread是不是io_service.run/runonce/poll/poll_once線程,如果是,則直接運(yùn)行。

poll和run的區(qū)別

兩 者代碼幾乎一樣,都是首先檢查是否有outstanding的消息,如果沒(méi)有直接返回,否則調(diào)用do_one()。唯一的不同是在調(diào)用size_t do_one(bool block, boost::system::error_code& ec)時(shí)前者block = false,后者block = true。

該參數(shù)的作用體現(xiàn)在:

BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,

&completion_key, &overlapped, block ? timeout : 0);

因此可以看出,poll處理的是已經(jīng)完成了的消息,也即GetQueuedCompletionStatus立刻能返回的。而run則會(huì)導(dǎo)致等待。

poll 的作用是依次處理當(dāng)前已經(jīng)完成了的消息,直到所有已經(jīng)完成的消息處理完成為止。如果沒(méi)有已經(jīng)完成了得消息,函數(shù)將退出。poll不會(huì)等待。這個(gè)函數(shù)有點(diǎn)類 似于PeekMessage。鑒于PeekMessage很少用到,poll的使用場(chǎng)景我也有點(diǎn)疑惑。poll的一個(gè)應(yīng)用場(chǎng)景是如果希望handler的 處理有優(yōu)先級(jí),也即,如果消息完成速度很快,同時(shí)可能完成多個(gè)消息,而消息的處理過(guò)程可能比較耗時(shí),那么可以在完成之后的消息處理函數(shù)中不真正處理數(shù)據(jù), 而是把handler保存在隊(duì)列中,然后按優(yōu)先級(jí)統(tǒng)一處理。代碼如下:

  while (io_service.run_one())  { 
    // The custom invocation hook adds the handlers to the priority queue 
    // rather than executing them from within the poll_one() call. 
    while (io_service.poll_one())      ;    pri_queue.execute_all();  }

循環(huán)執(zhí)行poll_one讓已經(jīng)完成的消息的wrap_handler處理完畢,也即插入一個(gè)隊(duì)列中,然后再統(tǒng)一處理之。這里的wrap_handler是一個(gè)class,在post的時(shí)候,用如下代碼:

io_service.post(pri_queue.wrap(0, low_priority_handler));或者  acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));

  template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler) 
  {    return wrapped_handler<Handler>(*this, priority, handler);  }

參見(jiàn)boost_asio/example/invocation/prioritised_handlers.cpp

這個(gè)sample也同時(shí)表現(xiàn)了wrap的使用場(chǎng)景。

也即把handler以及參數(shù)都wrap成一個(gè)object,然后把object插入一個(gè)隊(duì)列,在pri_queue.execute_all中按優(yōu)先級(jí)統(tǒng)一處理。

run的作用是處理消息,如果有消息未完成將一直等待到所有消息完成并處理之后才退出。

reset和stop

文檔中reset的解釋是重置io_service以便下一次調(diào)用。

當(dāng) run,run_one,poll,poll_one是被stop掉導(dǎo)致退出,或者由于完成了所有任務(wù)(正常退出)導(dǎo)致退出時(shí),在調(diào)用下一次 run,run_one,poll,poll_one之前,必須調(diào)用此函數(shù)。reset不能在run,run_one,poll,poll_one正在運(yùn) 行時(shí)調(diào)用。如果是消息處理handler(用戶代碼)拋出異常,則可以在處理之后直接繼續(xù)調(diào)用 io.run,run_one,poll,poll_one。 例如:

boost::asio::io_service io_service;
...
for (;;)
{
try
{
io_service.run();
break; // run() exited normally
}
catch (my_exception& e)
{
// Deal with exception as appropriate.
}
}
在拋出了異常的情況下,stopped_還沒(méi)來(lái)得及被asio設(shè)置為1,所以無(wú)需調(diào)用reset。

reset函數(shù)的代碼僅有一行:

void reset()

{

::InterlockedExchange(&stopped_, 0);

}

也即,當(dāng)io.stop時(shí),會(huì)設(shè)置stopped_=1。當(dāng)完成所有任務(wù)時(shí),也會(huì)設(shè)置。

總的來(lái)說(shuō),單線程情況下,不管io.run是如何退出的,在下一次調(diào)用io.run之前調(diào)用一次reset沒(méi)有什么壞處。例如:

for(;;)

{

try

{

io.run();

}

catch(…)

{

}

io.reset();

}

如果是多線程在運(yùn)行io.run,則應(yīng)該小心,因?yàn)閞eset必須是所有的run,run_one,poll,poll_one退出后才能調(diào)用。

文檔中的stop的解釋是停止io_service的處理循環(huán)。

此函數(shù)不是阻塞函數(shù),也即,它僅僅只是給iocp發(fā)送一個(gè)退出消息而并不是等待其真正退出。因?yàn)閜oll和poll_one本來(lái)就不等待 (GetQueuedCompletionStatus時(shí)timeout = 0),所以此函數(shù)對(duì)poll和poll_one無(wú)意義。對(duì)于run_one來(lái)說(shuō),如果該事件還未完成,則run_one會(huì)立刻返回。如果該事件已經(jīng)完成, 并且還在處理中,則stop并無(wú)特殊意義(會(huì)等待handler完成后自然退出)。對(duì)于run來(lái)說(shuō),stop的調(diào)用會(huì)導(dǎo)致run中的 GetQueuedCompletionStatus立刻返回。并且由于設(shè)置了stopped = 1,此前完成的消息的handlers也不會(huì)被調(diào)用。考慮一下這種情況:在io.stop之前,有1k個(gè)消息已經(jīng)完成但尚未處理,io.run正在依次從 GetQueuedCompletionStatus中獲得信息并且調(diào)用handlers,調(diào)用io.stop設(shè)置stopped=1將導(dǎo)致后許 GetQueuedCompletionStatus返回的消息直接被丟棄,直到收到退出消息并退出io.run為止。

void stop()

{

if (::InterlockedExchange(&stopped_, 1) == 0)

{

if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))

{

DWORD last_error = ::GetLastError();

boost::system::system_error e(

boost::system::error_code(last_error,

boost::asio::error::get_system_category()),

"pqcs");

boost::throw_exception(e);

}

}

}

注意除了讓當(dāng)前代碼退出之外還有一個(gè)副作用就是設(shè)置了stopped_=1。這個(gè)副作用導(dǎo)致在stop之后如果不調(diào)用reset,所有run,run_one,poll,poll_one都將直接退出。

另一個(gè)需要注意的是,stop會(huì)導(dǎo)致所有未完成的消息以及完成了但尚未處理得消息都直接被丟棄,不會(huì)導(dǎo)致handlers倍調(diào)用。

注意這兩個(gè)函數(shù)都不會(huì)CloseHandle(iocp.handle_),那是析構(gòu)函數(shù)干的事情。

注意此處有個(gè)細(xì)節(jié):一次PostQueuedCompletionStatus僅導(dǎo)致一次 GetQueuedCompletionStatus返回,那么如果有多個(gè)thread此時(shí)都在io.run,并且block在 GetQueuedCompletionStatus時(shí),調(diào)用io.stop將PostQueuedCompletionStatus并且導(dǎo)致一個(gè) thread的GetQueuedCompletionStatus返回。那么其他的thread呢?進(jìn)入io_service的do_one(由run 函數(shù)調(diào)用)代碼可以看到,當(dāng)GetQueuedCompletionStatus返回并且發(fā)現(xiàn)是退出消息時(shí),會(huì)再發(fā)送一次 PostQueuedCompletionStatus。代碼如下:

  else

  {

    // Relinquish responsibility for dispatching timers. If the io_service

    // is not being stopped then the thread will get an opportunity to

    // reacquire timer responsibility on the next loop iteration.

    if (dispatching_timers)

    {

      ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);

    }

    // The stopped_ flag is always checked to ensure that any leftover

    // interrupts from a previous run invocation are ignored.

    if (::InterlockedExchangeAdd(&stopped_, 0) != 0)

    {

      // Wake up next thread that is blocked on GetQueuedCompletionStatus.

      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))

      {

        last_error = ::GetLastError();

        ec = boost::system::error_code(last_error,

            boost::asio::error::get_system_category());

        return 0;

      }

      ec = boost::system::error_code();

      return 0;

    }

  }

}

Wrap

這個(gè)函數(shù)是一個(gè)語(yǔ)法糖。

Void func(int a);

io_service.wrap(func)(a);

相當(dāng)于io_service.dispatch(bind(func,a));

可以保存io_service.wrap(func)到g,以便在稍后某些時(shí)候調(diào)用g(a);

例如:

socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 
        boost::bind(&connection::handle_read, shared_from_this(), 
          boost::asio::placeholders::error, 
          boost::asio::placeholders::bytes_transferred)));

這是一個(gè)典型的wrap用法。注意async_read_some要求的參數(shù)是一個(gè)handler,在read_some結(jié)束后被調(diào)用。由于希望真正被調(diào)用的handle_read是串行化的,在這里再post一個(gè)消息給io_service。以上代碼類似于:

void A::func(error,bytes_transferred)

{

strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);

}

socket_.async_read_some(boost::asio::buffer(buffer_), func);

注意1點(diǎn):

io_service.dispatch(bind(func,a1,…an)),這里面都是傳值,無(wú)法指定bind(func,ref(a1)…an)); 所以如果要用ref語(yǔ)義,則應(yīng)該在傳入wrap時(shí)顯式指出。例如:

void func(int& i){i+=1;}

void main()

{

int i = 0;

boost::asio::io_service io;

io.wrap(func)(boost::ref(i));

io.run();

printf("i=%d\n");

}

當(dāng)然在某些場(chǎng)合下,傳遞shared_ptr也是可以的(也許更好)。

從handlers拋出的異常的影響

當(dāng)handlers拋出異常時(shí),該異常會(huì)傳遞到本線程最外層的io.run,run_one,poll,poll_one,不會(huì)影響其他線程。捕獲該異常是程序員自己的責(zé)任。

例如:

boost::asio::io_service io_service;

Thread1,2,3,4()

{

for (;;)

{

try

{

io_service.run();

break; // run() exited normally

}

catch (my_exception& e)

{

// Deal with exception as appropriate.

}

}

}

Void func(void)

{

throw 1;

}

Thread5()

{

io_service.post(func);

}

注意這種情況下無(wú)需調(diào)用io_service.reset()。

這種情況下也不能調(diào)用reset,因?yàn)檎{(diào)用reset之前必須讓所有其他線程正在調(diào)用的io_service.run退出。(reset調(diào)用時(shí)不能有任何run,run_one,poll,poll_one正在運(yùn)行)

Work

有些應(yīng)用程序希望在沒(méi)有pending的消息時(shí),io.run也不退出。比如io.run運(yùn)行于一個(gè)后臺(tái)線程,該線程在程序的異步請(qǐng)求發(fā)出之前就啟動(dòng)了。

可以通過(guò)如下代碼實(shí)現(xiàn)這種需求:

main()

{

boost::asio::io_service io_service;

boost::asio::io_service::work work(io_service);

Create thread

Getchar();

}

Thread()

{

Io_service.run();

}

這種情況下,如果work不被析構(gòu),該線程永遠(yuǎn)不會(huì)退出。在work不被析構(gòu)得情況下就讓其退出,可以調(diào)用io.stop。這將導(dǎo)致 io.run立刻退出,所有未完成的消息都將丟棄。已完成的消息(但尚未進(jìn)入handler的)也不會(huì)調(diào)用其handler函數(shù)(由于在stop中設(shè)置了 stopped_= 1)。

如果希望所有發(fā)出的異步消息都正常處理之后io.run正常退出,work對(duì)象必須析構(gòu),或者顯式的刪除。

boost::asio::io_service io_service;

auto_ptr<boost::asio::io_service::work> work(

new boost::asio::io_service::work(io_service));

...

work.reset(); // Allow run() to normal exit.

work是一個(gè)很小的輔助類,只支持構(gòu)造函數(shù)和析構(gòu)函數(shù)。(還有一個(gè)get_io_service返回所關(guān)聯(lián)的io_service)

代碼如下:

inline io_service::work::work(boost::asio::io_service& io_service)

: io_service_(io_service)

{

io_service_.impl_.work_started();

}

inline io_service::work::work(const work& other)

: io_service_(other.io_service_)

{

io_service_.impl_.work_started();

}

inline io_service::work::~work()

{

io_service_.impl_.work_finished();

}

void work_started()

{

::InterlockedIncrement(&outstanding_work_);

}

// Notify that some work has finished.

void work_finished()

{

if (::InterlockedDecrement(&outstanding_work_) == 0)

stop();

}

可以看出構(gòu)造一個(gè)work時(shí),outstanding_work_+1,使得io.run在完成所有異步消息后判斷outstanding_work_時(shí)不會(huì)為0,因此會(huì)繼續(xù)調(diào)用GetQueuedCompletionStatus并阻塞在這個(gè)函數(shù)上。

而析構(gòu)函數(shù)中將其-1,并判斷其是否為0,如果是,則post退出消息給GetQueuedCompletionStatus讓其退出。

因此work如果析構(gòu),則io.run會(huì)在處理完所有消息之后正常退出。work如果不析構(gòu),則io.run會(huì)一直運(yùn)行不退出。如果用戶直接調(diào)用io.stop,則會(huì)讓io.run立刻退出。

特別注意的是,work提供了一個(gè)拷貝構(gòu)造函數(shù),因此可以直接在任意地方使用。對(duì)于一個(gè)io_service來(lái)說(shuō),有多少個(gè)work實(shí)例關(guān) 聯(lián),則outstanding_work_就+1了多少次,只有關(guān)聯(lián)到同一個(gè)io_service的work全被析構(gòu)之后,io.run才會(huì)在所有消息處 理結(jié)束之后正常退出。

strand

strand是另一個(gè)輔助類,提供2個(gè)接口dispatch和post,語(yǔ)義和io_service的dispatch和post類似。區(qū) 別在于,同一個(gè)strand所發(fā)出的dispatch和post絕對(duì)不會(huì)并行執(zhí)行,dispatch和post所包含的handlers也不會(huì)并行。因此 如果希望串行處理每一個(gè)tcp連接,則在accept之后應(yīng)該在該連接的數(shù)據(jù)結(jié)構(gòu)中構(gòu)造一個(gè)strand,并且所有dispatch/post(recv /send)操作都由該strand發(fā)出。strand的作用巨大,考慮如下場(chǎng)景:有多個(gè)thread都在執(zhí)行async_read_some,那么由于 線程調(diào)度,很有可能后接收到的包先被處理,為了避免這種情況,就只能收完數(shù)據(jù)后放入一個(gè)隊(duì)列中,然后由另一個(gè)線程去統(tǒng)一處理。

void connection::start() 

socket_.async_read_some(boost::asio::buffer(buffer_), 
strand_.wrap( 
boost::bind(&connection::handle_read, shared_from_this(), 
boost::asio::placeholders::error, 
boost::asio::placeholders::bytes_transferred))); 
}

不使用strand的處理方式:

前端tcp iocp收包,并且把同一個(gè)tcp連接的包放入一個(gè)list,如果list以前為空,則post一個(gè)消息給后端vnn iocp。后端vnn iocp收到post的消息后循環(huán)從list中獲取數(shù)據(jù),并且處理,直到list為空為止。處理結(jié)束后重新調(diào)用 GetQueuedCompletionStatus進(jìn)入等待。如果前端tcp iocp發(fā)現(xiàn)list過(guò)大,意味著處理速度小于接收速度,則不再調(diào)用iocpRecv,并且設(shè)置標(biāo)志,當(dāng)vnn iocp thread處理完了當(dāng)前所有積壓的數(shù)據(jù)包后,檢查這個(gè)標(biāo)志,重新調(diào)用一次iocpRecv。

使用strand的處理方式:

前端tcp iocp收包,收到包后直接通過(guò)strand.post(on_recved)發(fā)給后端vnn iocp。后端vnn iocp處理完之后再調(diào)用一次strand.async_read_some。

這兩種方式我沒(méi)看出太大區(qū)別來(lái)。如果對(duì)數(shù)據(jù)包的處理的確需要阻塞操作,例如db query,那么使用后端iocp以及后端thread是值得考慮的。這種情況下,前端iocp由于僅用來(lái)異步收發(fā)數(shù)據(jù),因此1個(gè)thread就夠了。在 確定使用2級(jí)iocp的情況下,前者似乎更為靈活,也沒(méi)有增加什么開銷。

值得討論的是,如果后端多個(gè)thread都處于db query狀態(tài),那么實(shí)際上此時(shí)依然沒(méi)有thread可以提供數(shù)據(jù)處理服務(wù),因此2級(jí)iocp意義其實(shí)就在于在這種情況下,前端tcp iocp依然可以accept,以及recv第一次數(shù)據(jù),不會(huì)導(dǎo)致用戶connect不上的情況。在后端thread空閑之后會(huì)處理這期間的recv到的 數(shù)據(jù)并在此async_read_some。

如果是單級(jí)iocp(假定handlers沒(méi)有阻塞操作),多線程,那么strand的作用很明顯。這種情況下,很明顯應(yīng)該讓一個(gè)tcp連接的數(shù)據(jù)處理過(guò)程串行化。

Strand的實(shí)現(xiàn)原理

Strand內(nèi)部實(shí)現(xiàn)機(jī)制稍微有點(diǎn)復(fù)雜。每次發(fā)出strand請(qǐng)求(例如 async_read(strand_.wrap(funobj1))),strand再次包裹了一次成為funobj2。在async_read完成 時(shí),系統(tǒng)調(diào)用funobj2,檢查是否正在執(zhí)行該strand所發(fā)出的完成函數(shù)(檢查該strand的一個(gè)標(biāo)志位),如果沒(méi)有,則直接調(diào)用 funobj2。如果有,則檢查是否就是當(dāng)前thread在執(zhí)行,如果是,則直接調(diào)用funobj2(這種情況可能發(fā)生在嵌套調(diào)用的時(shí)候,但并不產(chǎn)生同步 問(wèn)題,就像同一個(gè)thread可以多次進(jìn)入同一個(gè)critical_session一樣)。如果不是,則把該funobj2插入到strand內(nèi)部維護(hù)的 一個(gè)隊(duì)列中。