接受器/連接器模式設計用于降低連接建立與連接建立后所執行的服務之間的耦合。例如,在WWW瀏覽器中,所執行的服務或“實際工作”是解析和顯示客戶瀏覽器接收到的HTML頁面。連接建立是次要的,可能通過BSD socket或其他一些等價的IPC機制來完成。使用這些模式允許程序員專注于“實際工作”,而最少限度地去關心怎樣在服務器和客戶之間建立連接。而另外一方面,程序員也可以獨立于他所編寫的、或將要編寫的服務例程,去調諧連接建立的策略。
因為該模式降低了服務和連接建立方法之間的耦合,非常容易改動其中一個,而不影響另外一個,從而也就可以復用以前編寫的連接建立機制和服務例程的代碼。在同樣的例子中,使用這些模式的瀏覽器程序員一開始可以構造他的系統、使用特定的連接建立機制來運行它和測試它;然后,如果先前的連接機制被證明不適合他所構造的系統,他可以決定說他希望將底層連接機制改變為多線程的(或許使用線程池策略)。因為此模式提供了嚴格的去耦合,只需要極少的努力就可以實現這樣的變動。
在你能夠清楚地理解這一章中的許多例子,特別是更為高級的部分之前,你必須通讀有關反應堆和IPC_SAP的章節(特別是接受器和連接器部分)。此外,你還可能需要參考線程和線程管理部分。
?
?
7.1
接受器模式
??? 接受器通常被用在你本來會使用
BSD accept()
系統調用的地方。接受器模式也適用于同樣的環境,但就如我們將看到的,它提供了更多的功能。在
ACE
中,接收器模式借助名為
ACE_Acceptor
的“工廠”(
Factory
)實現。工廠(通常)是用于對助手對象的實例化過程進行抽象的類。在面向對象設計中,復雜的類常常會將特定功能委托給助手類。復雜的類對助手的創建和委托必須很靈活。這種靈活性是在工廠的幫助下獲得的。工廠允許一個對象通過改變它所委托的對象來改變它的底層策略,而工廠提供給應用的接口卻是一樣的,這樣,可能根本就無需對客戶代碼進行改動(有關工廠的更多信息,請閱讀“設計模式”中的參考文獻)。
?
?
圖7-1 工廠模式示意圖
?????? ACE_Acceptor
工廠允許應用開發者改變“助手”對象,以用于:
?? 同樣地,
ACE_Connector
工廠允許應用開發者改變“助手”對象,以用于:
?
??? 下面的討論同時適用于接受器和連接器,所以作者將只討論接受器,而連接器同樣具有相應的參數。
???????? ACE_Acceptor
被實現為模板容器,通過兩個類作為實參來進行實例化。第一個參數實現特定的服務(類型為
ACE_Event_Handler
。因為它被用于處理
I/O
事件,所以必須來自事件處理類層次),應用在建立連接后執行該服務;第二個參數是“具體的”接受器(可以是在
IPC_SAP
一章中討論的各種變種)。
特別要注意的是ACE_Acceptor工廠和底層所用的具體接受器是非常不同的。具體接受器可完全獨立于ACE_Acceptor工廠使用,而無需涉及我們在這里討論的接受器模式(獨立使用接受器已在IPC_SAP一章中討論和演示)。ACE_Acceptor工廠內在于接受器模式,并且不能在沒有底層具體接受器的情況下使用。ACE_Acceptor使用底層的具體接受器來建立連接。如我們已看到的,有若干ACE的類可被用作ACE_Acceptor工廠模板的第二個參數(也就是,具體接受器類)。但是服務處理類必須由應用開發者來實現,而且其類型必須是ACE_Event_Handler。ACE_Acceptor工廠可以這樣來實例化:
typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;
??? 這里,名為
My_Service_Handler
的事件處理器和具體接受器
ACE_SOCK_ACCEPTOR
被傳給
MyAcceptor
。
ACE_SOCK_ACCEPTOR
是基于
BSD socket
流家族的
TCP
接受器(各種可傳給接受器工廠的不同類型的接受器,見表
7-1
和
IPC
一章)。請再次注意,在使用接受器模式時,我們總是處理兩個接受器:名為
ACE_Acceptor
的工廠接受器,和
ACE
中的某種具體接受器,比如
ACE_SOCK_ACCEPTOR
(你可以創建自定義的具體接受器來取代
ACE_SOCK_ACCEPTOR
,但你將很可能無需改變
ACE_Acceptor
工廠類中的任何東西)。
重要提示
:
ACE_SOCK_ACCEPTOR實際上是一個宏,其定義為:#define ACE_SOCK_ACCEPTOR ACE_SOCK_Acceptor, ACE_INET_Addr我們認為這個宏的使用是必要的,因為在類中的typedefs在某些平臺上無法工作。如果不是這樣的話,ACE_Acceptor就可以這樣來實例化:typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_Acceptor>MyAcceptor;在表7-1中對宏進行了說明。7.1.1 組件??? 如上面的討論所說明的,在接受器模式中有三個主要的參與類:?
??? 具體接受器
:它含有建立連接的特定策略,連接與底層的傳輸協議機制系在一起。下面是在ACE中的各種具體接受器的例子:ACE_SOCK_ACCEPTOR(使用TCP來建立連接)、ACE_LSOCK_ACCEPTOR(使用UNIX域socket來建立連接),等等。 :由應用開發者編寫,它的open()方法在連接建立后被自動回調。接受器構架假定服務處理類的類型是ACE_Event_Handler,這是ACE定義的接口類(該類已在反應堆一章中詳細討論過)。另一個特別為接受器和連接器模式的服務處理而創建的類是ACE_Svc_Handler。該類不僅基于ACE_Event_Handler接口(這是使用反應堆所必需的),同時還基于在ASX流構架中使用的ACE_Task類。ACE_Task類提供的功能有:創建分離的線程、使用消息隊列來存儲到來的數據消息、并發地處理它們,以及其他一些有用的功能。如果與接受器模式一起使用的具體服務處理器派生自ACE_Svc_Handler、而不是ACE_Event_Handler,它就可以獲得這些額外的功能。對ACE_Svc_Handler中的額外功能的使用,在這一章的高級課程里詳細討論。在下面的討論中,我們將使用ACE_Svc_Handler作為我們的事件處理器。在簡單的ACE_Event_Handler和ACE_Svc_Handler類之間的重要區別是,后者擁有一個底層通信流組件。這個流在ACE_Svc_Handler模板被實例化的時候設置。而在使用ACE_Event_Handler的情況下,我們必須自己增加I/O通信端點(也就是,流對象),作為事件處理器的私有數據成員。因而,在這樣的情況下,應用開發者應該將他的服務處理器創建為ACE_Svc_Handler類的子類,并首先實現將被構架自動回調的open()方法。此外,因為ACE_Svc_Handler是一個模板,通信流組件和鎖定機制是作為模板參數被傳入的。 反應堆:與ACE_Acceptor協同使用。如我們將看到的,在實例化接受器后,我們啟動反應堆的事件處理循環。反應堆,如先前所解釋的,是一個事件分派類;而在此情況下,它被接受器用于將連接建立事件分派到適當的服務處理例程。 7.1.2 用法??? 通過觀察一個簡單的例子,可以進一步了解接受器。這個例子是一個簡單的應用,它使用接受器接受連接,隨后回調服務例程。當服務例程被回調時,它就讓用戶知道連接已經被成功地建立。例7-1#include ”ace/Reactor.h”#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
//Create a Service Handler whose open() method will be called back
//automatically. This
class MUST derive from ACE_Svc_Handler which is an//interface and as can be seen is a
template container class itself. The//first parameter to ACE_Svc_Handler is the
underlying stream that it//may use for communication. Since we are using TCP sockets the
stream//is ACE_SOCK_STREAM. The second is the internal synchronization
//mechanism it
could use. Since we have a single threaded application we//pass it a “null” lock which
will do nothing.class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
//the open method which will be called back automatically after the
//connection has been
established.public:
int open(void*)
{
cout<<”Connection established”<<endl;
}
};
// Create the acceptor as described above.
typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;
int main(int argc, char* argv[])
{
//create the address on which we wish to connect. The constructor takes
//the port
number on which to listen and will automatically take the//host’s IP address as the IP
Address for the addr object ?ACE_INET_Addr addr(PORT_NUM);
//instantiate the appropriate acceptor object with the address on which
//we wish to
accept and the Reactor instance we want to use. In this//case we just use the global
ACE_Reactor singleton. (Read more about//the reactor in the previous chapter)
MyAcceptor acceptor(addr, ACE_Reactor::instance());
while(1)
// Start the reactor’s event loop
ACE_Reactor::instance()->handle_events();
}
??? 在上面的例子中,我們首先創建我們希望在其上接受連接的端點地址。因為我們決定使用TCP/IP作為底層的連接協議,我們創建一個ACE_INET_Addr來作為我們的端點,并將我們所要偵聽的端口號傳給它。我們將這個地址和反應堆單體的實例傳給我們要在此之后進行實例化的接受器。這個接受器在被實例化后,將自動接受任何在PORT_NUM端口上的連接請求,并且在為這樣的請求建立連接之后回調My_Svc_Handler的open()方法。注意當我們實例化ACE_Acceptor工廠時,我們傳給它的是我們想要使用的具體接受器(也就是ACE_SOCK_ACCEPTOR)和具體服務處理器(也就是My_Svc_Handler)。 現在讓我們嘗試一下更為有趣的事情。在下一個例子中,我們將在連接請求到達、服務處理器被回調之后,將服務處理器登記回反應堆。現在,如果新創建的連接上有任何數據到達,我們的服務處理例程
handle_input()方法都會被自動回調。因而在此例中,我們在同時使用反應堆和接受器的特性:例7-2#include ”ace/Reactor.h”#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#define PORT_NUM 10101
#define DATA_SIZE 12
//forward declaration
class My_Svc_Handler;
//Create the Acceptor class
typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR>
MyAcceptor;
//Create a service handler similar to as seen in example 1. Except this
//time include the handle_input() method which will be called back
//automatically by the reactor when new data arrives on the newly
//established connection
class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
My_Svc_Handler()
{
data= new char[DATA_SIZE];
}
int open(void*)
{
cout<<”Connection established”<<endl;
//Register the service handler with the reactor
ACE_Reactor::instance()->register_handler(this,
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
//After using the peer() method of ACE_Svc_Handler to obtain a
//reference to the underlying stream of the service handler class
//we call recv_n() on it to read the data which has been received.
//This data is stored in the data array and then printed out
peer().recv_n(data,DATA_SIZE);
ACE_OS::printf(”<< %s\n”,data);
//keep yourself registered with the reactor
return 0;
}
private:
char* data;
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(PORT_NUM);
//create the acceptor
MyAcceptor acceptor(addr, //address to accept on
ACE_Reactor::instance()); //the reactor to use
while(1)
//Start the reactor’s event loop
ACE_Reactor::instance()->handle_events();
}
??? 這個例子和前一例子的唯一區別是我們在服務處理器的open()方法中將服務處理器登記到反應堆上。因此,我們必須編寫handle_input()方法;當數據在連接上到達時,這個方法會被反應堆回調。在此例中我們只是將我們接收到的數據打印到屏幕上。ACE_Svc_Handler類的peer()方法返回對底層的對端流的引用。我們使用底層流包裝類的recv_n()方法來獲取連接上接收到的數據。 該模式真正的威力在于,底層的連接建立機制完全封裝在具體接受器中,從而可以很容易地改變。在下一個例子里,我們改變底層的連接建立機制,讓它使用
UNIX域socket、而不是TCP socket。這個例子(下劃線標明少量變動)如下所示:例7-3class My_Svc_Handler:public ACE_Svc_Handler <ACE_LSOCK_STREAM,ACE_NULL_SYNCH>{
public:
int open(void*)
{
cout<<”Connection established”<<endl;
ACE_Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
}
int handle_input(ACE_HANDLE)
{
char* data= new char[DATA_SIZE];
peer().recv_n(data,DATA_SIZE);
ACE_OS::printf(”<< %s\n”,data);
return 0;
}
};
typedef ACE_Acceptor<My_Svc_Handler,ACE_LSOCK_ACCEPTOR> MyAcceptor;
int main(int argc, char* argv[])
{
ACE_UNIX_Addr addr(”/tmp/addr.ace”);
MyAcceptor acceptor(address, ACE_Reactor::instance());
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
?例
7-2和例7-3不同的地方標注了下劃線。正如我們所說過的,兩個程序間的不同非常之少,但是它們使用的連接建立范式卻極不相同。ACE中可用的連接建立機制在表7-1中列出:
接受器類型 | 所用地址 | 所用流 | 具體接受器 |
TCP 流接受器 | ACE_INET_Addr | ACE_SOCK_STREAM | ACE_SOCK_ACCEPTOR |
UNIX 域本地流socket接受器 | ACE_UNIX_Addr | ACE_LSOCK_STREAM | ACE_LSOCK_ACCEPTOR |
管道作為底層通信機制 | ACE_SPIPE_Addr | ACE_SPIPE_STREAM | ACE_SPIPE_ACCEPTOR |
?表
7-1 ACE中的連接建立機制?
?7.2
連接器? 連接器與接受器非常類似。它也是一個工廠,但卻是用于主動地連接遠程主機。在連接建立后,它將自動回調適當的服務處理對象的
open()方法。連接器通常被用在你本來會使用BSD connect()調用的地方。在ACE中,連接器,就如同接受器,被實現為名為ACE_Connector的模板容器類。如先前所提到的,它需要兩個參數,第一個是事件處理器類,它在連接建立時被調用;第二個是“具體的”連接器類。 你必須注意,底層的具體連接器和
ACE_Connector工廠是非常不一樣的。ACE_Connector工廠使用底層的具體連接器來建立連接。隨后ACE_Connector工廠使用適當的事件或服務處理例程(通過模板參數傳入)來在具體的連接器建立起連接之后處理新連接。如我們在IPC一章中看到的,沒有ACE_Connector工廠,也可以使用這個具體的連接器。但是,沒有具體的連接器類,就會無法使用ACE_Connector工廠(因為要由具體的連接器類來實際處理連接建立)。 下面是對
ACE_Connector類進行實例化的一個例子:?
?typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;
?
?這個例子中的第二個參數是具體連接器類
ACE_SOCK_CONNECTOR。連接器和接受器模式一樣,在內部使用反應堆來在連接建立后回調服務處理器的open()方法。我們可以復用我們為前面的接受器例子所寫的服務處理例程。 一個使用連接器的例子可以進一步說明這一點:
?
?例
7-4?typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;
int main(int argc, char * argv[])
{
ACE_INET_Addr addr(PORT_NO,HOSTNAME);
My_Svc_Handler * handler= new My_Svc_Handler;
//Create the connector
MyConnector connector;
//Connects to remote machine
if(connector.connect(handler,addr)==-1)
ACE_ERROR(LM_ERROR,”%P|%t, %p”,”Connection failed”);
//Registers with the Reactor
while(1)
ACE_Reactor::instance()->handle_events();
}
?
在上面的例子中,
HOSTNAME和PORT_NO是我們希望主動連接到的機器和端口。在實例化連接器之后,我們調用它的連接方法,將服務例程(會在連接完全建立后被回調),以及我們希望連接到的地址傳遞給它。?
?
7.2.1 同時使用接受器和連接器? 一般而言,接受器和連接器模式會在一起使用。在客戶
/服務器應用中,服務器通常含有接受器,而客戶含有連接器。但是,在特定的應用中,可能需要同時使用接受器和連接器。下面給出這樣的應用的一個例子:一條消息被反復發送給對端機器,而與此同時也從遠端接受另一消息。因為兩種功能必須同時執行,簡單的解決方案就是分別在不同的線程里發送和接收消息。 這個例子同時包含接受器和連接器。用戶可以在命令行上給出參數,告訴應用它想要扮演服務器還是客戶角色。隨后應用就相應地調用
main_accept()或main_connect()。?
?例
7-5?#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#include ”ace/Thread.h”
//Add our own Reactor singleton
typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;
//Create an Acceptor
typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;
//Create a Connector
typedef ACE_Connector<MyServiceHandler,ACE_SOCK_CONNECTOR> Connector;
class MyServiceHandler:
public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
//Used by the two threads “globally” to determine their peer stream
static ACE_SOCK_Stream* Peer;
//Thread ID used to identify the threads
ACE_thread_t t_id;
int open(void*)
{
cout<<”Acceptor: received new connection”<<endl;
//Register with the reactor to remember this handle
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
//Determine the peer stream and record it globally
MyServiceHandler::Peer=&peer();
//Spawn new thread to send string every second
ACE_Thread::spawn((ACE_THR_FUNC)send_data,0,THR_NEW_LWP,&t_id);
//keep the service handler registered by returning 0 to the
//reactor
return 0;
}
static void* send_data(void*)
{
while(1)
{
cout<<”>>Hello World”<<endl;
Peer->send_n(”Hello World”,sizeof(”Hello World”));
//Go to sleep for a second before sending again
ACE_OS::sleep(1);
}
return 0;
}
int handle_input(ACE_HANDLE)
{
char* data= new char[12];
//Check if peer aborted the connection
if(Peer.recv_n(data,12)==0)
{
cout<<”Peer probably aborted connection”);
ACE_Thread::cancel(t_id); //kill sending thread ..
return -1; //de-register from the Reactor.
}
//Show what you got..
cout<<”<< %s\n”,data”<<endl;
//keep yourself registered
return 0;
}
};
//Global stream identifier used by both threads
ACE_SOCK_Stream * MyServiceHandler::Peer=0;
void main_accept()
{
ACE_INET_Addr addr(PORT_NO);
Acceptor myacceptor(addr,Reactor::instance());
while(1)
Reactor::instance()->handle_events();
return 0;
}
void main_connect()
{
ACE_INET_Addr addr(PORT_NO,HOSTNAME);
Connector myconnector;
myconnector.connect(my_svc_handler,addr);
while(1)
Reactor::instance()->handle_events();
}
int main(int argc, char* argv[])
{
// Use ACE_Get_Opt to parse and obtain arguments and then call the
// appropriate function for accept or connect.
...
}
?這個簡單的例子演示怎樣聯合使用接受器和連接模式來生成服務處理例程,該例程與底層的網絡連接建立方法是完全分離的。通過改變相應的設定具體連接器和接受器的模板參數,可以很容易地改用任何其他的底層網絡連接建立協議。
?
?7.3
高級課程? 下面的部分更為詳細地解釋接受器和連接器模式實際上是如何工作的。如果你想要調諧服務處理和連接建立策略(其中包括調諧底層具體連接器將要使用的服務處理例程的創建和并發策略,以及連接建立策略),對該模式的進一步了解就是必要的。此外,還有一部分內容解釋怎樣使用通過
ACE_Svc_Handler類自動獲得的高級特性。最后,我們說明怎樣與接受器和連接器模式一起使用簡單的輕量級ACE_Event_Handler。?
?
7.3.1 ACE_SVC_HANDLER類? 如上面所提到的,
ACE_Svc_Handler類基于ACE_Task(它是ASX流構架的一部分)和ACE_Event_Handler接口類。因而ACE_Svc_Handler既是任務,又是事件處理器。這里我們將簡要介紹ACE_Task和ACE_Svc_Handler的功能。?
?7.3.1.1 ACE_Task
? ACE_Task
被設計為與ASX流構架一起使用;ASX基于UNIX系統V中的流機制。在設計上ASX與Larry Peterson構建的X-kernel協議工具非常類似。? ASX
的基本概念是:到來的消息會被分配給由若干模塊(module)組成的流。每個模塊在到來的消息上執行某種固定操作,然后把它傳遞給下一個模塊作進一步處理,直到它到達流的末端為止。模塊中的實際處理由任務來完成。每個模塊通常有兩個任務,一個用于處理到來的消息,一個用于處理外出的消息。在構造協議棧時,這種結構是非常有用的。因為每個模塊都有固定的簡單接口,所創建的模塊可以很容易地在不同的應用間復用。例如,設想一個應用,它處理來自數據鏈路層的消息。程序員會構造若干模塊,每個模塊分別處理不同層次的協議。因而,他會構造一個單獨的模塊,進行網絡層處理;另一個進行傳輸層處理;還有一個進行表示層處理。在構造這些模塊之后,它們可以(在ASX的幫助下)被“串”成一個流來使用。如果后來創建了一個新的(也許是更好的)傳輸模塊,就可以在不對程序產生任何影響的情況下、在流中替換先前的傳輸模塊。注意模塊就像是容納任務的容器。這些任務是實際的處理元件。一個模塊可能需要兩個任務,如同在上面的例子中;也可能只需要一個任務。如你可能會猜到的,ACE_Task是模塊中被稱為任務的處理元件的實現。?
?7.3.1.2
任務通信的體系結構? 每個
ACE_Task都有一個內部的消息隊列,用以與其他任務、模塊或是外部世界通信。如果一個ACE_Task想要發送一條消息給另一個任務,它就將此消息放入目的任務的消息隊列中。一旦目的任務收到此消息,它就會立即對它進行處理。 所有
ACE_Task都可以作為0個或多個線程來運行。消息可以由多個線程放入ACE_Task的消息隊列,或是從中取出,程序員無需擔心破壞任何數據結構。因而任務可被用作由多個協作線程組成的系統的基礎構建組件。各個線程控制都可封裝在ACE_Task中,與其他任務通過發送消息到它們的消息隊列來進行交互。 這種體系結構的唯一問題是,任務只能通過消息隊列與在同一進程內的其他任務相互通信。
ACE_Svc_Handler解決了這一問題,它同時繼承自ACE_Task和ACE_Event_Handler,并且增加了一個私有數據流。這種結合使得ACE_Svc_Handler對象能夠用作這樣的任務:它能夠處理事件、并與遠地主機的任務間發送和接收數據。? ACE_Task
被實現為模板容器,它通過鎖定機制來進行實例化。該鎖用于保證內部的消息隊列在多線程環境中的完整性。如先前所提到的,ACE_Svc_Handler模板容器不僅需要鎖定機制,還需要用于與遠地任務通信的底層數據流來作為參數。
7.3.1.3 創建ACE_Svc_Handler
?
ACE_Svc_Handler
模板通過鎖定機制和底層流來實例化,以創建所需的服務處理器。如果應用只是單線程的,就不需要使用鎖,可以用ACE_NULL_SYNCH來將其實例化。但是,如果我們想要在多線程應用中使用這個模板,可以這樣來進行實例化:?
?class MySvcHandler:
public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>
?{
?...
?}
?
?7.3.1.4
在服務處理器中創建多個線程? 在上面的例
7-5中,我們使用ACE_Thread包裝類和它的靜態方法spawn(),創建了單獨的線程來發送數據給遠地對端。但是,在我們完成此工作時,我們必須定義使用C++ static修飾符的文件范圍內的靜態send_data()方法。結果當然就是,我們無法訪問我們實例化的實際對象的任何數據成員。換句話說,我們被迫使send_data()成員函數成為class-wide的函數,而這并不是我們所想要的。這樣做的唯一原因是,ACE_Thread::spawn()只能使用靜態成員函數來作為它所創建的線程的入口。另一個有害的副作用是到對端流的引用也必須成為靜態的。簡而言之,這不是編寫這些代碼的最好方式。? ACE_Task
提供了更好的機制來避免發生這樣的問題。每個ACE_Task都有activate()方法,可用于為ACE_Task創建線程。所創建的線程的入口是非靜態成員函數svc()。因為svc()是非靜態函數,它可以調用任何對象實例專有的數據或成員函數。ACE對程序員隱藏了該機制的所有實現細節。activate()方法有著非常多的用途,它允許程序員創建多個線程,所有這些線程都使用svc()方法作為它們的入口。還可以設置線程優先級、句柄、名字,等等。activate()方法的原型是:?
// = Active object activation method.
virtual int activate (long flags = THR_NEW_LWP,
int n_threads = 1,
int force_active = 0,
long priority = ACE_DEFAULT_THREAD_PRIORITY,
int grp_id = -1,
ACE_Task_Base *task = 0,
ACE_hthread_t thread_handles[] = 0,
void *stack[] = 0,
size_t stack_size[] = 0,
ACE_thread_t thread_names[] = 0);
? 第一個參數
flags描述將要創建的線程所希望具有的屬性。在線程一章里有詳細描述。可用的標志有:?
THR_CANCEL_DISABLE, THR_CANCEL_ENABLE, THR_CANCEL_DEFERRED,
THR_CANCEL_ASYNCHRONOUS, THR_BOUND, THR_NEW_LWP, THR_DETACHED,
THR_SUSPENDED, THR_DAEMON, THR_JOINABLE, THR_SCHED_FIFO,
THR_SCHED_RR, THR_SCHED_DEFAULT
? 第二個參數
n_threads指定要創建的線程的數目。第三個參數force_active用于指定是否應該創建新線程,即使activate()方法已在先前被調用過、因而任務或服務處理器已經在運行多個線程。如果此參數被設為false(0),且如果activate()是再次被調用,該方法就會設置失敗代碼,而不會生成更多的線程。 第四個參數用于設置運行線程的優先級。缺省情況下,或優先級被設為
ACE_DEFAULT_THREAD_PRIORITY,方法會使用給定的調度策略(在flags中指定,例如,THR_SCHED_DEFAULT)的“適當”優先級。這個值是動態計算的,并且是在給定策略的最低和最高優先級之間。如果顯式地給定一個值,這個值就會被使用。注意實際的優先級值極大地依賴于實現,最好不要直接使用。在線程一章中,可讀到更多有關線程優先級的內容。 還可以傳入將要創建的線程的線程句柄、線程名和棧空間,以在線程創建過程中使用。如果它們被設置為
NULL,它們就不會被使用。但是如果要使用activate創建多個線程,就必須傳入線程的名字或句柄,才能有效地對它們進行使用。 下面的例子可以幫助你進一步理解
activate方法的使用:?
?例
7-6?#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
class MyServiceHandler; //forward declaration
typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;
typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;
class MyServiceHandler:
public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>
{
// The two thread names are kept here
ACE_thread_t thread_names[2];
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));
//Register with the reactor to remember this handler..
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));
//Create two new threads to create and send messages to the
//remote machine.
activate(THR_NEW_LWP,
2, //2 new threads
0, //force active false, if already created don’t try again.
ACE_DEFAULT_THREAD_PRIORITY,//Use default thread priority
-1,
this,//Which ACE_Task object to create? In this case this one.
0,// don’t care about thread handles used
0,// don’t care about where stacks are created
0,//don’t care about stack sizes
thread_names); // keep identifiers in thread_names
//keep the service handler registered with the acceptor.
return 0;
}
void send_message1(void)
{
//Send message type 1
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”Sent message1”));
peer().send_n(”Message1”,LENGTH_MSG_1);
} //end send_message1
int send_message2(void)
{
//Send message type 1
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”Sent Message2”));
peer().send_n(”Message2”,LENGTH_MSG_2);
}//end send_message_2
int svc(void)
{
ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));
if(ACE_Thread::self()== thread_names[0])
while(1) send_message1(); //send message 1s forever
else
while(1) send_message2(); //send message 2s forever
return 0; // keep the com, piler happy.
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));
char* data= new char[13];
//Check if peer aborted the connection
if(peer().recv_n(data,12)==0)
{
printf(”Peer probably aborted connection”);
return -1; //de-register from the Reactor.
}
//Show what you got..
ACE_OS::printf(”<< %s\n”,data);
//keep yourself registered
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(10101);
ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));
//Prepare to accept connections
Acceptor myacceptor(addr,Reactor::instance());
// wait for something to happen.
while(1)
Reactor::instance()->handle_events();
return 0;
}
?在此例中,服務處理器在它的
open()方法中被登記到反應堆上,隨后程序調用activate()來創建2個線程。線程的名字被記錄下來,以便在它們調用svc()例程時,我們可以將它們區別開。每個線程發送一條不同類型的消息給遠地對端。注意在此例中,線程的創建是完全透明的。此外,因為入口是普通的非靜態成員函數,它不需要進行“丑陋的”改動來記住數據成員,比如說對端流。無論何時只要我們需要,我們就可以簡單地調用成員函數peer()來獲取底層的流。7.3.1.5使用服務處理器中的消息隊列機制
如前面所提到的,
ACE_Svc_Handler類擁有內建的消息隊列。這個消息隊列被用作在ACE_Svc_Handler和外部世界之間的主要通信接口。其他任務想要發給該服務處理器的消息被放入它的消息隊列中。這些消息會在單獨的線程里(通過調用activate()方法創建)處理。隨后另一個線程就可以把處理過的消息通過網絡發送給另外的遠地目的地(很可能是另外的ACE_Svc_Handler)。 如先前所提到的,在這種多線程情況下,
ACE_Svc_Handler會自動地使用鎖來確保消息隊列的完整性。所用的鎖即通過實例化ACE_Svc_Handler模板類創建具體服務處理器時所傳遞的鎖。之所用通過這樣的方式來傳遞鎖,是因為這樣程序員就可以對他的應用進行“調諧”。不同平臺上的不同鎖定機制有著不同程度的開銷。如果需要,程序員可以創建他自己的優化的、遵從ACE的鎖接口定義的鎖,并將其用于服務處理器。這是程序員通過使用ACE可獲得的靈活性的又一范例。重要的是程序員必須意識到,在此服務處理例程中的額外線程將帶來顯著的鎖定開銷。為將此開銷降至最低,程序員必須仔細地設計他的程序,確保使這樣的開銷最小化。特別地,上面描述的例子有可能導致過度的開銷,在大多數情況下可能并不實用。 ACE_Task
,進而是ACE_Svc_Handler(因為服務處理器也是一種任務),具有若干可用于對底層隊列進行設置、操作、入隊和出隊操作的方法。這里我們將只討論這些方法中的一部分。因為在服務處理器中(通過使用msg_queue()方法)可以獲取指向消息隊列自身的指針,所以也可以直接調用底層隊列(也就是,ACE_Message_Queue)的所有公共方法。(有關消息隊列提供的所有方法的更多細節,請參見后面的“消息隊列”一章。) 如上面所提到的,服務處理器的底層消息隊列是
ACE_Message_Queue的實例,它是由服務處理器自動創建的。在大多數情況下,沒有必要調用ACE_Message_Queue的底層方法,因為在ACE_Svc_Handler類中已對它們的大多數進行了包裝。ACE_Message_Queue是用于使ACE_Message_Block進隊或出隊的隊列。每個ACE_Message_Block都含有指向“引用計數”(reference-counted)的ACE_Data_Block的指針,ACE_Data_Block依次又指向存儲在塊中的實際數據(見“消息隊列”一章)。這使得ACE_Message_Block可以很容易地進行數據共享。 ACE_Message_Block
的主要作用是進行高效數據操作,而不帶來許多拷貝開銷。每個消息塊都有一個讀指針和寫指針。無論何時我們從塊中讀取時,讀指針會在數據塊中向前增長。類似地,當我們向塊中寫的時候,寫指針也會向前移動,這很像在流類型系統中的情況。可以通過ACE_Message_Block的構造器向它傳遞分配器,以用于分配內存(有關Allocator的更多信息,參見“內存管理”一章)。例如,可以使用ACE_Cached_Allocation_Strategy,它預先分配內存并從內存池中返回指針,而不是在需要的時候才從堆中分配內存。這樣的功能在需要可預測的性能時十分有用,比如在實時系統中。 下面的例子演示怎樣使用消息隊列的一些功能:
例
7-7#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#include ”ace/Thread.h”
#define NETWORK_SPEED 3
class MyServiceHandler; //forward declaration
typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;
typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;
class MyServiceHandler:
public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>{
// The message sender and creator threads are handled here.
ACE_thread_t thread_names[2];
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));
//Register with the reactor to remember this handler..
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));
//Create two new threads to create and send messages to the
//remote machine.
activate(THR_NEW_LWP,
2, //2 new threads
0,
ACE_DEFAULT_THREAD_PRIORITY,
-1,
this,
0,
0,
0,
thread_names); // identifiers in thread_handles
//keep the service handler registered with the acceptor.
return 0;
}
void send_message(void)
{
//Dequeue the message and send it off
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//dequeue the message from the message queue
ACE_Message_Block *mb;
ACE_ASSERT(this->getq(mb)!=-1);
int length=mb->length();
char *data =mb->rd_ptr();
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”%s \n”,data,length));
peer().send_n(data,length);
//Simulate very SLOW network.
ACE_OS::sleep(NETWORK_SPEED);
//release the message block
mb->release();
} //end send_message
int construct_message(void)
{
// A very fast message creation algorithm
// would lead to the need for queuing messages..
// here. These messages are created and then sent
// using the SLOW send_message() routine which is
// running in a different thread so that the message
//construction thread isn’t blocked.
ACE_DEBUG((LM_DEBUG,”(%t)Constructing message::>> ”));
// Create a new message to send
ACE_Message_Block *mb;
char *data=”Hello Connector”;
ACE_NEW_RETURN (mb,ACE_Message_Block (16,//Message 16 bytes long
ACE_Message_Block::MB_DATA,//Set header to data
0,//No continuations.
data//The data we want to send
), 0);
mb->wr_ptr(16); //Set the write pointer.
// Enqueue the message into the message queue
// we COULD have done a timed wait for enqueuing in case
// someone else holds the lock to the queue so it doesn’t block
//forever..
ACE_ASSERT(this->putq(mb)!=-1);
ACE_DEBUG((LM_DEBUG,”Enqueued msg successfully\n”));
}
int svc(void)
{
ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));
//call the message creator thread
if(ACE_Thread::self()== thread_names[0])
while(1) construct_message(); //create messages forever
else
while(1) send_message(); //send messages forever
return 0; // keep the compiler happy.
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));
char* data= new char[13];
//Check if peer aborted the connection
if(peer().recv_n(data,12)==0)
{
printf(”Peer probably aborted connection”);
return -1; //de-register from the Reactor.
}
//Show what you got..
ACE_OS::printf(”<< %s\n”,data);
//keep yourself registered
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(10101);
ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));
//Prepare to accept connections
Acceptor myacceptor(addr,Reactor::instance());
// wait for something to happen.
while(1)
Reactor::instance()->handle_events();
return 0;
}
這個例子演示怎樣使用
putq()和getq()方法來在隊列中放入或取出消息塊。它還演示怎樣創建消息塊,隨后設置它的寫指針,并根據它的讀指針進行讀取。注意消息塊中的實際數據的起始位置由消息塊的讀指針指示。消息塊的length()成員函數返回在消息塊中存儲的底層數據的長度,其中不包括ACE_Message_Block中用于管理目的的部分。另外,我們也顯示了怎樣使用release()方法來釋放消息塊(mb)。 要了解更多關于如何使用消息塊、數據塊或是消息隊列的信息,請閱讀此教程中有關“消息隊列”、
ASX框架和其他相關的部分。
7.4
接受器和連接器模式工作原理 接受器和連接器工廠(也就是
ACE_Connector和ACE_Acceptor)有著非常類似的運行結構。它們的工作可大致劃分為三個階段:
端點或連接初始化階段
服務初始化階段
服務處理階段
7.4.1 端點或連接初始化階段 在使用接受器的情況下,應用級程序員可以調用
ACE_Acceptor工廠的open()方法,或是它的缺省構造器(它實際上會調用open()方法),來開始被動偵聽連接。當接受器工廠的open()方法被調用時,如果反應堆單體還沒有被實例化,open()方法就首先對其進行實例化。隨后它調用底層具體接受器的open()方法。于是具體接受器會完成必要的初始化來偵聽連接。例如,在使用ACE_SOCK_Acceptor的情況中,它打開socket,將其綁定到用戶想要在其上偵聽新連接的端口和地址上。在綁定端口后,它將會發出偵聽調用。open方法隨后將接受器工廠登記到反應堆。因而在接收到任何到來的連接請求時,反應堆會自動回調接受器工廠的handle_input()方法。注意正是因為這一原因,接受器工廠才從ACE_Event_Handler類層次派生;這樣它才可以響應ACCEPT事件,并被反應堆自動回調。 在使用連接器的情況中,應用程序員調用連接器工廠的
connect()方法或connect_n()方法來發起到對端的連接。除了其他一些選項,這兩個方法的參數包括我們想要連接到的遠地地址,以及我們是想要同步還是異步地完成連接。我們可以同步或異步地發起NUMBER_CONN個連接:
//Synchronous
OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,
ACE_Synch_Options::synch);
//Asynchronous
OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,
ACE_Synch_Options::asynch);
如果連接請求是異步的,
ACE_Connector會在反應堆上登記自己,等待連接被建立(ACE_Connector也派生自ACE_Event_Handler類層次)。一旦連接被建立,反應堆將隨即自動回調連接器。但如果連接請求是同步的,connect()調用將會阻塞,直到連接被建立、或是超時到期為止。超時值可通過改變特定的ACE_Synch_Options來指定。詳情請參見參考手冊。
7.4.2 接受器的服務初始化階段 在有連接請求在指定的地址和端口上到來時,反應堆自動回調
ACE_Acceptor工廠的handle_input()方法。 該方法是一個“模板方法”(
Template Method)。模板方法用于定義一個算法的若干步驟的順序,并允許改變特定步驟的執行。這種變動是通過允許子類定義這些方法的實現來完成的。(有關模板方法的更多信息見“設計模式”參考指南)。 在我們的這個案例中,模板方法將算法定義如下:
:創建服務處理器。 accept_svc_handler():將連接接受進前一步驟創建的服務處理器。 activate_svc_handler():啟動這個新服務處理器。
這些方法都可以被重新編寫,從而靈活地決定這些操作怎樣來實際執行。
這樣,
handle_input()將首先調用make_svc_handler()方法,創建適當類型的服務處理器(如我們在上面的例子中所看到的那樣,服務處理器的類型由應用程序員在ACE_Acceptor模板被實例化時傳入)。在缺省情況下,make_svc_handler()方法只是實例化恰當的服務處理器。但是,make_svc_handler()是一個“橋接”(bridge)方法,可被重載以提供更多復雜功能。(橋接是一種設計模式,它使類層次的接口與實現去耦合。參閱“設計模式”參考文獻)。例如,服務處理器可創建為進程級或線程級的單體,或者從庫中動態鏈接,從磁盤中加載,甚或通過更復雜的方式創建,如從數據庫中查找并獲取服務處理器,并將它裝入內存。 在服務處理器被創建后,
handle_input()方法調用accept_svc_handler()。該方法將連接“接受進”服務處理器;缺省方式是調用底層具體接受器的accept()方法。在ACE_SOCK_Acceptor被用作具體接受器的情況下,它調用BSD accept()例程來建立連接(“接受”連接)。在連接建立后,連接句柄在服務處理器中被自動設置(接受“進”服務處理器);這個服務處理器是先前通過調用make_svc_handler()創建的。該方法也可被重載,以提供更復雜的功能。例如,不是實際創建新連接,而是“回收利用”舊連接。在我們演示各種不同的接受和連接策略時,將更為詳盡地討論這一點。
7.4.3 連接器的服務初始化階段 應用發出的
connect()方法與接受器工廠中的handle_input()相類似,也就是,它是一個“模板方法”。 在我們的這個案例中,模板方法
connect()定義下面一些可被重定義的步驟:
:創建服務處理器。 connect_svc_handler():將連接接受進前一步驟創建的服務處理器。 activate_svc_handler():啟動這個新服務處理器。
每一方法都可以被重新編寫,從而靈活地決定這些操作怎樣來實際執行。
這樣,在應用發出
connect()調用后,連接器工廠通過調用make_svc_handler()來實例化恰當的服務處理器,一如在接受器的案例中所做的那樣。其缺省行為只是實例化適當的類,并且也可以通過與接受器完全相同的方式重載。進行這樣的重載的原因可以與上面提到的原因非常類似。 在服務處理器被創建后,
connect()調用確定連接是要成為異步的還是同步的。如果是異步的,在繼續下一步驟之前,它將自己登記到反應堆,隨后調用connect_svc_handler()方法。該方法的缺省行為是調用底層具體連接器的connect()方法。在使用ACE_SOCK_Connector的情況下,這意味著將適當的選項設置為阻塞或非阻塞式I/O,然后發出BSD connect()調用。如果連接被指定為同步的,connect()調用將會阻塞、直到連接完全建立。在這種情況下,在連接建立后,它將在服務處理器中設置句柄,以與它現在連接到的對端通信(該句柄即是通過在服務處理器中調用peer()方法獲得的在流中存儲的句柄,見上面的例子)。在服務處理器中設置句柄后,連接器模式將進行到最后階段:服務處理。 如果連接被指定為異步的,在向底層的具體連接器發出非阻塞式
connect()調用后,對connect_svc_handler()的調用將立即返回。在使用ACE_SOCK_Connector的情況中,這意味著發出非阻塞式BSD connect()調用。在連接稍后被實際建立時,反應堆將回調ACE_Connector工廠的handle_output()方法,該方法在通過make_svc_handler()方法創建的服務處理器中設置新句柄。然后工廠將進行到下一階段:服務處理。 與
accept_svc_handler()情況一樣,connect_svc_handler()是一個“橋接”方法,可進行重載以提供變化的功能。
7.4.4 服務處理 一旦服務處理器被創建、連接被建立,以及句柄在服務處理器中被設置,
ACE_Acceptor的handle_input()方法(或者在使用ACE_Connector的情況下,是handle_output()或connect_svc_handler())將調用activate_svc_handler()方法。該方法將隨即啟用服務處理器。其缺省行為是調用作為服務處理器的入口的open()方法。如我們在上面的例子中所看到的,在服務處理器開始執行時,open()方法是第一個被調用的方法。是在open()方法中,我們調用activate()方法來創建多個線程控制;并在反應堆上登記服務處理器,這樣當新的數據在連接上到達時,它會被自動回調。該方法也是一個“橋接”方法,可被重載以提供更為復雜的功能。特別地,這個重載的方法可以提供更為復雜的并發策略,比如,在另一不同的進程中運行服務處理器。
7.5
調諧接受器和連接器策略 如上面所提到的,因為使用了可以重載的橋接方法,很容易對接受器和連接器進行調諧。橋接方法允許調諧:
服務處理器的創建策略:通過重載接受器或連接器的make_svc_handler()方法來實現。例如,這可以意味著復用已有的服務處理器,或使用某種復雜的方法來獲取服務處理器,如上面所討論的那樣。 連接策略:連接創建策略可通過重載connect_svc_handler()或accept_svc_handler()方法來改變。 服務處理器的并發策略:服務處理器的并發策略可通過重載activate_svc_handler()方法來改變。例如,服務處理器可以在另外的進程中創建。
如上所示,調諧是通過重載
ACE_Acceptor或ACE_Connector類的橋接方法來完成的。ACE的設計使得程序員很容易完成這樣的重載和調諧。
7.5.1 ACE_Strategy_Connector和ACE_Strategy_Acceptor類 為了方便上面所提到的對接受器和連接器模式的調諧方法,
ACE提供了兩種特殊的“可調諧”接受器和連接器工廠,那就是ACE_Strategy_Acceptor和ACE_Strategy_Connector。它們和ACE_Acceptor與ACE_Connector非常類似,同時還使用了“策略”模式。 策略模式被用于使算法行為與類的接口去耦合。其基本概念是允許一個類(稱為
Context Class,上下文類)的底層算法獨立于使用該類的客戶進行變動。這是通過具體策略類的幫助來完成的。具體策略類封裝執行操作的算法或方法。這些具體策略類隨后被上下文類用于執行各種操作(上下文類將“工作”委托給具體策略類)。因為上下文類不直接執行任何操作,當需要改變功能時,無需對它進行修改。對上下文類所做的唯一修改是使用另一個具體策略類來執行改變了的操作。(要閱讀有關策略模式的更多信息,參見“設計模式”的附錄)。 在
ACE中,ACE_Strategy_Connector和ACE_Strategy_Acceptor使用若干具體策略類來改變算法,以創建服務處理器,建立連接,以及為服務處理器設置并發方法。如你可能已經猜到的一樣,ACE_Strategy_Connector和ACE_Strategy_Acceptor利用了上面提到的橋接方法所提供的可調諧性。
7.5.1.1
使用策略接受器和連接器 在
ACE中已有若干具體的策略類可用于“調諧”策略接受器和連接器。當類被實例化時,它們作為參數被傳入策略接受器或連接器。表7-2顯示了可用于調諧策略接受器和連接器類的一些類。需要修改 | 具體策略類 | 描述 |
創建策略 ( 重定義make_svc_handler()) | ACE_NOOP_Creation_Strategy | 這個具體策略并不實例化服務處理器,而只是一個空操作。 |
ACE_Singleton_Strategy | 保證服務處理器被創建為單體。也就是,所有連接將有效地使用同一個服務處理例程。 |
ACE_DLL_Strategy | 通過從動態鏈接庫中動態鏈接服務處理器來對它進行創建。 |
連接策略 ( 重定義connect_svc_handler()) | ACE_Cached_Connect_Strategy | 檢查是否有已經連接到特定的遠地地址的服務處理器沒有在被使用。如果有這樣一個服務處理器,就對它進行復用。 |
并發策略 ( 重定義activate_svc_handler()) | ACE_NOOP_Concurrency_Strategy | 一個“無為”( do-nothing)的并發策略。它甚至不調用服務處理器的open()方法。 |
ACE_Process_Strategy | 在另外的進程中創建服務處理器,并調用它的 open()方法。 |
ACE_Reactive_Strategy | 先在反應堆上登記服務處理器,然后調用它的 open()方法。 |
ACE_Thread_Strategy | 先調用服務處理器的 open()方法,然后調用它的activate()方法,以讓另外的線程來啟動服務處理器的svc()方法。 |
表
7-2 用于調諧策略接受器和連接器類的類
下面的例子演示策略接受器和連接器類的使用。
例
7-8#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#define PORT_NUM 10101
#define DATA_SIZE 12
//forward declaration
class My_Svc_Handler;
//instantiate a strategy acceptor
typedef ACE_Strategy_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;
//instantiate a concurrency strategy
typedef ACE_Process_Strategy<My_Svc_Handler> Concurrency_Strategy;
// Define the Service Handler
class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
private:
char* data;
public:
My_Svc_Handler()
{
data= new char[DATA_SIZE];
}
My_Svc_Handler(ACE_Thread_Manager* tm)
{
data= new char[DATA_SIZE];
}
int open(void*)
{
cout<<”Connection established”<<endl;
//Register with the reactor
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
peer().recv_n(data,DATA_SIZE);
ACE_OS::printf(”<< %s\n”,data);
// keep yourself registered with the reactor
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(PORT_NUM);
//Concurrency Strategy
Concurrency_Strategy my_con_strat;
//Instantiate the acceptor
MyAcceptor acceptor(addr, //address to accept on
ACE_Reactor::instance(), //the reactor to use
0,
// don’t care about creation strategy0, // don’t care about connection estb. strategy
&my_con_strat
); // use our new process concurrency strategy
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
這個例子基于上面的例
7-2。唯一的不同是它使用了ACE_Strategy_Acceptor,而不是使用ACE_Acceptor;并且它還使用ACE_Process_Strategy作為服務處理器的并發策略。這種并發策略保證一旦連接建立后,服務處理器在單獨的進程中被實例化。如果在特定服務上的負載變得過于繁重,使用ACE_Process_Strategy可能是一個好主意。但是,在大多數情況下,使用ACE_Process_Strategy會過于昂貴,而ACE_Thread_Strategy可能是更好的選擇。
7.5.1.2
使用ACE_Cached_Connect_Strategy進行連接緩存
在許多應用中,客戶會連接到服務器,然后重新連接到同一服務器若干次;每次都要建立連接,執行某些工作,然后掛斷連接(比如像在
Web客戶中所做的那樣)。不用說,這樣做是非常低效而昂貴的,因為連接建立和掛斷是非常昂貴的操作。在這樣的情況下,連接者可以采用一種更好的策略:“記住”老連接并保持它,直到確定客戶不會再重新建立連接為止。ACE_Cached_Connect_Strategy就提供了這樣一種緩存策略。這個策略對象被ACE_Strategy_Connector用于提供基于緩存的連接建立。如果一個連接已經存在,ACE_Strategy_Connector將會復用它,而不是創建新的連接。 當客戶試圖重新建立連接到先前已經連接的服務器時,
ACE_Cached_Connect_Strategy確保對老的連接和服務處理器進行復用,而不是創建新的連接和服務處理器。因而,實際上,ACE_Cached_Connect_Strategy不僅管理連接建立策略,它還管理服務處理器創建策略。因為在此例中,用戶不想創建新的服務處理器,我們將ACE_Null_Creation_Strategy傳遞給ACE_Strategy_Connector。如果連接先前沒有建立過,ACE_Cached_Connect_Strategy將自動使用內部的創建策略來實例化適當的服務處理器,它是在這個模板類被實例化時傳入的。這個策略可被設置為用戶想要使用的任何一種策略。除此而外,也可以將ACE_Cached_Connect_Strategy自己在其構造器中使用的創建、并發和recycling策略傳給它。下面的例子演示這些概念:
例
7-9#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Connector.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Connector.h”
#include ”ace/INET_Addr.h”
#define PORT_NUM 10101
#define DATA_SIZE 16
//forward declaration
class My_Svc_Handler;
//Function prototype
static void make_connections(void *arg);
// Template specializations for the hashing function for the
// hash_map which is used by the cache. The cache is used internally by the
// Cached Connection Strategy . Here we use ACE_Hash_Addr
// as our external identifier. This utility class has already
// overloaded the == operator and the hash() method. (The
// hashing function). The hash() method delegates the work to
// hash_i() and we use the IP address and port to get a
// a unique integer hash value.
size_t ACE_Hash_Addr<ACE_INET_Addr>::hash_i (const ACE_INET_Addr &addr) const
{
return addr.get_ip_address () + addr.get_port_number ();
}
//instantiate a strategy acceptor
typedef ACE_Strategy_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR>
STRATEGY_CONNECTOR;
//Instantiate the Creation Strategy
typedef ACE_NOOP_Creation_Strategy<My_Svc_Handler>
NULL_CREATION_STRATEGY;
//Instantiate the Concurrency Strategy
typedef ACE_NOOP_Concurrency_Strategy<My_Svc_Handler>
NULL_CONCURRENCY_STRATEGY;
//Instantiate the Connection Strategy
typedef ACE_Cached_Connect_Strategy<My_Svc_Handler,
ACE_SOCK_CONNECTOR,
ACE_SYNCH_RW_MUTEX>
CACHED_CONNECT_STRATEGY;
class My_Svc_Handler:
public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_MT_SYNCH>
{
private:
char* data;
public:
My_Svc_Handler()
{
data= new char[DATA_SIZE];
}
My_Svc_Handler(ACE_Thread_Manager* tm)
{
data= new char[DATA_SIZE];
}
//Called before the service handler is recycled..
int
recycle (void *a=0)/P>
{
ACE_DEBUG ((LM_DEBUG,
”(%P|%t) recycling Svc_Handler %d with handle %d\n”,
this, this->peer ().get_handle ()));
return 0;
}
int open(void*)
{
ACE_DEBUG((LM_DEBUG,”(%t)Connection established \n”));
//Register the service handler with the reactor
ACE_Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
activate(THR_NEW_LWP|THR_DETACHED);
return 0;
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”Got input in thread: (%t) \n”));
peer().recv_n(data,DATA_SIZE);
ACE_DEBUG((LM_DEBUG,”<< %s\n”,data));
//keep yourself registered with the reactor
return 0;
}
int svc(void)
{
//send a few messages and then mark connection as idle so that it can
// be recycled
later.ACE_DEBUG((LM_DEBUG,”Started the service routine \n”));
for(int i=0;i<3;i++)
{
ACE_DEBUG((LM_DEBUG,”(%t)>>Hello World\n”));
ACE_OS::fflush(stdout);
peer().send_n(”Hello World”,sizeof(”Hello World”));
}
//Mark the service handler as being idle now and let the
//other threads reuse this connection
this->idle(1);
//Wait for the thread to die
this->thr_mgr()->wait();
return 0;
}
};
ACE_INET_Addr *addr;
int main(int argc, char* argv[])
{
addr= new ACE_INET_Addr(PORT_NUM,argv[1]);
//Creation Strategy
NULL_CREATION_STRATEGY creation_strategy;
//Concurrency Strategy
NULL_CONCURRENCY_STRATEGY concurrency_strategy;
//Connection Strategy
CACHED_CONNECT_STRATEGY caching_connect_strategy;
//instantiate the connector
STRATEGY_CONNECTOR connector(
ACE_Reactor::instance(), //the reactor to use
&creation_strategy,
&caching_connect_strategy,
&concurrency_strategy);
//Use the thread manager to spawn a single thread
//to connect multiple times passing it the address
//of the strategy connector
if(ACE_Thread_Manager::instance()->spawn(
(ACE_THR_FUNC) make_connections,
(void *) &connector,
THR_NEW_LWP) == -1)
ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n%a”, ”client thread spawn failed”));
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
//Connection establishment function, tries to establish connections
//to the same server again and re-uses the connections from the cache
void make_connections(void *arg)
{
ACE_DEBUG((LM_DEBUG,”(%t)Prepared to connect \n”));
STRATEGY_CONNECTOR *connector= (STRATEGY_CONNECTOR*) arg;
for (int i = 0; i < 10; i++)
{
My_Svc_Handler *svc_handler = 0;
// Perform a blocking connect to the server using the Strategy
// Connector with a connection caching strategy. Since we are
// connecting to the same <server_addr> these calls will return the
// same dynamically allocated <Svc_Handler> for each <connect> call.
if (connector->connect (svc_handler, *addr) == -1)
{
ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n”, ”connection failed\n”));
return;
}
// Rest for a few seconds so that the connection has been freed up
ACE_OS::sleep (5);
}}
在上面的例子中,緩存式連接策略被用于緩存連接。要使用這一策略,需要一點額外的工作:定義
ACE_Cached_Connect_Strategy在內部使用的哈希映射管理器的hash()方法。這個hash()方法用于對服務處理器和ACE_Cached_Connect_Strategy內部使用的連接進行哈希運算,放入緩存映射中。它簡單地使用IP地址和端口號的總和作為哈希函數,這也許并不是很好的哈希函數。 這個例子比至今為止我們所列舉的例子都要復雜一點,所以有理由多進行一點討論。
我們為
ACE_Strategy_Acceptor使用空操作并發和創建策略。使用空操作創建策略是必須的。如上面所解釋的,如果沒有使用ACE_NOOP_Creation_Strategy,ACE_Cached_Connection_Strategy將會產生斷言失敗。但是,在使用ACE_Cached_Connect_Strategy時,任何并發策略都可以和策略接受器一起使用。如上面所提到的,ACE_Cached_Connect_Strategys所用的底層創建策略可以由用戶來設置。還可以設置recycling策略。這是在實例化caching_connect_strategy時,通過將所需的創建和recycling策略的對象傳給它的構造器來完成的。在這里我們沒有這樣做,而是使用了缺省的創建和recycling策略。 在適當地設置連接器后,我們使用
Thread_Manager來派生新線程,且將make_connections()方法作為線程的入口。該方法使用我們的新的策略連接器來連接到遠地站點。在連接建立后,該線程休眠5秒鐘,然后使用我們的緩存式連接器來重新創建同樣的連接。于是該線程應該在連接器緩存中找到這個連接并復用它。 和平常一樣,一旦連接建立后,反應堆回調我們的服務處理器(
My_Svc_Handler)。隨后My_Svc_Handler的open()方法通過調用My_Svc_Handler的activate()方法來使它成為主動對象。svc()方法隨后發送三條消息給遠地主機,并通過調用服務處理器的idle()方法將該連接標記為空閑。注意this->thr_mrg_wait()要求線程管理器等待所有在線程管理器中的線程終止。如果你不要求線程管理器等待其它線程,根據在ACE中設定的語義,一旦ACE_Task(在此例中是ACE_Task類型的ACE_Svc_Handler)中的線程終止,ACE_Task對象(在此例中是ACE_My_Svc_Handler)就會被自動刪除。如果發生了這種情況,在Cache_Connect_Strategy查找先前緩存的連接時,它就不會如我們期望的那樣找到My_Svc_Handler,因為它已經被刪除掉了。 在
My_Svc_Handler中還重載了ACE_Svc_Handler中的recycle()方法。當有舊連接被ACE_Cache_Connect_Strategy找到時,這個方法就會被自動回調,這樣服務處理器就可以在此方法中完成回收利用所特有的操作。在我們的例子中,我們只是打印出在緩存中找到的處理器的this指針的地址。在程序運行時,每次連接建立后所使用的句柄的地址是相同的,從而說明緩存工作正常。
7.6
通過接受器和連接器模式使用簡單事件處理器 有時, ,使用重量級的
ACE_Svc_Handler作為接受器和連接器的處理器不僅沒有必要,而且會導致代碼臃腫。在這樣情況下,用戶可以使用較輕的ACE_Event_Handler方法來作為反應堆在連接一旦建立時所回調的類。要采用這種方法,程序員需要重載get_handle()方法,并包含將要用于事件處理器的具體底層流。下面的例子有助于演示這些變動。這里我們還編寫了新的peer()方法,它返回底層流的引用(reference),就像在ACE_Svc_Handler類中所做的那樣。
例
7-10#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#define PORT_NUM 10101
#define DATA_SIZE 12
//forward declaration
class My_Event_Handler;
//Create the Acceptor class
typedef ACE_Acceptor<My_Event_Handler,ACE_SOCK_ACCEPTOR>
MyAcceptor;
//Create an event handler similar to as seen in example 2. We have to
//overload the get_handle() method and write the peer() method. We also
//provide the data member peer_ as the underlying stream which is
//used.
class My_Event_Handler: public ACE_Event_Handler
{
private:
char* data;
//Add a new attribute for the underlying stream which will be used by
//the Event Handler
ACE_SOCK_Stream peer_;
public:
My_Event_Handler()
{
data= new char[DATA_SIZE];
}
int open(void*)
{
cout<<”Connection established”<<endl;
//Register the event handler with the reactor
ACE_Reactor::instance()->register_handler(this,
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
// After using the peer() method of our ACE_Event_Handler to obtain a
//reference to the underlying stream of the service handler class we
//call recv_n() on it to read the data which has been received. This
//data is stored in the data array and then printed out
peer().recv_n(data,DATA_SIZE);
ACE_OS::printf(”<< %s\n”,data);
// keep yourself registered with the reactor
return 0;
}
// new method which returns the handle to the reactor when it
//asks for it.
ACE_HANDLE get_handle(void) const
{
return this->peer_.get_handle();
}
//new method which returns a reference to the peer stream
ACE_SOCK_Stream &peer(void) const
{
return (ACE_SOCK_Stream &) this->peer_;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(PORT_NUM);
//create the acceptor
MyAcceptor acceptor(addr, //address to accept on
ACE_Reactor::instance()); //the reactor to use
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
文轉載至
ACE開發者/FONT>
posted on 2007-02-27 21:40
walkspeed 閱讀(7244)
評論(1) 編輯 收藏 引用 所屬分類:
ACE Farmeworks