• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            一動不如一靜

            C++博客 首頁 新隨筆 聯系 聚合 管理
              20 Posts :: 0 Stories :: 10 Comments :: 0 Trackbacks
            // Constructs a thread-pool reactor by forwarding every argument to the
            // ACE_Select_Reactor base constructor.  The two literal zeros occupy the
            // base-constructor slots between <tq> and <mask_signals> (presumably
            // disable_notify_pipe and notify -- confirm against the
            // ACE_Select_Reactor declaration).
            ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
                                            int restart,
                                            ACE_Sig_Handler *sh,
                                            ACE_Timer_Queue *tq,
                                            int mask_signals,
                                            int s_queue)
              : ACE_Select_Reactor (max_number_of_handles,
                                    restart,
                                    sh,
                                    tq,
                                    0,
                                    0,
                                    mask_signals,
                                    s_queue)
            {
              ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");

              // Switch notify-renew suppression on for this reactor.
              this->supress_notify_renew (1);
            }

            template?
            < class ?ACE_SELECT_REACTOR_TOKEN >
            ACE_Select_Reactor_T
            < ACE_SELECT_REACTOR_TOKEN > ::ACE_Select_Reactor_T
            ??(size_t?size,
            ???
            int ?rs,
            ???ACE_Sig_Handler?
            * sh,
            ???ACE_Timer_Queue?
            * tq,
            ???
            int ?disable_notify_pipe,
            ???ACE_Reactor_Notify?
            * notify,
            ???
            int ?mask_signals,
            ???
            int ?s_queue)
            ????:?ACE_Select_Reactor_Impl?(mask_signals)
            ????,?token_?(
            * this ,?s_queue)
            ????,?lock_adapter_?(token_)
            ????,?deactivated_?(
            0 )
            {
            ??ACE_TRACE?(
            " ACE_Select_Reactor_T::ACE_Select_Reactor_T " );

            ??
            if ?( this -> open?(size,
            ??????????????????rs,
            ??????????????????sh,
            ??????????????????tq,
            ??????????????????disable_notify_pipe,
            ??????????????????notify)?
            == ? - 1 )
            ????ACE_ERROR?((LM_ERROR,
            ????????????????ACE_LIB_TEXT?(
            " %p\n " ),
            ????????????????ACE_LIB_TEXT?(
            " ACE_Select_Reactor_T::open? " )
            ????????????????ACE_LIB_TEXT?(
            " failed?inside?ACE_Select_Reactor_T::CTOR " )));
            }

            template?
            < class ?ACE_SELECT_REACTOR_TOKEN > ? int
            ACE_Select_Reactor_T
            < ACE_SELECT_REACTOR_TOKEN > ::open
            ??(size_t?size,
            ???
            int ?restart,
            ???ACE_Sig_Handler?
            * sh,
            ???ACE_Timer_Queue?
            * tq,
            ???
            int ?disable_notify_pipe,
            ???ACE_Reactor_Notify?
            * notify)
            {
            ??ACE_TRACE?(
            " ACE_Select_Reactor_T::open " );
            ??ACE_MT?(ACE_GUARD_RETURN?(ACE_SELECT_REACTOR_TOKEN,?ace_mon,?
            this -> token_,? - 1 ));

            ??
            // ?Can't?initialize?ourselves?more?than?once.
            ?? if ?( this -> initialized_)
            ????
            return ? - 1 ;

            ??
            this -> owner_? = ?ACE_Thread::self?();
            ??
            this -> restart_? = ?restart;
            ??
            this -> signal_handler_? = ?sh;
            ??
            this -> timer_queue_? = ?tq;
            ??
            this -> notify_handler_? = ?notify;

            ??
            int ?result? = ? 0 ;

            ??
            // ?Allows?the?signal?handler?to?be?overridden.
            ?? if ?( this -> signal_handler_? == ? 0 )
            ????
            {
            ??????ACE_NEW_RETURN?(
            this -> signal_handler_,
            ??????????????????????ACE_Sig_Handler,
            ??????????????????????
            - 1 );

            ??????
            if ?( this -> signal_handler_? == ? 0 )
            ????????result?
            = ? - 1 ;
            ??????
            else
            ????????
            this -> delete_signal_handler_? = ? 1 ;
            ????}


            ??
            // ?Allows?the?timer?queue?to?be?overridden.
            ?? if ?(result? != ? - 1 ? && ? this -> timer_queue_? == ? 0 )
            ????
            {
            ??????ACE_NEW_RETURN?(
            this -> timer_queue_,
            ??????????????????????ACE_Timer_Heap,
            ??????????????????????
            - 1 );

            ??????
            if ?( this -> timer_queue_? == ? 0 )
            ????????result?
            = ? - 1 ;
            ??????
            else
            ????????
            this -> delete_timer_queue_? = ? 1 ;
            ????}


            ??
            // ?Allows?the?Notify_Handler?to?be?overridden.
            ?? if ?(result? != ? - 1 ? && ? this -> notify_handler_? == ? 0 )
            ????
            {
            ??????ACE_NEW_RETURN?(
            this -> notify_handler_,
            ??????????????????????ACE_Select_Reactor_Notify,
            ??????????????????????
            - 1 );

            ??????
            if ?( this -> notify_handler_? == ? 0 )
            ????????result?
            = ? - 1 ;
            ??????
            else
            ????????
            this -> delete_notify_handler_? = ? 1 ;
            ????}


            ??
            if ?(result? != ? - 1 ? && ? this -> handler_rep_.open?(size)? == ? - 1 )?????? /* **********handler_rep_.open************** */
            ????result?
            = ? - 1 ;
            ??
            else ? if ?( this -> notify_handler_ -> open?( this ,???????????????????? /* *****notify_handler_->open**** */
            ????????????????????????????????????????
            0 ,
            ????????????????????????????????????????disable_notify_pipe)?
            == ? - 1 )
            ????result?
            = ? - 1 ;

            ??
            if ?(result? != ? - 1 )
            ????
            // ?We're?all?set?to?go.
            ???? this -> initialized_? = ? 1 ;
            ??
            else
            ????
            // ?This?will?close?down?all?the?allocated?resources?properly.
            ???? this -> close?();

            ??
            return ?result;
            }

            // Default constructor: starts with max_notify_iterations_ set to -1.
            // NOTE(review): -1 presumably means "no limit" -- confirm where the
            // member is consumed.
            ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
              : max_notify_iterations_ (-1)
            {
            }

            int
            ACE_Select_Reactor_Notify::open?(ACE_Reactor_Impl?
            * r,
            ?????????????????????????????????ACE_Timer_Queue?
            * ,
            ?????????????????????????????????
            int ?disable_notify_pipe)
            {
            ??ACE_TRACE?(
            " ACE_Select_Reactor_Notify::open " );

            ??
            if ?(disable_notify_pipe? == ? 0 )
            ????
            {
            ??????
            this -> select_reactor_? =
            ????????dynamic_cast
            < ACE_Select_Reactor_Impl? *> ?(r);

            ??????
            if ?(select_reactor_? == ? 0 )
            ????????
            {
            ??????????errno?
            = ?EINVAL;
            ??????????
            return ? - 1 ;
            ????????}


            ??????
            if ?( this -> notification_pipe_.open?()? == ? - 1 )
            ????????
            return ? - 1 ;
            #if ?defined?(F_SETFD)
            ??????ACE_OS::fcntl?(
            this -> notification_pipe_.read_handle?(),?F_SETFD,? 1 );
            ??????ACE_OS::fcntl?(
            this -> notification_pipe_.write_handle?(),?F_SETFD,? 1 );
            #endif ?/*?F_SETFD?*/

            #if ?defined?(ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
            ??????ACE_Notification_Buffer?
            * temp;

            ??????ACE_NEW_RETURN?(temp,
            ??????????????????????ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
            ??????????????????????
            - 1 );

            ??????
            if ?( this -> alloc_queue_.enqueue_head?(temp)? == ? - 1 )
            ????????
            {
            ??????????delete?[]?temp;
            ??????????
            return ? - 1 ;
            ????????}


            ??????
            for ?(size_t?i? = ? 0 ;?i? < ?ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;? ++ i)
            ????????
            if ?(free_queue_.enqueue_head?(temp? + ?i)? == ? - 1 )
            ??????????
            return ? - 1 ;

            #endif ?/*?ACE_HAS_REACTOR_NOTIFICATION_QUEUE?*/

            ??????
            // ?There?seems?to?be?a?Win32?bug?with?this??Set?this?into
            ??????
            // ?non-blocking?mode.
            ?????? if ?(ACE::set_flags?( this -> notification_pipe_.read_handle?(),????????????????????????????? /* **************設置為非阻賽模式********* */
            ??????????????????????????ACE_NONBLOCK)?
            == ? - 1 )
            ????????
            return ? - 1 ;
            ??????
            else
            ????????
            return ? this -> select_reactor_ -> register_handler???????????????????????????????????????? /* ************注冊處理器*************** */
            ??????????(
            this -> notification_pipe_.read_handle?(),
            ???????????
            this ,
            ???????????ACE_Event_Handler::READ_MASK);
            ????}

            ??
            else
            ????
            {
            ??????
            this -> select_reactor_? = ? 0 ;
            ??????
            return ? 0 ;
            ????}

            }

            int
            ACE_Pipe::open?(
            int ?buffer_size)
            {
            ??ACE_TRACE?(
            " ACE_Pipe::open " );

            #if ?defined?(ACE_LACKS_SOCKETPAIR)?||?defined?(__Lynx__)
            ??ACE_INET_Addr?my_addr;
            ??ACE_SOCK_Acceptor?acceptor;
            ??ACE_SOCK_Connector?connector;
            ??ACE_SOCK_Stream?reader;
            ??ACE_SOCK_Stream?writer;
            ??
            int ?result? = ? 0 ;
            #?
            if ?defined?(ACE_WIN32)
            ??ACE_INET_Addr?local_any??(static_cast
            < u_short > ?( 0 ),?ACE_LOCALHOST);
            #?
            else
            ??ACE_Addr?local_any?
            = ?ACE_Addr::sap_any;
            #?endif?
            /* ?ACE_WIN32? */

            ??
            // ?Bind?listener?to?any?port?and?then?find?out?what?the?port?was.
            ?? if ?(acceptor.open?(local_any)? == ? - 1
            ??????
            || ?acceptor.get_local_addr?(my_addr)? == ? - 1 )
            ????result?
            = ? - 1 ;
            ??
            else
            ????
            {
            ??????ACE_INET_Addr?sv_addr?(my_addr.get_port_number?(),
            ?????????????????????????????ACE_LOCALHOST);

            ??????
            // ?Establish?a?connection?within?the?same?process.
            ?????? if ?(connector.connect?(writer,?sv_addr)? == ? - 1 )
            ????????result?
            = ? - 1 ;
            ??????
            else ? if ?(acceptor.accept?(reader)? == ? - 1 )
            ????????
            {
            ??????????writer.close?();
            ??????????result?
            = ? - 1 ;
            ????????}

            ????}


            ??
            // ?Close?down?the?acceptor?endpoint?since?we?don't?need?it?anymore.
            ??acceptor.close?();
            ??
            if ?(result? == ? - 1 )
            ????
            return ? - 1 ;

            ??
            this -> handles_[ 0 ]? = ?reader.get_handle?();
            ??
            this -> handles_[ 1 ]? = ?writer.get_handle?();

            #?
            if ? ! defined?(ACE_LACKS_TCP_NODELAY)
            ??
            int ?one? = ? 1 ;

            ??
            // ?Make?sure?that?the?TCP?stack?doesn't?try?to?buffer?small?writes.
            ??
            // ?Since?this?communication?is?purely?local?to?the?host?it?doesn't
            ??
            // ?affect?network?performance.

            ??
            if ?(writer.set_option?(ACE_IPPROTO_TCP,
            ?????????????????????????TCP_NODELAY,
            ?????????????????????????
            & one,
            ?????????????????????????
            sizeof ?one)? == ? - 1 )
            ????
            {
            ??????
            this -> close?();
            ??????
            return ? - 1 ;
            ????}

            #?endif?
            /* ?!?ACE_LACKS_TCP_NODELAY? */

            #?
            if ?defined?(ACE_LACKS_SOCKET_BUFSIZ)
            ????ACE_UNUSED_ARG?(buffer_size);
            #?
            else ?? /* ?!?ACE_LACKS_SOCKET_BUFSIZ? */
            ??
            if ?(reader.set_option?(SOL_SOCKET,
            ?????????????????????????SO_RCVBUF,
            ?????????????????????????reinterpret_cast?
            < void ? *> ?( & buffer_size),
            ?????????????????????????
            sizeof ?(buffer_size))? == ? - 1
            ??????
            && ?errno? != ?ENOTSUP)
            ????
            {
            ??????
            this -> close?();
            ??????
            return ? - 1 ;
            ????}

            ??
            else ? if ?(writer.set_option?(SOL_SOCKET,
            ??????????????????????????????SO_SNDBUF,
            ??????????????????????????????reinterpret_cast?
            < void ? *> ?( & buffer_size),
            ??????????????????????????????
            sizeof ?(buffer_size))? == ? - 1
            ???????????
            && ?errno? != ?ENOTSUP)
            ????
            {
            ??????
            this -> close?();
            ??????
            return ? - 1 ;
            ????}

            #?endif?
            /* ?!?ACE_LACKS_SOCKET_BUFSIZ? */

            #elif ?defined?(ACE_HAS_STREAM_PIPES)?||?defined?(__QNX__)
            ??ACE_UNUSED_ARG?(buffer_size);
            ??
            if ?(ACE_OS::pipe?( this -> handles_)? == ? - 1 )
            ????ACE_ERROR_RETURN?((LM_ERROR,
            ???????????????????????ACE_LIB_TEXT?(
            " %p\n " ),
            ???????????????????????ACE_LIB_TEXT?(
            " pipe " )),
            ??????????????????????
            - 1 );

            #if ?!defined(__QNX__)
            ??
            int ?arg? = ?RMSGN;

            ??
            // ?Enable?"msg?no?discard"?mode,?which?ensures?that?record
            ??
            // ?boundaries?are?maintained?when?messages?are?sent?and?received.
            ?? if ?(ACE_OS::ioctl?( this -> handles_[ 0 ],
            ?????????????????????I_SRDOPT,
            ?????????????????????(
            void ? * )?arg)? == ? - 1
            ??????
            || ?ACE_OS::ioctl?( this -> handles_[ 1 ],
            ????????????????????????I_SRDOPT,
            ????????????????????????(
            void ? * )?arg)? == ? - 1 )
            ????
            {
            ??????
            this -> close?();
            ??????ACE_ERROR_RETURN?((LM_ERROR,
            ?????????????????????????ACE_LIB_TEXT?(
            " %p\n " ),
            ?????????????????????????ACE_LIB_TEXT?(
            " ioctl " )),? - 1 );
            ????}

            #endif ?/*?__QNX__?*/

            #else ??/*?!?ACE_LACKS_SOCKETPAIR?&&?!?ACE_HAS_STREAM_PIPES?*/
            ??
            if ?(ACE_OS::socketpair?(AF_UNIX,
            ??????????????????????????SOCK_STREAM,
            ??????????????????????????
            0 ,
            ??????????????????????????
            this -> handles_)? == ? - 1 )
            ????ACE_ERROR_RETURN?((LM_ERROR,
            ???????????????????????ACE_LIB_TEXT?(
            " %p\n " ),
            ???????????????????????ACE_LIB_TEXT?(
            " socketpair " )),
            ??????????????????????
            - 1 );
            #?
            if ?defined?(ACE_LACKS_SOCKET_BUFSIZ)
            ??ACE_UNUSED_ARG?(buffer_size);
            #?
            else ?? /* ?!?ACE_LACKS_SOCKET_BUFSIZ? */
            ??
            if ?(ACE_OS::setsockopt?( this -> handles_[ 0 ],
            ??????????????????????????SOL_SOCKET,
            ??????????????????????????SO_RCVBUF,
            ??????????????????????????reinterpret_cast?
            < const ? char ? *> ?( & buffer_size),
            ??????????????????????????
            sizeof ?(buffer_size))? == ? - 1
            ??????
            && ?errno? != ?ENOTSUP)
            ????
            {
            ??????
            this -> close?();
            ??????
            return ? - 1 ;
            ????}

            ??
            if ?(ACE_OS::setsockopt?( this -> handles_[ 1 ],
            ??????????????????????????SOL_SOCKET,
            ??????????????????????????SO_SNDBUF,
            ??????????????????????????reinterpret_cast?
            < const ? char ? *> ?( & buffer_size),
            ??????????????????????????
            sizeof ?(buffer_size))? == ? - 1
            ??????
            && ?errno? != ?ENOTSUP)
            ????
            {
            ??????
            this -> close?();
            ??????
            return ? - 1 ;
            ????}

            #?endif?
            /* ?!?ACE_LACKS_SOCKET_BUFSIZ? */
            #endif ??/*?!?ACE_LACKS_SOCKETPAIR?&&?!?ACE_HAS_STREAM_PIPES?*/
            ??
            // ?Point?both?the?read?and?write?HANDLES?to?the?appropriate?socket
            ??
            // ?HANDLEs.

            ??
            return ? 0 ;
            }

            // Thin wrapper: applies socket option <option> at <level> to this
            // object's handle by delegating to ACE_OS::setsockopt, and returns that
            // call's result unchanged.
            ACE_INLINE int
            ACE_SOCK::set_option (int level,
                                  int option,
                                  void *optval,
                                  int optlen) const
            {
              ACE_TRACE ("ACE_SOCK::set_option");
              return ACE_OS::setsockopt (this->get_handle (),
                                         level,
                                         option,
                                         (char *) optval,
                                         optlen);
            }

            // Wrapper around ::setsockopt.
            //
            // Returns the result produced by the ACE_SOCKCALL wrapper (-1 on
            // failure).  With Winsock2 and SO_REUSEPORT defined, SOL_SOCKET /
            // SO_REUSEADDR requests are accepted without calling the OS, and
            // SO_REUSEPORT is translated to SO_REUSEADDR.
            //
            // NOTE(review): when WSAEOPNOTSUPP is not defined, *every* failure has
            // its errno overwritten with ENOTSUP.  ACE_Pipe::open tests
            // "errno != ENOTSUP" after calling this function, so it relies on that
            // behaviour; it is deliberately left unchanged here.
            ACE_INLINE int
            ACE_OS::setsockopt (ACE_HANDLE handle,
                                int level,
                                int optname,
                                const char *optval,
                                int optlen)
            {
              ACE_OS_TRACE ("ACE_OS::setsockopt");

            #if defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0) && defined(SO_REUSEPORT)
              // To work around an inconsistency with Microsofts implementation of
              // sockets, we will check for SO_REUSEADDR, and ignore it. Winsock
              // always behaves as if SO_REUSEADDR=1. Some implementations have
              // the same behaviour as Winsock, but use a new name for
              // it. SO_REUSEPORT.  If you want the normal behaviour for
              // SO_REUSEADDR=0, then NT 4 sp4 and later supports
              // SO_EXCLUSIVEADDRUSE. This also requires using an updated Platform
              // SDK so it was decided to ignore the option for now. (Especially
              // since Windows always sets SO_REUSEADDR=1, which we can mimic by doing
              // nothing.)
              if (level == SOL_SOCKET)
                {
                  if (optname == SO_REUSEADDR)
                    {
                      return 0; // Not supported by Winsock
                    }

                  if (optname == SO_REUSEPORT)
                    {
                      optname = SO_REUSEADDR;
                    }
                }
            #endif /*ACE_HAS_WINSOCK2*/

              int result;
              ACE_SOCKCALL (::setsockopt ((ACE_SOCKET) handle,
                                          level,
                                          optname,
                                          (ACE_SOCKOPT_TYPE1) optval,
                                          optlen),
                            int,
                            -1,
                            result);
            #if defined (WSAEOPNOTSUPP)
              if (result == -1 && errno == WSAEOPNOTSUPP)
            #else
              if (result == -1)
            #endif /* WSAEOPNOTSUPP */
                errno = ENOTSUP;
              return result;
            }




            注冊的部分:

            int
            ACE_TP_Reactor::register_handler?(ACE_Event_Handler?
            * eh,
            ??????????????????????????????????ACE_Reactor_Mask?mask)
            {
            ??
            return ?ACE_Select_Reactor::register_handler?(eh,
            ???????????????????????????????????????????????mask);
            }



            int
            ACE_TP_Reactor::handle_events?(ACE_Time_Value?
            * max_wait_time)
            {
            ??ACE_TRACE?(
            " ACE_TP_Reactor::handle_events " );

            ??
            // ?Stash?the?current?time?--?the?destructor?of?this?object?will
            ??
            // ?automatically?compute?how?much?time?elapsed?since?this?method?was
            ??
            // ?called.
            ??ACE_Countdown_Time?countdown?(max_wait_time);

            ??
            //
            ??
            // ?The?order?of?these?events?is?very?subtle,?modify?with?care.
            ??
            //

            ??
            // ?Instantiate?the?token?guard?which?will?try?grabbing?the?token?for
            ??
            // ?this?thread.
            ??ACE_TP_Token_Guard?guard?( this -> token_);

            ??
            int ?result? = ?guard.acquire_read_token?(max_wait_time);????????????????????????????????? // 這里有獲取token鎖

            ??
            // ?If?the?guard?is?NOT?the?owner?just?return?the?retval
            ?? if ?( ! guard.is_owner?())
            ????
            return ?result;

            ??
            // ?After?getting?the?lock?just?just?for?deactivation..
            ?? if ?( this -> deactivated_)
            ????
            return ? - 1 ;

            ??
            // ?Update?the?countdown?to?reflect?time?waiting?for?the?token.
            ??countdown.update?();

            ??
            return ? this -> dispatch_i?(max_wait_time,
            ???????????????????????????guard);
            }


            int
            ACE_TP_Reactor::dispatch_i?(ACE_Time_Value?
            * max_wait_time,
            ????????????????????????????ACE_TP_Token_Guard?
            & guard)
            {
            ??
            int ?event_count? =
            ????
            this -> get_event_for_dispatching?(max_wait_time);

            ??
            int ?result? = ? 0 ;

            ??
            // ?Note:?We?are?passing?the?<event_count>?around,?to?have?record?of
            ??
            // ?how?many?events?still?need?processing.?May?be?this?could?be
            ??
            // ?useful?in?future.

            ??
            // ?Dispatch?signals
            ?? if ?(event_count? == ? - 1 )
            ????
            {
            ??????
            // ?Looks?like?we?dont?do?any?upcalls?in?dispatch?signals.?If?at
            ??????
            // ?a?later?point?of?time,?we?decide?to?handle?signals?we?have?to
            ??????
            // ?release?the?lock?before?we?make?any?upcalls..?What?is?here
            ??????
            // ?now?is?not?the?right?thing
            ??????
            //
            ??????
            // ?@@?We?need?to?do?better..
            ?????? return ? this -> handle_signals?(event_count,
            ???????????????????????????????????guard);
            ????}


            ??
            // ?If?there?are?no?signals?and?if?we?had?received?a?proper
            ??
            // ?event_count?then?first?look?at?dispatching?timeouts.?We?need?to
            ??
            // ?handle?timers?early?since?they?may?have?higher?latency
            ??
            // ?constraints?than?I/O?handlers.??Ideally,?the?order?of?dispatching
            ??
            // ?should?be?a?strategy

            ??
            // ?NOTE:?The?event?count?does?not?have?the?number?of?timers?that
            ??
            // ?needs?dispatching.?But?we?are?still?passing?this?along.?We?dont
            ??
            // ?need?to?do?that.?In?the?future?we?*may*?have?the?timers?also
            ??
            // ?returned?through?the?<event_count>.?Just?passing?that?along?for
            ??
            // ?that?day.
            ??result? = ? this -> handle_timer_events?(event_count,
            ??????????????????????????????????????guard);

            ??
            if ?(result? > ? 0 )
            ????
            return ?result;

            ??
            // ?Else?just?go?ahead?fall?through?for?further?handling.

            ??
            if ?(event_count? > ? 0 )
            ????
            {
            ??????
            // ?Next?dispatch?the?notification?handlers?(if?there?are?any?to
            ??????
            // ?dispatch).??These?are?required?to?handle?multiple-threads
            ??????
            // ?that?are?trying?to?update?the?<Reactor>.
            ??????result? = ? this -> handle_notify_events?(event_count,
            ???????????????????????????????????????????guard);

            ??????
            if ?(result? > ? 0 )
            ????????
            return ?result;

            ??????
            // ?Else?just?fall?through?for?further?handling
            ????}


            ??
            if ?(event_count? > ? 0 )
            ????
            {
            ??????
            // ?Handle?socket?events
            ?????? return ? this -> handle_socket_events?(event_count,
            ?????????????????????????????????????????guard);
            ????}


            ??
            return ? 0 ;
            }

            int
            ACE_TP_Reactor::get_event_for_dispatching?(ACE_Time_Value?
            * max_wait_time)
            {
            ??
            // ?If?the?reactor?handler?state?has?changed,?clear?any?remembered
            ??
            // ?ready?bits?and?re-scan?from?the?master?wait_set.
            ?? if ?( this -> state_changed_)
            ????
            {
            ??????
            this -> ready_set_.rd_mask_.reset?();
            ??????
            this -> ready_set_.wr_mask_.reset?();
            ??????
            this -> ready_set_.ex_mask_.reset?();

            ??????
            this -> state_changed_? = ? false ;
            ????}

            ??
            else
            ????
            {
            ??????
            // ?This?is?a?hack?somewhere,?under?certain?conditions?(which
            ??????
            // ?I?don't?understand)?the?mask?will?have?all?of?its?bits?clear,
            ??????
            // ?yet?have?a?size_?>?0.?This?is?an?attempt?to?remedy?the?affect,
            ??????
            // ?without?knowing?why?it?happens.

            ??????
            this -> ready_set_.rd_mask_.sync?( this -> ready_set_.rd_mask_.max_set?());
            ??????
            this -> ready_set_.wr_mask_.sync?( this -> ready_set_.wr_mask_.max_set?());
            ??????
            this -> ready_set_.ex_mask_.sync?( this -> ready_set_.ex_mask_.max_set?());
            ????}


            ??
            return ? this -> wait_for_multiple_events?( this -> ready_set_,
            ?????????????????????????????????????????max_wait_time);
            }


            template?
            < class ?ACE_SELECT_REACTOR_TOKEN > ? int
            ACE_Select_Reactor_T
            < ACE_SELECT_REACTOR_TOKEN > ::wait_for_multiple_events
            ??(ACE_Select_Reactor_Handle_Set?
            & dispatch_set,
            ???ACE_Time_Value?
            * max_wait_time)
            {
            ??ACE_TRACE?(
            " ACE_Select_Reactor_T::wait_for_multiple_events " );
            ??u_long?width?
            = ? 0 ;
            ??ACE_Time_Value?timer_buf?(
            0 );
            ??ACE_Time_Value?
            * this_timeout;

            ??
            int ?number_of_active_handles? = ? this -> any_ready?(dispatch_set);

            ??
            // ?If?there?are?any?bits?enabled?in?the?<ready_set_>?then?we'll
            ??
            // ?handle?those?first,?otherwise?we'll?block?in?<select>.

            ??
            if ?(number_of_active_handles? == ? 0 )
            ????
            {
            ??????
            do
            ????????
            {
            ??????????this_timeout?
            =
            ????????????
            this -> timer_queue_ -> calculate_timeout?(max_wait_time,
            ???????????????????????????????????????????????????
            & timer_buf);
            ??????????width?
            = ?(u_long)? this -> handler_rep_.max_handlep1?();

            ??????????dispatch_set.rd_mask_?
            = ? this -> wait_set_.rd_mask_;???????????????????????????????????????? /* ***************wait_set_?---->?ready_set_********* */
            ??????????dispatch_set.wr_mask_?
            = ? this -> wait_set_.wr_mask_;
            ??????????dispatch_set.ex_mask_?
            = ? this -> wait_set_.ex_mask_;
            ??????????number_of_active_handles?
            = ?ACE_OS::select?( int ?(width),
            ?????????????????????????????????????????????????????dispatch_set.rd_mask_,
            ?????????????????????????????????????????????????????dispatch_set.wr_mask_,
            ?????????????????????????????????????????????????????dispatch_set.ex_mask_,
            ?????????????????????????????????????????????????????this_timeout);
            ????????}

            ??????
            while ?(number_of_active_handles? == ? - 1 ? && ? this -> handle_error?()? > ? 0 );

            ??????
            if ?(number_of_active_handles? > ? 0 )
            ????????
            {
            #if ?!defined?(ACE_WIN32)
            ??????????
            // ?Resynchronize?the?fd_sets?so?their?"max"?is?set?properly.
            ??????????dispatch_set.rd_mask_.sync?( this -> handler_rep_.max_handlep1?());
            ??????????dispatch_set.wr_mask_.sync?(
            this -> handler_rep_.max_handlep1?());
            ??????????dispatch_set.ex_mask_.sync?(
            this -> handler_rep_.max_handlep1?());
            #endif ?/*?ACE_WIN32?*/
            ????????}

            ??????
            else ? if ?(number_of_active_handles? == ? - 1 )
            ????????
            {
            ??????????
            // ?Normally,?select()?will?reset?the?bits?in?dispatch_set
            ??????????
            // ?so?that?only?those?filed?descriptors?that?are?ready?will
            ??????????
            // ?have?bits?set.??However,?when?an?error?occurs,?the?bit
            ??????????
            // ?set?remains?as?it?was?when?the?select?call?was?first?made.
            ??????????
            // ?Thus,?we?now?have?a?dispatch_set?that?has?every?file
            ??????????
            // ?descriptor?that?was?originally?waited?for,?which?is?not
            ??????????
            // ?correct.??We?must?clear?all?the?bit?sets?because?we
            ??????????
            // ?have?no?idea?if?any?of?the?file?descriptors?is?ready.
            ??????????
            //
            ??????????
            // ?NOTE:?We?dont?have?a?test?case?to?reproduce?this
            ??????????
            // ?problem.?But?pleae?dont?ignore?this?and?remove?it?off.
            ??????????dispatch_set.rd_mask_.reset?();
            ??????????dispatch_set.wr_mask_.reset?();
            ??????????dispatch_set.ex_mask_.reset?();
            ????????}

            ????}


            ??
            // ?Return?the?number?of?events?to?dispatch.
            ?? return ?number_of_active_handles;
            }

            ???
            ???
            int
            ACE_TP_Reactor::handle_socket_events?(
            int ? & event_count,
            ??????????????????????????????????????ACE_TP_Token_Guard?
            & guard)
            {

            ??
            // ?We?got?the?lock,?lets?handle?some?I/O?events.
            ??ACE_EH_Dispatch_Info?dispatch_info;

            ??
            this -> get_socket_event_info?(dispatch_info);

            ??
            // ?If?there?is?any?event?handler?that?is?ready?to?be?dispatched,?the
            ??
            // ?dispatch?information?is?recorded?in?dispatch_info.
            ?? if ?( ! dispatch_info.dispatch?())
            ????
            {
            ??????
            return ? 0 ;
            ????}


            ??
            // ?Suspend?the?handler?so?that?other?threads?don't?start?dispatching
            ??
            // ?it.
            ??
            //
            ??
            // ?NOTE:?This?check?was?performed?in?older?versions?of?the
            ??
            // ?TP_Reactor.?Looks?like?it?is?a?waste..
            ?? if ?(dispatch_info.event_handler_? != ? this -> notify_handler_)
            ????
            this -> suspend_i?(dispatch_info.handle_);

            ??
            int ?resume_flag? =
            ????dispatch_info.event_handler_
            -> resume_handler?();

            ??
            int ?reference_counting_required? =
            ????dispatch_info.event_handler_
            -> reference_counting_policy?().value?()? ==
            ????ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

            ??
            // ?Call?add_reference()?if?needed.
            ?? if ?(reference_counting_required)
            ????
            {
            ??????dispatch_info.event_handler_
            -> add_reference?();
            ????}


            ??
            // ?Release?the?lock.??Others?threads?can?start?waiting.???????????????????????????????????? // 這里放鎖
            ??guard.release_token?();

            ??
            int ?result? = ? 0 ;

            ??
            // ?If?there?was?an?event?handler?ready,?dispatch?it.
            ??
            // ?Decrement?the?event?left
            ?? -- event_count;

            ??
            // ?Dispatched?an?event
            ?? if ?( this -> dispatch_socket_event?(dispatch_info)? == ? 0 )
            ????
            ++ result;

            ??
            // ?Resume?handler?if?required.
            ?? if ?(dispatch_info.event_handler_? != ? this -> notify_handler_? &&
            ??????resume_flag?
            == ?ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
            ????
            this -> resume_handler?(dispatch_info.handle_);

            ??
            // ?Call?remove_reference()?if?needed.
            ?? if ?(reference_counting_required)
            ????
            {
            ??????dispatch_info.event_handler_
            -> remove_reference?();
            ????}


            ??
            return ?result;
            }


            int
            ACE_TP_Reactor::get_socket_event_info?(ACE_EH_Dispatch_Info?
            & event )
            {
            ??
            // ?Nothing?to?dispatch?yet
            ?? event .reset?();

            ??
            // ?Check?for?dispatch?in?write,?except,?read.?Only?catch?one,?but?if
            ??
            // ?one?is?caught,?be?sure?to?clear?the?handle?from?each?mask?in?case
            ??
            // ?there?is?more?than?one?mask?set?for?it.?This?would?cause?problems
            ??
            // ?if?the?handler?is?suspended?for?dispatching,?but?its?set?bit?in
            ??
            // ?another?part?of?ready_set_?kept?it?from?being?dispatched.
            ?? int ?found_io? = ? 0 ;
            ??ACE_HANDLE?handle;

            ??
            // ?@@todo:?We?can?do?quite?a?bit?of?code?reduction?here.?Let?me?get
            ??
            // ?it?to?work?before?I?do?this.
            ?? {
            ????ACE_Handle_Set_Iterator?handle_iter?(
            this -> ready_set_.wr_mask_);???????????????????? /* **********這里是ready_set_********** */ 什么時間被給值的呢?

            ????
            while ?( ! found_io? && ?(handle? = ?handle_iter?())? != ?ACE_INVALID_HANDLE)
            ??????
            {
            ????????
            if ?( this -> is_suspended_i?(handle))
            ??????????
            continue ;

            ????????
            // ?Remember?this?info
            ???????? event . set ?(handle,
            ???????????????????
            this -> handler_rep_.find?(handle),
            ???????????????????ACE_Event_Handler::WRITE_MASK,
            ???????????????????
            & ACE_Event_Handler::handle_output);

            ????????
            this -> clear_handle_read_set?(handle);
            ????????found_io?
            = ? 1 ;
            ??????}

            ??}


            ??
            if ?( ! found_io)
            ????
            {
            ??????ACE_Handle_Set_Iterator?handle_iter?(
            this -> ready_set_.ex_mask_);

            ??????
            while ?( ! found_io? && ?(handle? = ?handle_iter?())? != ?ACE_INVALID_HANDLE)
            ????????
            {
            ??????????
            if ?( this -> is_suspended_i?(handle))
            ????????????
            continue ;

            ??????????
            // ?Remember?this?info
            ?????????? event . set ?(handle,
            ?????????????????????
            this -> handler_rep_.find?(handle),
            ?????????????????????ACE_Event_Handler::EXCEPT_MASK,
            ?????????????????????
            & ACE_Event_Handler::handle_exception);

            ??????????
            this -> clear_handle_read_set?(handle);

            ??????????found_io?
            = ? 1 ;
            ????????}

            ????}


            ??
            if ?( ! found_io)
            ????
            {
            ??????ACE_Handle_Set_Iterator?handle_iter?(
            this -> ready_set_.rd_mask_);

            ??????
            while ?( ! found_io? && ?(handle? = ?handle_iter?())? != ?ACE_INVALID_HANDLE)
            ????????
            {
            ??????????
            if ?( this -> is_suspended_i?(handle))
            ????????????
            continue ;

            ??????????
            // ?Remember?this?info
            ?????????? event . set ?(handle,
            ?????????????????????
            this -> handler_rep_.find?(handle),
            ?????????????????????ACE_Event_Handler::READ_MASK,
            ?????????????????????
            & ACE_Event_Handler::handle_input);

            ??????????
            this -> clear_handle_read_set?(handle);
            ??????????found_io?
            = ? 1 ;
            ????????}

            ????}


            ??
            return ?found_io;
            }

            int
            ACE_TP_Reactor::dispatch_socket_event?(ACE_EH_Dispatch_Info?
            & dispatch_info)
            {
            ??ACE_TRACE?(
            " ACE_TP_Reactor::dispatch_socket_event " );

            ??ACE_HANDLE?handle?
            = ?dispatch_info.handle_;
            ??ACE_Event_Handler?
            * event_handler? = ?dispatch_info.event_handler_;
            ??ACE_Reactor_Mask?mask?
            = ?dispatch_info.mask_;
            ??ACE_EH_PTMF?callback?
            = ?dispatch_info.callback_;

            ??
            // ?Check?for?removed?handlers.
            ?? if ?(event_handler? == ? 0 )
            ????
            return ? - 1 ;

            ??
            // ?Upcall.?If?the?handler?returns?positive?value?(requesting?a
            ??
            // ?reactor?callback)?don't?set?the?ready-bit?because?it?will?be
            ??
            // ?ignored?if?the?reactor?state?has?changed.?Just?call?back
            ??
            // ?as?many?times?as?the?handler?requests?it.?Other?threads?are?off
            ??
            // ?handling?other?things.
            ?? int ?status? = ? 1 ;
            ??
            while ?(status? > ? 0 )
            ????status?
            = ?(event_handler ->* callback)?(handle);

            ??
            // ?If?negative,?remove?from?Reactor
            ?? if ?(status? < ? 0 )
            ????
            {
            ??????
            int ?retval? =
            ????????
            this -> remove_handler?(handle,?mask);

            ??????
            return ?retval;
            ????}


            ??
            // ?assert?(status?>=?0);
            ?? return ? 0 ;
            }
            posted on 2007-02-25 20:41 一動不如一靜 閱讀(2258) 評論(0)  編輯 收藏 引用 所屬分類: ACE
            国产亚洲综合久久系列| 国产99久久久久久免费看| 久久国产成人亚洲精品影院| 久久久无码人妻精品无码| 精品伊人久久久| 久久精品国产亚洲αv忘忧草 | 精品久久久久久国产| 亚洲国产日韩欧美综合久久| 久久综合日本熟妇| 综合久久精品色| 一本色道久久88精品综合| 一本色综合网久久| 久久久无码精品亚洲日韩按摩 | 亚洲色婷婷综合久久| 亚洲中文字幕无码久久2017| 99精品久久久久久久婷婷| 亚洲精品无码专区久久久| 蜜臀av性久久久久蜜臀aⅴ| 精品综合久久久久久888蜜芽| 国产美女久久精品香蕉69| 97超级碰碰碰碰久久久久| 久久人妻少妇嫩草AV无码蜜桃 | 中文字幕久久久久人妻| 久久精品国产久精国产思思| 99国产精品久久久久久久成人热| 精品久久久久久久| 久久久久无码国产精品不卡| 日韩欧美亚洲综合久久| 色婷婷综合久久久中文字幕| 久久综合狠狠色综合伊人| 性高湖久久久久久久久AAAAA| 久久人人爽人人爽人人爽| 久久99精品久久久久婷婷| 国产成人综合久久久久久| 漂亮人妻被中出中文字幕久久| 欧美牲交A欧牲交aⅴ久久| 色综合久久中文综合网| 欧美激情一区二区久久久| 国产91色综合久久免费| 一本大道久久东京热无码AV| 久久精品国产亚洲av水果派|