1
// Convenience entry point: forwards to event_handling() with the
// address of <how_long> as the maximum wait time and FALSE for the
// alertable flag, and returns event_handling()'s result unchanged.
int
ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
{
  ACE_Time_Value *const max_wait_time = &how_long;
  return this->event_handling (max_wait_time, FALSE);
}
2
// Waits for and dispatches all events.? Returns -1 on error, 0 if
// max_wait_time expired, or the number of events that were dispatched.
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
????????????????????????????????????? int alertable)
{
? ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
? // Make sure we are not closed
? if (!this->open_for_business_ || this->deactivated_)
????? return -1;
? // Stash the current time -- the destructor of this object will
? // automatically compute how much time elapsed since this method was
? // called.
? ACE_Countdown_Time countdown (max_wait_time);
? int result;
?
? do
? {
????? // Check to see if it is ok to enter ::WaitForMultipleObjects
????? // This will acquire <this->lock_> on success On failure, the
????? // lock will not be acquired
?????
????? result = this->ok_to_wait (max_wait_time, alertable);
????? if (result != 1)
??????? return result;
????? // Increment the number of active threads
????? ++this->active_threads_;
????? // Release the <lock_>
????? this->lock_.release ();
????? // Update the countdown to reflect time waiting to play with the
????? // mut and event.
?????
????? countdown.update ();
????? // Calculate timeout
????? int timeout = this->calculate_timeout (max_wait_time);
????? // Wait for event to happen
????? DWORD wait_status = this->wait_for_multiple_events (timeout,
????????????????????????????????????????????????????????? alertable);
????? // Upcall
????? result = this->safe_dispatch (wait_status);
????? if (0 == result)
????? {
????????? // wait_for_multiple_events timed out without dispatching
????????? // anything.? Because of rounding and conversion errors and
????????? // such, it could be that the wait loop timed out, but
????????? // the timer queue said it wasn't quite ready to expire a
????????? // timer. In this case, max_wait_time won't have quite been
????????? // reduced to 0, and we need to go around again. If max_wait_time
????????? // is all the way to 0, just return, as the entire time the
????????? // caller wanted to wait has been used up.
????????? countdown.update ();???? // Reflect time waiting for events
?????????
????????? if (0 == max_wait_time || max_wait_time->usec () == 0)
??????????? break;
????? }
? }while (result == 0);
? return result;
}
3
// Wraps dispatch () in the ACE_SEH_TRY / ACE_SEH_FINALLY macros so
// that update_state () is placed in the "finally" part and follows
// the dispatch attempt.  Returns dispatch ()'s result; the value
// stays at its initial -1 if dispatch () never assigns it.
int
ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status)
{
  int dispatch_status = -1;

  ACE_SEH_TRY
    {
      dispatch_status = this->dispatch (wait_status);
    }
  ACE_SEH_FINALLY
    {
      this->update_state ();
    }

  return dispatch_status;
}
4
// Expires timers first, then acts on <wait_status>:
//   WAIT_FAILED  - sets errno from the last error and returns -1.
//   WAIT_TIMEOUT - sets errno to ETIME and returns the value obtained
//                  from expire_timers ().
//   otherwise    - adds the result of dispatch_handles () to that
//                  value and returns the sum.
int
ACE_WFMO_Reactor::dispatch (DWORD wait_status)
{
  // Expire timers
  const int timers_expired = this->expire_timers ();

  // Failure.
  if (wait_status == WAIT_FAILED)
    {
      ACE_OS::set_errno_to_last_error ();
      return -1;
    }

  // Timeout.
  if (wait_status == WAIT_TIMEOUT)
    {
      errno = ETIME;
      return timers_expired;
    }

  // Dispatch.  dispatch_handles () is left to worry about abandoned
  // mutexes.
  return timers_expired + this->dispatch_handles (wait_status);
}
5
// Dispatches any active handles from <handles_[slot]> to
// <handles_[max_handlep1_]>, polling through our handle set looking
// for active handles.
//
// Returns -1 as soon as dispatch_handler () reports -1; otherwise
// returns the number of dispatch_handler () calls that were made.
int
ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
  // Cache this value, this is the absolute value.
  const DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();

  // dispatch_slot is the absolute slot.  It only ever grows.
  DWORD dispatch_slot = 0;

  // nCount starts off at <max_handlep1>, this is a transient count of
  // handles last waited on.
  DWORD nCount = max_handlep1;

  int number_of_handlers_dispatched = 0;

  for (;;)
    {
      ++number_of_handlers_dispatched;

      // A status inside [WAIT_OBJECT_0, WAIT_OBJECT_0 + nCount] is an
      // offset from WAIT_OBJECT_0; anything else is treated as an
      // abandoned handle, i.e. an offset from WAIT_ABANDONED_0.
      const bool signaled =
        wait_status >= WAIT_OBJECT_0
        && wait_status <= (WAIT_OBJECT_0 + nCount);
      const DWORD status_base = signaled ? WAIT_OBJECT_0 : WAIT_ABANDONED_0;
      dispatch_slot += wait_status - status_base;

      // Dispatch handler
      if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
        return -1;

      // Move on to the slot after the one just dispatched.
      ++dispatch_slot;

      // We're done.
      if (dispatch_slot >= max_handlep1)
        return number_of_handlers_dispatched;

      // Readjust nCount
      nCount = max_handlep1 - dispatch_slot;

      // Check the remaining handles
      wait_status = this->poll_remaining_handles (dispatch_slot);

      // On failure record errno; on failure or timeout there are no
      // more handles ready, so we can return.
      if (wait_status == WAIT_FAILED)
        ACE_OS::set_errno_to_last_error ();
      if (wait_status == WAIT_FAILED || wait_status == WAIT_TIMEOUT)
        return number_of_handlers_dispatched;
    }
}
6
int ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
??????????????????????????????????????? DWORD max_handlep1)
{
? // Check if there are window messages that need to be dispatched
? if (slot == max_handlep1)
??? return this->dispatch_window_messages ();
? // Dispatch the handler if it has not been scheduled for deletion.
? // Note that this is a very week test if there are multiple threads
? // dispatching this slot as no locks are held here. Generally, you
? // do not want to do something like deleting the this pointer in
? // handle_close() if you have registered multiple times and there is
? // more than one thread in WFMO_Reactor->handle_events().
? else if (!this->handler_rep_.scheduled_for_deletion (slot))
? {
????? ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
????? if (this->handler_rep_.current_info ()[slot].io_entry_)
??????? return this->complex_dispatch_handler (slot,
?????????????????????????????????????????????? event_handle);
????? else
??????? return this->simple_dispatch_handler (slot,
????????????????????????????????????????????? event_handle);
? }
? else
??? // The handle was scheduled for deletion, so we will skip it.
??? return 0;
}
7
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
??????????????????????????????????????????????? ACE_HANDLE event_handle)
{
? // This dispatch is used for I/O entires.
? ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
??? this->handler_rep_.current_info ()[slot];
? WSANETWORKEVENTS events;
? ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
? if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
????????????????????????????? event_handle,
????????????????????????????? &events) == SOCKET_ERROR)
??? problems = ACE_Event_Handler::ALL_EVENTS_MASK;
? else
? {
????? // Prepare for upcalls. Clear the bits from <events> representing
????? // events the handler is not interested in. If there are any left,
????? // do the upcall(s). upcall will replace events.lNetworkEvents
????? // with bits representing any functions that requested a repeat
????? // callback before checking handles again. In this case, continue
????? // to call back unless the handler is unregistered as a result of
????? // one of the upcalls. The way this is written, the upcalls will
????? // keep being done even if one or more upcalls reported problems.
????? // In practice this may turn out not so good, but let's see. If any
????? // problems, please notify Steve Huston <shuston@riverace.com>
????? // before or after you change this code.
????? events.lNetworkEvents &= current_info.network_events_;
????? while (events.lNetworkEvents != 0)
????? {
????????? ACE_Event_Handler *event_handler = current_info.event_handler_;
????????? int reference_counting_required =
??????????? event_handler->reference_counting_policy ().value () ==
??????????? ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
????????? // Call add_reference() if needed.
????????? if (reference_counting_required)
????????? {
????????????? event_handler->add_reference ();
????????? }
????????? // Upcall
????????? problems |= this->upcall (current_info.event_handler_,
??????????????????????????????????? current_info.io_handle_,
??????????????????????????????????? events);
????????? // Call remove_reference() if needed.
????????? if (reference_counting_required)
????????? {
????????????? event_handler->remove_reference ();
????????? }
????????? if (this->handler_rep_.scheduled_for_deletion (slot))
??????????? break;
????? }
? }
? if (problems != ACE_Event_Handler::NULL_MASK
????? && !this->handler_rep_.scheduled_for_deletion (slot)? )
??? this->handler_rep_.unbind (event_handle, problems);
? return 0;
}
8
// Works out what happened on the socket from <events> and makes the
// matching upcalls on <event_handler>, passing <io_handle> each time.
//
// For each event bit that is set, one handler method is called.  A
// result <= 0 clears that event bit; a result of -1 additionally
// records the corresponding ACE_Event_Handler mask in the returned
// problem set.  A result > 0 leaves the bit set.  On return,
// <events.lNetworkEvents> holds the bits that are still set, and the
// collected problem mask is returned.
ACE_Reactor_Mask
ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
                          ACE_HANDLE io_handle,
                          WSANETWORKEVENTS &events)
{
  ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;

  // Working copy of the event bits; written back to <events> at the
  // end.
  long pending = events.lNetworkEvents;

  if (ACE_BIT_ENABLED (pending, FD_WRITE))
    {
      const int rc = event_handler->handle_output (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_WRITE);
    }

  if (ACE_BIT_ENABLED (pending, FD_CONNECT))
    {
      // A zero error code is a successful connect and is reported via
      // handle_output (); an unsuccessful connect is reported via
      // handle_input ().
      const int rc = (events.iErrorCode[FD_CONNECT_BIT] == 0)
        ? event_handler->handle_output (io_handle)
        : event_handler->handle_input (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_CONNECT);
    }

  if (ACE_BIT_ENABLED (pending, FD_OOB))
    {
      const int rc = event_handler->handle_exception (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_OOB);
    }

  if (ACE_BIT_ENABLED (pending, FD_READ))
    {
      const int rc = event_handler->handle_input (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_READ);
    }

  // The close upcall is skipped when a READ_MASK problem has already
  // been recorded above.
  if (ACE_BIT_ENABLED (pending, FD_CLOSE)
      && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
    {
      const int rc = event_handler->handle_input (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_CLOSE);
    }

  if (ACE_BIT_ENABLED (pending, FD_ACCEPT))
    {
      const int rc = event_handler->handle_input (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_ACCEPT);
    }

  if (ACE_BIT_ENABLED (pending, FD_QOS))
    {
      const int rc = event_handler->handle_qos (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_QOS);
    }

  if (ACE_BIT_ENABLED (pending, FD_GROUP_QOS))
    {
      const int rc = event_handler->handle_group_qos (io_handle);
      if (rc == -1)
        ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
      if (rc <= 0)
        ACE_CLR_BITS (pending, FD_GROUP_QOS);
    }

  // Hand the surviving bits back so the caller can request a repeat
  // call for them.
  events.lNetworkEvents = pending;
  return problems;
}
posted on 2007-02-22 10:54
walkspeed 閱讀(1594)
評論(0) 編輯 收藏 引用 所屬分類:
ACE Frameworks