首先從定義一個reactor開始。
ACE_TP_Reactor select_reactor_one(g_unOneMaxHandle, 0, 0, 0, 1);
上面的這句話觸發了以下的一些行為。主要就是給event_handlers_分配了內存。這里就決定了你能支持多少條連接。
int
ACE_Select_Reactor_Handler_Repository::open?(size_t?size)

{
??ACE_TRACE?(
"
ACE_Select_Reactor_Handler_Repository::open
"
);
??
this
->
max_size_?
=
?size;
??
this
->
max_handlep1_?
=
?
0
;

#if
?defined?(ACE_WIN32)
??
//
?Try?to?allocate?the?memory.*
??ACE_NEW_RETURN?(
this
->
event_handlers_,
??????????????????ACE_Event_Tuple[size],
??????????????????
-
1
);

??
//
?Initialize?the?ACE_Event_Handler?*?to?{?ACE_INVALID_HANDLE,?0?}.
??
for
?(size_t?h?
=
?
0
;?h?
<
?size;?
++
h)

????
{
??????ACE_SELECT_REACTOR_HANDLE?(h)?
=
?ACE_INVALID_HANDLE;??
//
對event_handlers_進(jìn)行初始化
??????ACE_SELECT_REACTOR_EVENT_HANDLER?(
this
,?h)?
=
?
0
;
????}
#else
??
//
?Try?to?allocate?the?memory.
??ACE_NEW_RETURN?(
this
->
event_handlers_,
??????????????????ACE_Event_Handler?
*
[size],
??????????????????
-
1
);

??
//
?Initialize?the?ACE_Event_Handler?*?to?NULL.
??
for
?(size_t?h?
=
?
0
;?h?
<
?size;?
++
h)
????ACE_SELECT_REACTOR_EVENT_HANDLER?(
this
,?h)?
=
?
0
;
#endif
?/*?ACE_WIN32?*/
??
//
?Try?to?increase?the?number?of?handles?if?<size>?is?greater?than???檢查給定的size是否超出了ACE_MaxHandle
??
//
?the?current?limit.
??
return
?ACE::set_handle_limit?(static_cast
<
int
>
?(size),?
1
);
}
接下來就是注冊監聽了。
acceptor.open(addr);
那么這句話又干了些什么呢?
int
ACE_SOCK_Acceptor::open?(const?ACE_Addr?&local_sap,
?????????????????????????int?reuse_addr,
?????????????????????????int?protocol_family,
?????????????????????????int?backlog,
?????????????????????????int?protocol)


{
??ACE_TRACE?("ACE_SOCK_Acceptor::open");

??if?(local_sap?!=?ACE_Addr::sap_any)
????protocol_family?=?local_sap.get_type?();
??else?if?(protocol_family?==?PF_UNSPEC)

????
{
#if?defined?(ACE_HAS_IPV6)
??????protocol_family?=?ACE::ipv6_enabled?()???PF_INET6?:?PF_INET;
#else
??????protocol_family?=?PF_INET;
#endif?/*?ACE_HAS_IPV6?*/
????}

??if?(ACE_SOCK::open?(SOCK_STREAM,?????????????????
??????????????????????protocol_family,
??????????????????????protocol,
??????????????????????reuse_addr)?==?-1)
????return?-1;
??else
????return?this->shared_open?(local_sap,
??????????????????????????????protocol_family,
??????????????????????????????backlog);
}
int
ACE_SOCK::open?(int?type,
????????????????int?protocol_family,
????????????????int?protocol,
????????????????int?reuse_addr)


{
??ACE_TRACE?("ACE_SOCK::open");
??int?one?=?1;

??this->set_handle?(ACE_OS::socket?(protocol_family,
????????????????????????????????????type,
????????????????????????????????????protocol));

??if?(this->get_handle?()?==?ACE_INVALID_HANDLE)
????return?-1;
??else?if?(protocol_family?!=?PF_UNIX
???????????&&?reuse_addr
???????????&&?this->set_option?(SOL_SOCKET,
????????????????????????????????SO_REUSEADDR,
????????????????????????????????&one,
????????????????????????????????sizeof?one)?==?-1)

????
{
??????this->close?();
??????return?-1;
????}
??return?0;
}
// Thin wrapper that forwards <domain>, <type> and <proto> to the system
// ::socket() call through ACE_SOCKCALL_RETURN, which is given ACE_HANDLE
// as the result type and ACE_INVALID_HANDLE as the failure value.
ACE_INLINE ACE_HANDLE
ACE_OS::socket (int domain,
                int type,
                int proto)
{
  ACE_OS_TRACE ("ACE_OS::socket");
  ACE_SOCKCALL_RETURN (::socket (domain,
                                 type,
                                 proto),
                       ACE_HANDLE,
                       ACE_INVALID_HANDLE);
}

int
ACE_SOCK_Acceptor::shared_open?(const?ACE_Addr?&local_sap,
????????????????????????????????int?protocol_family,
????????????????????????????????int?backlog)


{
??ACE_TRACE?("ACE_SOCK_Acceptor::shared_open");
??int?error?=?0;

#if?defined?(ACE_HAS_IPV6)
??ACE_ASSERT?(protocol_family?==?PF_INET?||?protocol_family?==?PF_INET6);

??if?(protocol_family?==?PF_INET6)

????
{
??????sockaddr_in6?local_inet6_addr;
??????ACE_OS::memset?(reinterpret_cast<void?*>?(&local_inet6_addr),
??????????????????????0,
??????????????????????sizeof?local_inet6_addr);

??????if?(local_sap?==?ACE_Addr::sap_any)

????????
{
??????????local_inet6_addr.sin6_family?=?AF_INET6;
??????????local_inet6_addr.sin6_port?=?0;
??????????local_inet6_addr.sin6_addr?=?in6addr_any;
????????}
??????else
????????local_inet6_addr?=?*reinterpret_cast<sockaddr_in6?*>?(local_sap.get_addr?());

??????//?We?probably?don't?need?a?bind_port?written?here.
??????//?There?are?currently?no?supported?OS's?that?define
??????//?ACE_LACKS_WILDCARD_BIND.
??????if?(ACE_OS::bind?(this->get_handle?(),
????????????????????????reinterpret_cast<sockaddr?*>?(&local_inet6_addr),
????????????????????????sizeof?local_inet6_addr)?==?-1)
????????error?=?1;
????}
??else
#endif
??if?(protocol_family?==?PF_INET)

????
{

??????sockaddr_in?local_inet_addr;??????????????????????????????????/**//***************************addr**********************/
??????ACE_OS::memset?(reinterpret_cast<void?*>?(&local_inet_addr),
??????????????????????0,
??????????????????????sizeof?local_inet_addr);

??????if?(local_sap?==?ACE_Addr::sap_any)

????????
{
??????????local_inet_addr.sin_port?=?0;
????????}
??????else
????????local_inet_addr?=?*reinterpret_cast<sockaddr_in?*>?(local_sap.get_addr?());
??????if?(local_inet_addr.sin_port?==?0)

????????
{
??????????if?(ACE::bind_port?(this->get_handle?(),
??????????????????????????????ACE_NTOHL?(ACE_UINT32?(local_inet_addr.sin_addr.s_addr)))?==?-1)
????????????error?=?1;
????????}

??????else?if?(ACE_OS::bind?(this->get_handle?(),????????????????????/**//***********************bind**********************/
?????????????????????????????reinterpret_cast<sockaddr?*>?(&local_inet_addr),
?????????????????????????????sizeof?local_inet_addr)?==?-1)
????????error?=?1;
????}
??else?if?(ACE_OS::bind?(this->get_handle?(),
?????????????????????????(sockaddr?*)?local_sap.get_addr?(),
?????????????????????????local_sap.get_size?())?==?-1)
????error?=?1;

??if?(error?!=?0

??????||?ACE_OS::listen?(this->get_handle?(),??/**//****************listen?***************************/
?????????????????????????backlog)?==?-1)

????
{
??????error?=?1;
??????this->close?();
????}

??return?error???-1?:?0;
}接下來就是注冊了。
return m_Reactor->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
那么這句話又做了什么呢?
????ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
??(ACE_Event_Handler?*handler,
???ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_T::register_handler");
??ACE_MT?(ACE_GUARD_RETURN?(ACE_SELECT_REACTOR_TOKEN,?ace_mon,?this->token_,?-1));

??return?this->register_handler_i?(handler->get_handle?(),?handler,?mask);??/**//**************所以一定要實(shí)現(xiàn)get_handle()方法**************/
}

template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i
??(ACE_HANDLE?handle,
???ACE_Event_Handler?*event_handler,
???ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_T::register_handler_i");

??//?Insert?the?<handle,?event_handle>?tuple?into?the?Handler
??//?Repository.
??return?this->handler_rep_.bind?(handle,?event_handler,?mask);
}
int
ACE_Select_Reactor_Handler_Repository::bind?(ACE_HANDLE?handle,
?????????????????????????????????????????????ACE_Event_Handler?*event_handler,
?????????????????????????????????????????????ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_Handler_Repository::bind");

??if?(handle?==?ACE_INVALID_HANDLE)
????handle?=?event_handler->get_handle?();

??if?(this->invalid_handle?(handle))
????return?-1;

??//?Is?this?handle?already?in?the?Reactor?
??int?existing_handle?=?0;

#if?defined?(ACE_WIN32)

??ssize_t?assigned_slot?=?-1;

??for?(ssize_t?i?=?0;?i?<?this->max_handlep1_;?++i)

????
{
??????//?If?handle?is?already?registered.
??????if?(ACE_SELECT_REACTOR_HANDLE?(i)?==?handle)

????????
{
??????????//?Cannot?use?a?different?handler?for?an?existing?handle.
??????????if?(ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?i)?!=
??????????????event_handler)
????????????return?-1;

??????????//?Remember?location.
??????????assigned_slot?=?i;

??????????//?Remember?that?this?handle?is?already?registered?in?the
??????????//?Reactor.
??????????existing_handle?=?1;

??????????//?We?can?stop?looking?now.
??????????break;
????????}
??????else
????????//?Here's?the?first?free?slot,?so?let's?take?it.
????????if?(ACE_SELECT_REACTOR_HANDLE?(i)?==?ACE_INVALID_HANDLE?&&
????????????assigned_slot?==?-1)

??????????
{
????????????assigned_slot?=?i;
??????????}
????}

??if?(assigned_slot?>?-1)
????//?We?found?a?spot.

????
{
??????ACE_SELECT_REACTOR_HANDLE?(assigned_slot)?=?handle;
??????ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?assigned_slot)?=?event_handler;
????}
??else?if?(this->max_handlep1_?<?this->max_size_)???????????????????//?第一次添加一定走這里

????
{
??????//?Insert?at?the?end?of?the?active?portion.
??????ACE_SELECT_REACTOR_HANDLE?(this->max_handlep1_)?=?handle;???????//event_handlers_[max_handlep1_]?.handle_=?handle
??????ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?this->max_handlep1_)?=?event_handler;??//event_handlers_[max_handlep1_].event_handle_?=?event_handler
??????++this->max_handlep1_;????????????????????????????????????????????//?max_handlep1_增加了
????}
??else

????
{
??????//?No?more?room?at?the?inn!
??????errno?=?ENOMEM;
??????return?-1;
????}

#else

??//?Check?if?this?handle?is?already?registered.
??ACE_Event_Handler?*current_handler?=
????ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?handle);

??if?(current_handler)

????
{
??????//?Cannot?use?a?different?handler?for?an?existing?handle.
??????if?(current_handler?!=?event_handler)
????????return?-1;

??????//?Remember?that?this?handle?is?already?registered?in?the
??????//?Reactor.
??????existing_handle?=?1;
????}

??ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?handle)?=?event_handler;

??if?(this->max_handlep1_?<?handle?+?1)
????this->max_handlep1_?=?handle?+?1;

#endif?/*?ACE_WIN32?*/

??if?(this->select_reactor_.is_suspended_i?(handle))???????????????//?檢查是否在suspend_set_集合中

????
{
??????this->select_reactor_.bit_ops?(handle,
?????????????????????????????????????mask,
?????????????????????????????????????this->select_reactor_.suspend_set_,
?????????????????????????????????????ACE_Reactor::ADD_MASK);
????}
??else

????
{
??????this->select_reactor_.bit_ops?(handle,???????????????????????????//注意這里添加到wait_set_集合中去了,這個(gè)是最要關(guān)注的地方
?????????????????????????????????????mask,????????????????????????????
?????????????????????????????????????this->select_reactor_.wait_set_,
?????????????????????????????????????ACE_Reactor::ADD_MASK);

??????//?Note?the?fact?that?we've?changed?the?state?of?the?<wait_set_>,
??????//?which?is?used?by?the?dispatching?loop?to?determine?whether?it?can
??????//?keep?going?or?if?it?needs?to?reconsult?select().
??????//?this->select_reactor_.state_changed_?=?1;
????}

??//?If?new?entry,?call?add_reference()?if?needed.
??if?(!existing_handle)
????event_handler->add_reference?();

??return?0;
}


/**//*this->select_reactor_.wait_set_中包含這樣三個(gè)變量rd_mask_/wr_mask_/ex_mask_,每一個(gè)結(jié)構(gòu)都入下所示
rd_mask_
??????size_=0?
??????max_handle_=0xffffffff?
??????mask_
??????????fd_count=0?
??????????fd_array=0x00126da8??數(shù)組長度為1024
?????nbits_?256位的固定數(shù)值的數(shù)組

*/
int

ACE_Select_Reactor_Impl::bit_ops?(ACE_HANDLE?handle,?????????????????????????????????/**//*************設(shè)置到wait_set_中******************/
??????????????????????????????????ACE_Reactor_Mask?mask,
??????????????????????????????????ACE_Select_Reactor_Handle_Set?&handle_set,
??????????????????????????????????int?ops)


{

??ACE_FDS_PTMF?ptmf??=?&ACE_Handle_Set::set_bit;
??u_long?omask?=?ACE_Event_Handler::NULL_MASK;

??//?Find?the?old?reactor?masks.??This?automatically?does?the?work?of
??//?the?GET_MASK?operation.
??if?(handle_set.rd_mask_.is_set?(handle))
????ACE_SET_BITS?(omask,?ACE_Event_Handler::READ_MASK);
??if?(handle_set.wr_mask_.is_set?(handle))
????ACE_SET_BITS?(omask,?ACE_Event_Handler::WRITE_MASK);
??if?(handle_set.ex_mask_.is_set?(handle))
????ACE_SET_BITS?(omask,?ACE_Event_Handler::EXCEPT_MASK);

??switch?(ops)

????
{
????case?ACE_Reactor::GET_MASK:
??????//?The?work?for?this?operation?is?done?in?all?cases?at?the
??????//?begining?of?the?function.
??????break;
????case?ACE_Reactor::CLR_MASK:
??????ptmf?=?&ACE_Handle_Set::clr_bit;
??????//?State?was?changed.?we?need?to?reflect?that?change?in?the
??????//?dispatch_mask?I?assume?that?only?ACE_Reactor::CLR_MASK?should
??????//?be?treated?here??which?means?we?need?to?clear?the?handle|mask
??????//?from?the?current?dispatch?handler
??????this->clear_dispatch_mask?(handle,?mask);

??????/**//*?FALLTHRU?*/
????case?ACE_Reactor::SET_MASK:

??????/**//*?FALLTHRU?*/
????case?ACE_Reactor::ADD_MASK:

??????//?The?following?code?is?rather?subtle
??Note?that?if?we?are
??????//?doing?a?ACE_Reactor::SET_MASK?then?if?the?bit?is?not?enabled
??????//?in?the?mask?we?need?to?clear?the?bit?from?the?ACE_Handle_Set.
??????//?On?the?other?hand,?if?we?are?doing?a?ACE_Reactor::CLR_MASK?or
??????//?a?ACE_Reactor::ADD_MASK?we?just?carry?out?the?operations
??????//?specified?by?the?mask.

??????//?READ,?ACCEPT,?and?CONNECT?flag?will?place?the?handle?in?the
??????//?read?set.
??????if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::READ_MASK)
??????????||?ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::ACCEPT_MASK)
??????????||?ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::CONNECT_MASK))

????????
{
??????????(handle_set.rd_mask_.*ptmf)?(handle);
????????}
??????else?if?(ops?==?ACE_Reactor::SET_MASK)
????????handle_set.rd_mask_.clr_bit?(handle);

??????//?WRITE?and?CONNECT?flag?will?place?the?handle?in?the?write?set
??????if?(ACE_BIT_ENABLED?(mask,
???????????????????????????ACE_Event_Handler::WRITE_MASK)
??????????||?ACE_BIT_ENABLED?(mask,
??????????????????????????????ACE_Event_Handler::CONNECT_MASK))

????????
{
??????????(handle_set.wr_mask_.*ptmf)?(handle);
????????}
??????else?if?(ops?==?ACE_Reactor::SET_MASK)
????????handle_set.wr_mask_.clr_bit?(handle);

??????//?EXCEPT?(and?CONNECT?on?Win32)?flag?will?place?the?handle?in
??????//?the?except?set.
??????if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::EXCEPT_MASK)
#if?defined?(ACE_WIN32)
??????????||?ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::CONNECT_MASK)
#endif?/*?ACE_WIN32?*/
??????????)

????????
{
??????????(handle_set.ex_mask_.*ptmf)?(handle);
????????}
??????else?if?(ops?==?ACE_Reactor::SET_MASK)
????????handle_set.ex_mask_.clr_bit?(handle);
??????break;
????default:
??????return?-1;
????}
??return?omask;
}
// Add <handle> to this set.
//
// Does nothing if <handle> is ACE_INVALID_HANDLE or is already a member.
// Otherwise the bit is set in mask_ and size_ is incremented; on
// non-Win32 builds max_handle_ (and, with ACE_HAS_BIG_FD_SET,
// min_handle_) is kept up to date as well.
ACE_INLINE void
ACE_Handle_Set::set_bit (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Handle_Set::set_bit");
  if ((handle != ACE_INVALID_HANDLE)
      && (!this->is_set (handle)))
    {
#if defined (ACE_WIN32)
      FD_SET ((SOCKET) handle,
              &this->mask_);
      this->size_++;
#else /* ACE_WIN32 */
#if defined (ACE_HAS_BIG_FD_SET)
      // The mask is cleared lazily, on the first insertion into an
      // empty set.
      if (this->size_ == 0)
        FD_ZERO (&this->mask_);

      if (handle < this->min_handle_)
        this->min_handle_ = handle;
#endif /* ACE_HAS_BIG_FD_SET */

      FD_SET (handle,
              &this->mask_);
      this->size_++;

      if (handle > this->max_handle_)
        this->max_handle_ = handle;
#endif /* ACE_WIN32 */
    }
}

然后就是分發了:
template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
??(ACE_Time_Value?*max_wait_time)


{
??ACE_TRACE?("ACE_Select_Reactor_T::handle_events");

#if?defined?(ACE_MT_SAFE)?&&?(ACE_MT_SAFE?!=?0)

??//?Stash?the?current?time?--?the?destructor?of?this?object?will
??//?automatically?compute?how?much?time?elapsed?since?this?method?was
??//?called.
??ACE_Countdown_Time?countdown?(max_wait_time);

??ACE_GUARD_RETURN?(ACE_SELECT_REACTOR_TOKEN,?ace_mon,?this->token_,?-1);

??if?(ACE_OS::thr_equal?(ACE_Thread::self?(),
?????????????????????????this->owner_)?==?0?||?this->deactivated_)
????return?-1;

??//?Update?the?countdown?to?reflect?time?waiting?for?the?mutex.
??countdown.update?();
#else
??if?(this->deactivated_)
????return?-1;
#endif?/*?ACE_MT_SAFE?*/

??return?this->handle_events_i?(max_wait_time);
}

template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i
??(ACE_Time_Value?*max_wait_time)


{
??int?result?=?-1;

??ACE_SEH_TRY

????
{
??????//?We?use?the?data?member?dispatch_set_?as?the?current?dispatch
??????//?set.

??????//?We?need?to?start?from?a?clean?dispatch_set
??????this->dispatch_set_.rd_mask_.reset?();
??????this->dispatch_set_.wr_mask_.reset?();
??????this->dispatch_set_.ex_mask_.reset?();

??????int?number_of_active_handles?=
????????this->wait_for_multiple_events?(this->dispatch_set_,
????????????????????????????????????????max_wait_time);

??????result?=
????????this->dispatch?(number_of_active_handles,
????????????????????????this->dispatch_set_);?
????}
??ACE_SEH_EXCEPT?(this->release_token?())

????
{
??????//?As?it?stands?now,?we?catch?and?then?rethrow?all?Win32
??????//?structured?exceptions?so?that?we?can?make?sure?to?release?the
??????//?<token_>?lock?correctly.
????}

??return?result;
}
template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events
(ACE_Select_Reactor_Handle_Set?&dispatch_set,
?ACE_Time_Value?*max_wait_time)


{
????
????u_long?width?=?0;
????ACE_Time_Value?timer_buf?(0);
????ACE_Time_Value?*this_timeout;

????int?number_of_active_handles?=?this->any_ready?(dispatch_set);

????//?If?there?are?any?bits?enabled?in?the?<ready_set_>?then?we'll
????//?handle?those?first,?otherwise?we'll?block?in?<select>.

????if?(number_of_active_handles?==?0)

????
{
????????do

????????
{
????????????this_timeout?=
????????????????this->timer_queue_->calculate_timeout?(max_wait_time,
????????????????&timer_buf);
????????????width?=?(u_long)?this->handler_rep_.max_handlep1?();

????????????dispatch_set.rd_mask_?=?this->wait_set_.rd_mask_;
????????????dispatch_set.wr_mask_?=?this->wait_set_.wr_mask_;
????????????dispatch_set.ex_mask_?=?this->wait_set_.ex_mask_;
????????????number_of_active_handles?=?ACE_OS::select?(int?(width),
????????????????dispatch_set.rd_mask_,
????????????????dispatch_set.wr_mask_,
????????????????dispatch_set.ex_mask_,
????????????????this_timeout);
????????}
????????while?(number_of_active_handles?==?-1?&&?this->handle_error?()?>?0);

????????if?(number_of_active_handles?>?0)

????????
{
#if?!defined?(ACE_WIN32)
????????????//?Resynchronize?the?fd_sets?so?their?"max"?is?set?properly.
????????????dispatch_set.rd_mask_.sync?(this->handler_rep_.max_handlep1?());
????????????dispatch_set.wr_mask_.sync?(this->handler_rep_.max_handlep1?());
????????????dispatch_set.ex_mask_.sync?(this->handler_rep_.max_handlep1?());
#endif?/*?ACE_WIN32?*/
????????}
????????else?if?(number_of_active_handles?==?-1)

????????
{
????????????//?Normally,?select()?will?reset?the?bits?in?dispatch_set
????????????//?so?that?only?those?filed?descriptors?that?are?ready?will
????????????//?have?bits?set.??However,?when?an?error?occurs,?the?bit
????????????//?set?remains?as?it?was?when?the?select?call?was?first?made.
????????????//?Thus,?we?now?have?a?dispatch_set?that?has?every?file
????????????//?descriptor?that?was?originally?waited?for,?which?is?not
????????????//?correct.??We?must?clear?all?the?bit?sets?because?we
????????????//?have?no?idea?if?any?of?the?file?descriptors?is?ready.
????????????//
????????????//?NOTE:?We?dont?have?a?test?case?to?reproduce?this
????????????//?problem.?But?pleae?dont?ignore?this?and?remove?it?off.
????????????dispatch_set.rd_mask_.reset?();
????????????dispatch_set.wr_mask_.reset?();
????????????dispatch_set.ex_mask_.reset?();
????????}
????}

????//?Return?the?number?of?events?to?dispatch.
????return?number_of_active_handles;
}
template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
??(int?active_handle_count,
???ACE_Select_Reactor_Handle_Set?&dispatch_set)


{
??ACE_TRACE?("ACE_Select_Reactor_T::dispatch");

??int?io_handlers_dispatched?=?0;
??int?other_handlers_dispatched?=?0;
??int?signal_occurred?=?0;
??//?The?following?do/while?loop?keeps?dispatching?as?long?as?there
??//?are?still?active?handles.??Note?that?the?only?way?we?should?ever
??//?iterate?more?than?once?through?this?loop?is?if?signals?occur
??//?while?we're?dispatching?other?handlers.

??do

????
{
??????//?Note?that?we?keep?track?of?changes?to?our?state.??If?any?of
??????//?the?dispatch_*()?methods?below?return?-1?it?means?that?the
??????//?<wait_set_>?state?has?changed?as?the?result?of?an
??????//?<ACE_Event_Handler>?being?dispatched.??This?means?that?we
??????//?need?to?bail?out?and?rerun?the?select()?loop?since?our
??????//?existing?notion?of?handles?in?<dispatch_set>?may?no?longer?be
??????//?correct.
??????//
??????//?In?the?beginning,?our?state?starts?out?unchanged.??After
??????//?every?iteration?(i.e.,?due?to?signals),?our?state?starts?out
??????//?unchanged?again.

??????this->state_changed_?=?false;

??????//?Perform?the?Template?Method?for?dispatching?all?the?handlers.

??????//?First?check?for?interrupts.
??????if?(active_handle_count?==?-1)

????????
{
??????????//?Bail?out?--?we?got?here?since?<select>?was?interrupted.
??????????if?(ACE_Sig_Handler::sig_pending?()?!=?0)

????????????
{
??????????????ACE_Sig_Handler::sig_pending?(0);

??????????????//?If?any?HANDLES?in?the?<ready_set_>?are?activated?as?a
??????????????//?result?of?signals?they?should?be?dispatched?since
??????????????//?they?may?be?time?critical
??????????????active_handle_count?=?this->any_ready?(dispatch_set);

??????????????//?Record?the?fact?that?the?Reactor?has?dispatched?a
??????????????//?handle_signal()?method.??We?need?this?to?return?the
??????????????//?appropriate?count?below.
??????????????signal_occurred?=?1;
????????????}
??????????else
????????????return?-1;
????????}

??????//?Handle?timers?early?since?they?may?have?higher?latency
??????//?constraints?than?I/O?handlers.??Ideally,?the?order?of
??????//?dispatching?should?be?a?strategy
??????else?if?(this->dispatch_timer_handlers?(other_handlers_dispatched)?==?-1)
????????//?State?has?changed?or?timer?queue?has?failed,?exit?loop.
????????break;

??????//?Check?to?see?if?there?are?no?more?I/O?handles?left?to
??????//?dispatch?AFTER?we've?handled?the?timers
??????else?if?(active_handle_count?==?0)
????????return?io_handlers_dispatched
??????????+?other_handlers_dispatched
??????????+?signal_occurred;

??????//?Next?dispatch?the?notification?handlers?(if?there?are?any?to
??????//?dispatch).??These?are?required?to?handle?multi-threads?that
??????//?are?trying?to?update?the?<Reactor>.

??????else?if?(this->dispatch_notification_handlers
???????????????(dispatch_set,
????????????????active_handle_count,
????????????????other_handlers_dispatched)?==?-1)
????????//?State?has?changed?or?a?serious?failure?has?occured,?so?exit
????????//?loop.
????????break;

??????//?Finally,?dispatch?the?I/O?handlers.
??????else?if?(this->dispatch_io_handlers
???????????????(dispatch_set,
????????????????active_handle_count,
????????????????io_handlers_dispatched)?==?-1)
????????//?State?has?changed,?so?exit?loop.
????????break;

??????//?if?state?changed,?we?need?to?re-eval?active_handle_count,
??????//?so?we?will?not?end?with?an?endless?loop
??????if?(this->state_changed_)

??????
{
??????????active_handle_count?=?this->dispatch_set_.rd_mask_.num_set?()
??????????????+?this->dispatch_set_.wr_mask_.num_set?()
??????????????+?this->dispatch_set_.ex_mask_.num_set?();
??????}
????}
??while?(active_handle_count?>?0);

??return?io_handlers_dispatched?+?other_handlers_dispatched?+?signal_occurred;
}

template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers
??(ACE_Select_Reactor_Handle_Set?&dispatch_set,
???int?&number_of_active_handles,
???int?&number_of_handlers_dispatched)


{
??ACE_TRACE?("ACE_Select_Reactor_T::dispatch_io_handlers");

??//?Handle?output?events?(this?code?needs?to?come?first?to?handle?the
??//?obscure?case?of?piggy-backed?data?coming?along?with?the?final
??//?handshake?message?of?a?nonblocking?connection).

??if?(this->dispatch_io_set?(number_of_active_handles,
?????????????????????????????number_of_handlers_dispatched,
?????????????????????????????ACE_Event_Handler::WRITE_MASK,
?????????????????????????????dispatch_set.wr_mask_,
?????????????????????????????this->ready_set_.wr_mask_,
?????????????????????????????&ACE_Event_Handler::handle_output)?==?-1)

????
{
??????number_of_active_handles?-=?number_of_handlers_dispatched;
??????return?-1;
????}

??//?ACE_DEBUG?((LM_DEBUG,??ACE_LIB_TEXT?("ACE_Select_Reactor_T::dispatch?-?EXCEPT\n")));
??if?(this->dispatch_io_set?(number_of_active_handles,
?????????????????????????????number_of_handlers_dispatched,
?????????????????????????????ACE_Event_Handler::EXCEPT_MASK,
?????????????????????????????dispatch_set.ex_mask_,
?????????????????????????????this->ready_set_.ex_mask_,
?????????????????????????????&ACE_Event_Handler::handle_exception)?==?-1)

????
{
??????number_of_active_handles?-=?number_of_handlers_dispatched;
??????return?-1;
????}

??//?ACE_DEBUG?((LM_DEBUG,??ACE_LIB_TEXT?("ACE_Select_Reactor_T::dispatch?-?READ\n")));
??if?(this->dispatch_io_set?(number_of_active_handles,
?????????????????????????????number_of_handlers_dispatched,
?????????????????????????????ACE_Event_Handler::READ_MASK,
?????????????????????????????dispatch_set.rd_mask_,
?????????????????????????????this->ready_set_.rd_mask_,
?????????????????????????????&ACE_Event_Handler::handle_input)?==?-1)

????
{
??????number_of_active_handles?-=?number_of_handlers_dispatched;
??????return?-1;
????}

??number_of_active_handles?-=?number_of_handlers_dispatched;
??return?0;
}
template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set
??(int?number_of_active_handles,
???int?&number_of_handlers_dispatched,
???int?mask,
???ACE_Handle_Set?&dispatch_mask,
???ACE_Handle_Set?&ready_mask,
???ACE_EH_PTMF?callback)


{
??ACE_TRACE?("ACE_Select_Reactor_T::dispatch_io_set");
??ACE_HANDLE?handle;

??ACE_Handle_Set_Iterator?handle_iter?(dispatch_mask);

??while?((handle?=?handle_iter?())?!=?ACE_INVALID_HANDLE?&&
?????????number_of_handlers_dispatched?<?number_of_active_handles)

????
{
??????++number_of_handlers_dispatched;

??????this->notify_handle?(handle,
???????????????????????????mask,
???????????????????????????ready_mask,
???????????????????????????this->handler_rep_.find?(handle),
????????????????????????????callback);

??????//?clear?the?bit?from?that?dispatch?mask,
??????//?so?when?we?need?to?restart?the?iteration?(rebuilding?the?iterator
)
??????//?we?will?not?dispatch?the?already?dispatched?handlers
??????this->clear_dispatch_mask?(handle,?mask);

??????if?(this->state_changed_)

????????
{

??????????handle_iter.reset_state?();
??????????this->state_changed_?=?false;
????????}
????}

??return?0;
}
template?<class?ACE_SELECT_REACTOR_TOKEN>?void
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle
??(ACE_HANDLE?handle,
???ACE_Reactor_Mask?mask,
???ACE_Handle_Set?&ready_mask,
???ACE_Event_Handler?*event_handler,
???ACE_EH_PTMF?ptmf)


{
??ACE_TRACE?("ACE_Select_Reactor_T::notify_handle");
??//?Check?for?removed?handlers.
??if?(event_handler?==?0)
????return;

??int?reference_counting_required?=
????event_handler->reference_counting_policy?().value?()?==
????ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

??//?Call?add_reference()?if?needed.
??if?(reference_counting_required)

????
{
??????event_handler->add_reference?();
????}

??int?status?=?(event_handler->*ptmf)?(handle);

??if?(status?<?0)
????this->remove_handler_i?(handle,?mask);
??else?if?(status?>?0)
????ready_mask.set_bit?(handle);

??//?Call?remove_reference()?if?needed.
??if?(reference_counting_required)

????
{
??????event_handler->remove_reference?();
????}
}最后看看,當(dāng)我們hand_input返回-1的時(shí)候,又干了什么呢?
template?<class?ACE_SELECT_REACTOR_TOKEN>?int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i
??(ACE_HANDLE?handle,
???ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_T::remove_handler_i");

??//?Unbind?this?handle.
??return?this->handler_rep_.unbind?(handle,?mask);
}
//?Remove?the?binding?of?<ACE_HANDLE>.

int
ACE_Select_Reactor_Handler_Repository::unbind?(ACE_HANDLE?handle,
???????????????????????????????????????????????ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_Handler_Repository::unbind");

??size_t?slot?=?0;
??ACE_Event_Handler?*event_handler?=?this->find?(handle,?&slot);

??if?(event_handler?==?0)
????return?-1;

??//?Clear?out?the?<mask>?bits?in?the?Select_Reactor's?wait_set.
??this->select_reactor_.bit_ops?(handle,
?????????????????????????????????mask,
?????????????????????????????????this->select_reactor_.wait_set_,
?????????????????????????????????ACE_Reactor::CLR_MASK);

??//?And?suspend_set.
??this->select_reactor_.bit_ops?(handle,
?????????????????????????????????mask,
?????????????????????????????????this->select_reactor_.suspend_set_,
?????????????????????????????????ACE_Reactor::CLR_MASK);

??//?Note?the?fact?that?we've?changed?the?state?of?the?<wait_set_>,
??//?which?is?used?by?the?dispatching?loop?to?determine?whether?it?can
??//?keep?going?or?if?it?needs?to?reconsult?select().
??//?this->select_reactor_.state_changed_?=?1;

??//?If?there?are?no?longer?any?outstanding?events?on?this?<handle>
??//?then?we?can?totally?shut?down?the?Event_Handler.

??int?has_any_wait_mask?=
????(this->select_reactor_.wait_set_.rd_mask_.is_set?(handle)
?????||?this->select_reactor_.wait_set_.wr_mask_.is_set?(handle)
?????||?this->select_reactor_.wait_set_.ex_mask_.is_set?(handle));
??int?has_any_suspend_mask?=
????(this->select_reactor_.suspend_set_.rd_mask_.is_set?(handle)
?????||?this->select_reactor_.suspend_set_.wr_mask_.is_set?(handle)
?????||?this->select_reactor_.suspend_set_.ex_mask_.is_set?(handle));

??int?complete_removal?=?0;

??if?(!has_any_wait_mask?&&?!has_any_suspend_mask)

????
{
??????//?The?handle?has?been?completed?removed.
??????complete_removal?=?1;

??????ACE_SELECT_REACTOR_EVENT_HANDLER?(this,?slot)?=?0;

#if?defined?(ACE_WIN32)

??????ACE_SELECT_REACTOR_HANDLE?(slot)?=?ACE_INVALID_HANDLE;

??????if?(this->max_handlep1_?==?(int)?slot?+?1)

????????
{
??????????//?We've?deleted?the?last?entry?(i.e.,?i?+?1?==?the?current
??????????//?size?of?the?array),?so?we?need?to?figure?out?the?last
??????????//?valid?place?in?the?array?that?we?should?consider?in
??????????//?subsequent?searches.

??????????int?i;

??????????for?(i?=?this->max_handlep1_?-?1;
???????????????i?>=?0?&&?ACE_SELECT_REACTOR_HANDLE?(i)?==?ACE_INVALID_HANDLE;
???????????????--i)
????????????continue;

??????????this->max_handlep1_?=?i?+?1;
????????}

#else

??????if?(this->max_handlep1_?==?handle?+?1)

????????
{
??????????//?We've?deleted?the?last?entry,?so?we?need?to?figure?out
??????????//?the?last?valid?place?in?the?array?that?is?worth?looking
??????????//?at.
??????????ACE_HANDLE?wait_rd_max?=
????????????this->select_reactor_.wait_set_.rd_mask_.max_set?();
??????????ACE_HANDLE?wait_wr_max?=
????????????this->select_reactor_.wait_set_.wr_mask_.max_set?();
??????????ACE_HANDLE?wait_ex_max?=
????????????this->select_reactor_.wait_set_.ex_mask_.max_set?();

??????????ACE_HANDLE?suspend_rd_max?=
????????????this->select_reactor_.suspend_set_.rd_mask_.max_set?();
??????????ACE_HANDLE?suspend_wr_max?=
????????????this->select_reactor_.suspend_set_.wr_mask_.max_set?();
??????????ACE_HANDLE?suspend_ex_max?=
????????????this->select_reactor_.suspend_set_.ex_mask_.max_set?();

??????????//?Compute?the?maximum?of?six?values.
??????????this->max_handlep1_?=?wait_rd_max;
??????????if?(this->max_handlep1_?<?wait_wr_max)
????????????this->max_handlep1_?=?wait_wr_max;
??????????if?(this->max_handlep1_?<?wait_ex_max)
????????????this->max_handlep1_?=?wait_ex_max;

??????????if?(this->max_handlep1_?<?suspend_rd_max)
????????????this->max_handlep1_?=?suspend_rd_max;
??????????if?(this->max_handlep1_?<?suspend_wr_max)
????????????this->max_handlep1_?=?suspend_wr_max;
??????????if?(this->max_handlep1_?<?suspend_ex_max)
????????????this->max_handlep1_?=?suspend_ex_max;

??????????++this->max_handlep1_;
????????}

#endif?/*?ACE_WIN32?*/

????}

??int?requires_reference_counting?=
????event_handler->reference_counting_policy?().value?()?==
????ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

??//?Close?down?the?<Event_Handler>?unless?we've?been?instructed?not
??//?to.
??if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::DONT_CALL)?==?0)
????event_handler->handle_close?(handle,?mask);

??//?Call?remove_reference()?if?the?removal?is?complete?and?reference
??//?counting?is?needed.
??if?(complete_removal?&&
??????requires_reference_counting)

????
{
??????event_handler->remove_reference?();
????}

??return?0;
}

void
ACE_Select_Reactor_Impl::clear_dispatch_mask?(ACE_HANDLE?handle,
??????????????????????????????????????????????ACE_Reactor_Mask?mask)


{
??ACE_TRACE?("ACE_Select_Reactor_Impl::clear_dispatch_mask");

??//??Use?handle?and?mask?in?order?to?modify?the?sets
??//?(wait/suspend/ready/dispatch),?that?way,?the?dispatch_io_set?loop
??//?will?not?be?interrupt,?and?there?will?no?reason?to?rescan?the
??//?wait_set?and?re-calling?select?function,?which?is?*very*
??//?expensive.?It?seems?that?wait/suspend/ready?sets?are?getting
??//?updated?in?register/remove?bind/unbind?etc?functions.??The?only
??//?thing?need?to?be?updated?is?the?dispatch_set?(also?can??be?found
??//?in?that?file?code?as?dispatch_mask).??Because?of?that,?we?need
??//?that?dispatch_set?to?be?member?of?the?ACE_Select_Reactor_impl?in
??//?Select_Reactor_Base.h?file??That?way?we?will?have?access?to?that
??//?member?in?that?function.

??//?We?kind?of?invalidate?the?iterator?in?dispatch_io_set?because?its
??//?an?array?and?index?built?from?the?original?dispatch-set.?Take?a
??//?look?at?dispatch_io_set?for?more?details.

??//?We?only?need?to?clr_bit,?because?we?are?interested?in?clearing?the
??//?handles?that?was?removed,?so?no?dispatching?to?these?handles?will
??//?occur.
??if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::READ_MASK)?||
??????ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::ACCEPT_MASK))

????
{
??????this->dispatch_set_.rd_mask_.clr_bit?(handle);
????}
??if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::WRITE_MASK))

????
{
??????this->dispatch_set_.wr_mask_.clr_bit?(handle);
????}
??if?(ACE_BIT_ENABLED?(mask,?ACE_Event_Handler::EXCEPT_MASK))

????
{
??????this->dispatch_set_.ex_mask_.clr_bit?(handle);
????}

??//?That?will?make?the?dispatch_io_set?iterator?re-start?and?rescan
??//?the?dispatch?set.
??this->state_changed_?=?true;
}