前段時間用了ACE_TP_Reactor做了一個東西,但是對這塊東西不是很有底,所以借著假期仔細的看了一下這一塊的東西,又上網查了一下相關的資料。
在Addison-Wesley - C++NetworkProgrammingVol2的4.3 The ACE_TP_Reactor Class中有這樣幾句話,讓我足足想了一天。
1.Multiple threads running an
ACE_TP_Reactor event loop can process
events concurrently on different handles
2.They can also dispatch timeout and I/O callback methods concurrently on the same
event handler
3.The only serialization in the
ACE_TP_Reactor occurs when I/O events
occur concurrently on the
same handle
4.After a thread obtains a set of active handles from
select(), the other
reactor threads dispatch from that handle set instead of calling
select() again
其實上面的3句話主要表達這樣的幾層含義
1.并行處理不同handle上的事件
2.并行處理同一event handler上的超時回調函數和I/O回調函數
3.多個線程串行處理同一個handle上的I/O事件。
4.雖然TP_Reactor是用leader/follow模式輪流調用select,但是如果一個select()獲得了多個激活的handle,那么其他的線程會分發這些handle ,而不是去再次調用select.
?(這點還沒有想通,也沒有看見是如何實現的?)
“多個線程串行處理同一個handle上的I/O事件”??? ?? 這個是如何達到的呢?ACE源碼中,當處理I/O事件的時候,會將
HANDLE掛起,使得不再對該
HANDLE做事件偵聽。來達到同一個
handle上的I/O事件是被多個線程串行地處理。"并行處理同一event handler上的超時回掉函數和I/O回調函數"??? ?? 這樣好像就比較麻煩了。因為這就意味著TP_Reactor只保證同一個handle下不會有多線程同時調用I/O事件,但是卻有可能同時調用超時回調函數和I/O回調函數。如果在這兩個函數中有對數據的訪問和操作,這就意味著需要有鎖的引入。例外,如果在定時器處理中,超過定時的事件間隔,就會有令一個線程再次調用定時器的處理函數,一下子引入了很多同步的問題。如何解決這個問題呢?
方法一:
??? ??? ??? 更改ACE的源碼,象處理socket事件一樣,在處理定時事件的時候,也把HANDLE掛起。來自http://cpunion.cnblogs.com/archive/2005/08/09/210941.html
int
ACE_TP_Reactor::handle_timer_events?(int?&?/*event_count*/,
?????????????????????????????????????ACE_TP_Token_Guard?&guard)
{
??//?Get?the?current?time
??ACE_Time_Value?cur_time?(this->timer_queue_->gettimeofday?()?+
???????????????????????????this->timer_queue_->timer_skew?());
??//?Look?for?a?node?in?the?timer?queue?whose?timer?<=?the?present
??//?time.
??ACE_Timer_Node_Dispatch_Info?info;
??if?(this->timer_queue_->dispatch_info?(cur_time,
?????????????????????????????????????????info))
????{
????????//?********?fixed?by?lijie?***********
????????if?(info.type_->get_handle?()?!=?ACE_INVALID_HANDLE)
????????{
????????????if?(this->is_suspended_i?(info.type_->get_handle?()))
????????????????return?0;
????????????this->suspend_handler?(info.type_->get_handle?());
????????}
????????//?********?end?**********************
??????const?void?*upcall_act?=?0;
??????//?Preinvoke.
??????this->timer_queue_->preinvoke?(info,
?????????????????????????????????????cur_time,
?????????????????????????????????????upcall_act);
??????
??????//?Release?the?token?before?dispatching?notifies
??????guard.release_token?();
??????//?call?the?functor
??????this->timer_queue_->upcall?(info,
??????????????????????????????????cur_time);
??????//?Postinvoke
??????this->timer_queue_->postinvoke?(info,
??????????????????????????????????????cur_time,
??????????????????????????????????????upcall_act);
??????//?We?have?dispatched?a?timer
??????return?1;
????}
??return?0;
} handle_timer處理完以后,返回以前,加上這句話
this->reactor ()->resume_handler (this->get_handle ());
當然別忘了為Handler編寫get_handle()函數:
ACE_HANDLE?Test_Handler::get_handle?()?const
{
????return?this->peer?().get_handle?();
}
方法二:
??? ?? ?? ??
利用ACE_Pipe和ACE_Message_Queue把所有的事件都排隊到同一個I/O HANDLE上去,再由ACE_TP_Reactor通過多個線程順序串行地觸發我們舊的event_handler來處理這些已經排好隊的事件/消息。我比較贊成用這樣方法。該方法來自:http://blog.csdn.net/zhdaniel/archive/2006/06/29/850888.aspx
???

??? ??
方法三:
??? ?? ?? ?? ^_^干脆就不要對同一個event? handler注冊I/O事件和其他事件。