前段時(shí)間用了ACE_TP_Reactor做了一個(gè)東西,但是對(duì)這塊東西不是很有底,所以借著假期仔細(xì)的看了一下這一塊的東西,又上網(wǎng)查了一下相關(guān)的資料。
在Addison-Wesley - C++NetworkProgrammingVol2的4.3 The ACE_TP_Reactor Class中有這樣幾句話,讓我足足想了一天。
1.Multiple threads running an
ACE_TP_Reactor event loop can process
events concurrently on different handles
2.They can also dispatch timeout and I/O callback methods concurrently on the same
event handler
3.The only serialization in the
ACE_TP_Reactor occurs when I/O events
occur concurrently on the
same handle
4.After a thread obtains a set of active handles from
select(), the other
reactor threads dispatch from that handle set instead of calling
select() again
其實(shí)上面的3句話主要表達(dá)這樣的幾層含義
1.并行處理不同handle上的事件
2.并行處理同一event handler上的超時(shí)回調(diào)函數(shù)和I/O回調(diào)函數(shù)
3.多個(gè)線程串行處理同一個(gè)handle上的I/O事件。
4.雖然TP_Reactor是用leader/follow模式輪流調(diào)用select,但是如果一個(gè)select()獲得了多個(gè)激活的handle,那么其他的線程會(huì)分發(fā)這些handle ,而不是去再次調(diào)用select.
?(這點(diǎn)還沒有想通,也沒有看見是如何實(shí)現(xiàn)的?)
“多個(gè)線程串行處理同一個(gè)handle上的I/O事件”??? ?? 這個(gè)是如何達(dá)到的呢?ACE源碼中,當(dāng)處理I/O事件的時(shí)候,會(huì)將
HANDLE掛起,使得不再對(duì)該
HANDLE做事件偵聽。來達(dá)到同一個(gè)
handle上的I/O事件是被多個(gè)線程串行地處理。"并行處理同一event handler上的超時(shí)回掉函數(shù)和I/O回調(diào)函數(shù)"??? ?? 這樣好像就比較麻煩了。因?yàn)檫@就意味著TP_Reactor只保證同一個(gè)handle下不會(huì)有多線程同時(shí)調(diào)用I/O事件,但是卻有可能同時(shí)調(diào)用超時(shí)回調(diào)函數(shù)和I/O回調(diào)函數(shù)。如果在這兩個(gè)函數(shù)中有對(duì)數(shù)據(jù)的訪問和操作,這就意味著需要有鎖的引入。例外,如果在定時(shí)器處理中,超過定時(shí)的事件間隔,就會(huì)有令一個(gè)線程再次調(diào)用定時(shí)器的處理函數(shù),一下子引入了很多同步的問題。如何解決這個(gè)問題呢?
方法一:
??? ??? ??? 更改ACE的源碼,象處理socket事件一樣,在處理定時(shí)事件的時(shí)候,也把HANDLE掛起。來自http://cpunion.cnblogs.com/archive/2005/08/09/210941.html
int
ACE_TP_Reactor::handle_timer_events?(int?&?/*event_count*/,
?????????????????????????????????????ACE_TP_Token_Guard?&guard)
{
??//?Get?the?current?time
??ACE_Time_Value?cur_time?(this->timer_queue_->gettimeofday?()?+
???????????????????????????this->timer_queue_->timer_skew?());
??//?Look?for?a?node?in?the?timer?queue?whose?timer?<=?the?present
??//?time.
??ACE_Timer_Node_Dispatch_Info?info;
??if?(this->timer_queue_->dispatch_info?(cur_time,
?????????????????????????????????????????info))
????{
????????//?********?fixed?by?lijie?***********
????????if?(info.type_->get_handle?()?!=?ACE_INVALID_HANDLE)
????????{
????????????if?(this->is_suspended_i?(info.type_->get_handle?()))
????????????????return?0;
????????????this->suspend_handler?(info.type_->get_handle?());
????????}
????????//?********?end?**********************
??????const?void?*upcall_act?=?0;
??????//?Preinvoke.
??????this->timer_queue_->preinvoke?(info,
?????????????????????????????????????cur_time,
?????????????????????????????????????upcall_act);
??????
??????//?Release?the?token?before?dispatching?notifies
??????guard.release_token?();
??????//?call?the?functor
??????this->timer_queue_->upcall?(info,
??????????????????????????????????cur_time);
??????//?Postinvoke
??????this->timer_queue_->postinvoke?(info,
??????????????????????????????????????cur_time,
??????????????????????????????????????upcall_act);
??????//?We?have?dispatched?a?timer
??????return?1;
????}
??return?0;
} handle_timer處理完以后,返回以前,加上這句話
this->reactor ()->resume_handler (this->get_handle ());
當(dāng)然別忘了為Handler編寫get_handle()函數(shù):
ACE_HANDLE?Test_Handler::get_handle?()?const
{
????return?this->peer?().get_handle?();
}
方法二:
??? ?? ?? ??
利用ACE_Pipe和ACE_Message_Queue把所有的事件都排隊(duì)到同一個(gè)I/O HANDLE上去,再由ACE_TP_Reactor通過多個(gè)線程順序串行地觸發(fā)我們舊的event_handler來處理這些已經(jīng)排好隊(duì)的事件/消息。我比較贊成用這樣方法。該方法來自:http://blog.csdn.net/zhdaniel/archive/2006/06/29/850888.aspx
???

??? ??
方法三:
??? ?? ?? ?? ^_^干脆就不要對(duì)同一個(gè)event? handler注冊(cè)I/O事件和其他事件。