摘要: 9.1
意圖
接受器-連接器設(shè)計模式(Acceptor-Connector)使分布式系統(tǒng)中的連接建立及服務(wù)初始化與一旦服務(wù)初始化后所執(zhí)行的處理去耦合。這樣的去耦合通過三種組件來完成:acceptor、connector和service handler(服務(wù)處理器)。連接器主動地建立到遠(yuǎn)地接受器組件的連接,并初始化服務(wù)處理器來處理在連接上交...
閱讀全文
posted @
2007-02-27 21:47 walkspeed 閱讀(3585) |
評論 (0) |
編輯 收藏
摘要: 接受器/連接器模式設(shè)計用于降低連接建立與連接建立后所執(zhí)行的服務(wù)之間的耦合。例如,在WWW瀏覽器中,所執(zhí)行的服務(wù)或“實際工作”是解析和顯示客戶瀏覽器接收到的HTML頁面。連接建立是次要的,可能通過BSD socket或其他一些等價的IPC機制來完成。使用這些模式允許程序員專注于“實際工作”,而最少限度地去關(guān)心怎樣在服務(wù)器和客戶之間建立連接。而另外一方面,程序員也可以獨立于他所編寫的、或?qū)⒁帉懙姆?wù)...
閱讀全文
posted @
2007-02-27 21:40 walkspeed 閱讀(7244) |
評論 (1) |
編輯 收藏
第
8
章
前攝器(
Proactor
):用于為異步事件多路分離和分派處理器的對象行為模式
Irfan Pyarali ? Tim Harrison?? Douglas C. Schmidt?? Thomas D. Jordan
現(xiàn)代操作系統(tǒng)為開發(fā)并發(fā)應(yīng)用提供了多種機制。同步多線程是一種流行的機制,用于開發(fā)同時執(zhí)行多個操作的應(yīng)用。但是,線程常常有很高的性能開銷,并且需要對同步模式和原理有深入的了解。因此,有越來越多的操作系統(tǒng)支持異步機制,在減少多線程的大量開銷和復(fù)雜性的同時,提供了并發(fā)的好處。
本論文中介紹的前攝器(Proactor
)模式描述怎樣構(gòu)造應(yīng)用和系統(tǒng),以有效地利用操作系統(tǒng)支持的異步機制。當(dāng)應(yīng)用調(diào)用異步操作時,OS
代表應(yīng)用執(zhí)行此操作。這使得應(yīng)用可以讓多個操作同時運行,而又不需要應(yīng)用擁有相應(yīng)數(shù)目的線程。因此,通過使用更少的線程和有效利用OS
對異步操作的支持,前攝器模式簡化了并發(fā)編程,并改善了性能。
前攝器模式支持多個事件處理器的多路分離和分派,這些處理器由異步事件的完成來觸發(fā)。通過集成完成事件(completion event)的多路分離和相應(yīng)的事件處理器的分派,該模式簡化了異步應(yīng)用的開發(fā)。
這一部分提供使用前攝器模式的上下文和動機。
前攝器模式應(yīng)該被用于應(yīng)用需要并發(fā)執(zhí)行操作的性能好處、又不想受到同步多線程或反應(yīng)式編程的約束時。為說明這些好處,設(shè)想一個需要并發(fā)執(zhí)行多個操作的網(wǎng)絡(luò)應(yīng)用。例如,一個高性能Web服務(wù)器必須并發(fā)處理發(fā)送自多個客戶的HTTP請求[1, 2]。圖8-1 顯示了Web瀏覽器和Web服務(wù)器之間的典型交互。當(dāng)用戶指示瀏覽器打開一個URL時,瀏覽器發(fā)送一個HTTP GET請求給Web服務(wù)器。收到請求,服務(wù)器就解析并校驗請求,并將指定的文件發(fā)回給瀏覽器。
圖8-1 典型的Web服務(wù)器通信軟件體系結(jié)構(gòu)
開發(fā)高性能Web服務(wù)器要求消除以下壓力:
-
并發(fā)性:服務(wù)器必須同時執(zhí)行多個客戶請求;
-
效率:服務(wù)器必須最小化響應(yīng)延遲、最大化吞吐量,并避免不必要地使用CPU;
-
編程簡單性:服務(wù)器的設(shè)計應(yīng)該簡化高效的并發(fā)策略的使用;
-
可適配性:應(yīng)該使繼承新的或改進的傳輸協(xié)議(比如HTTP 1.1[3])所帶來的維護代價最小化。
Web服務(wù)器可以使用若干并發(fā)策略來實現(xiàn),包括多個同步線程、反應(yīng)式同步事件分派和前攝式異步事件分派。下面,我們檢查傳統(tǒng)方法的缺點,并解釋前攝器模式是怎樣提供一種強大的技術(shù),為高性能并發(fā)應(yīng)用而支持高效、靈活的異步事件分派策略的。
8.2.2
傳統(tǒng)并發(fā)模型的常見陷阱和缺陷
同步的多線程和反應(yīng)式編程是實現(xiàn)并發(fā)的常用方法。這一部分描述這些編程模型的缺點。
8.2.2.1
通過多個同步線程實現(xiàn)的并發(fā)
或許最為直觀的實現(xiàn)并發(fā)Web服務(wù)器的途徑是使用同步的多線程。在此模型中,多個服務(wù)器線程同時處理來自多個客戶的HTTP GET請求。每個線程同步地執(zhí)行連接建立、HTTP請求讀取、請求解析和文件傳輸操作。作為結(jié)果,每個操作都阻塞直到完成。
同步線程的主要優(yōu)點是應(yīng)用代碼的簡化。特別是,Web服務(wù)器為服務(wù)客戶A的請求所執(zhí)行的操作在很大程度上獨立于為服務(wù)客戶B的請求所需的操作。因而,很容易在分離的線程中對不同的請求進行服務(wù),因為在線程之間共享的狀態(tài)數(shù)量很少;這也最小化了對同步的需要。而且,在分離的線程中執(zhí)行應(yīng)用邏輯也使得開發(fā)者可以使用直觀的順序命令和阻塞操作。
圖8-2 多線程Web服務(wù)器體系結(jié)構(gòu)
圖8-2顯示使用同步線程來設(shè)計的Web服務(wù)器怎樣并發(fā)地處理多個客戶請求。該圖顯示的Sync Acceptor對象封裝服務(wù)器端用于同步接受網(wǎng)絡(luò)連接的機制。使用“Thread Per Connection”并發(fā)模型,各個線程為服務(wù)HTTP GET請求所執(zhí)行的一系列步驟可被總結(jié)如下:
- 每個線程同步地阻塞在accept socket調(diào)用中,等待客戶連接請求;
- 客戶連接到服務(wù)器,連接被接受;
- 新客戶的HTTP請求被同步地從網(wǎng)絡(luò)連接中讀取;
- 請求被解析;
- 所請求的文件被同步地讀取;
- 文件被同步地發(fā)送給客戶。
附錄A.1中有一個將同步線程模型應(yīng)用于Web服務(wù)器的C++代碼例子。
如上所述,每個并發(fā)地連接的客戶由一個專用的服務(wù)器線程服務(wù)。在繼續(xù)為其他HTTP請求服務(wù)之前,該線程同步地完成一個被請求的操作。因此,要在服務(wù)多個客戶時執(zhí)行同步I/O,Web服務(wù)器必須派生多個線程。盡管這種同步線程模式是直觀的,且能夠相對高效地映射到多CPU平臺上,它還是有以下缺點:
線程策略與并發(fā)策略被緊耦合:這種體系結(jié)構(gòu)要求每個相連客戶都有一個專用的線程。通過針對可用資源(比如使用線程池來對應(yīng)CPU的數(shù)目)、而不是正被并發(fā)服務(wù)的客戶的數(shù)目來調(diào)整其線程策略,可能會更好地優(yōu)化一個并發(fā)應(yīng)用;
更大的同步復(fù)雜性:線程可能會增加序列化對服務(wù)器的共享資源(比如緩存文件和Web頁面點擊日志)的訪問所必需的同步機制的復(fù)雜性;
更多的性能開銷:由于上下文切換、同步和CPU間的數(shù)據(jù)移動[4],線程的執(zhí)行可能很低效;
不可移植性:線程有可能在有些平臺上不可用。而且,根據(jù)對占先式和非占先式線程的支持,OS平臺之間的差異非常大。因而,很難構(gòu)建能夠跨平臺統(tǒng)一運作的多線程服務(wù)器。
作為這些缺點的結(jié)果,多線程常常不是開發(fā)并發(fā)Web服務(wù)器的最為高效的、也不是最不復(fù)雜的解決方案。
8.2.2.2
通過反應(yīng)式同步事件分派實現(xiàn)的并發(fā)
另一種實現(xiàn)同步Web服務(wù)器的常用方法是使用反應(yīng)式事件分派模型。反應(yīng)堆(Reactor)模式描述應(yīng)用怎樣將Event Handler登記到Initiation Dispatcher。Initiation Dispatcher通知Event Handler何時能發(fā)起一項操作而不阻塞。
單線程并發(fā)Web服務(wù)器可以使用反應(yīng)式事件分派模型,它在一個事件循環(huán)中等待Reactor通知它發(fā)起適當(dāng)?shù)牟僮鳌eb服務(wù)器中反應(yīng)式操作的一個例子是Acceptor(接受器)[6]到Initiation Dispatcher的登記。當(dāng)數(shù)據(jù)在網(wǎng)絡(luò)連接上到達(dá)時,分派器回調(diào)Acceptor,后者接受網(wǎng)絡(luò)連接,并創(chuàng)建HTTP Handler。于是這個HTTP Handler就登記到Reactor,以在Web服務(wù)器的單線程控制中處理在那個連接上到來的URL請求。
圖8-3和圖8-4顯示使用反應(yīng)式事件分派設(shè)計的Web服務(wù)器怎樣處理多個客戶。圖8-3顯示當(dāng)客戶連接到Web服務(wù)器時所采取的步驟。圖8-4顯示W(wǎng)eb服務(wù)器怎樣處理客戶請求。圖8-3的一系列步驟可被總結(jié)如下:
圖8-3 客戶連接到反應(yīng)式Web服務(wù)器
圖8-4 客戶發(fā)送HTTP請求到反應(yīng)式Web服務(wù)器
- Web服務(wù)器將Acceptor登記到Initiation Dispatcher,以接受新連接;
- Web服務(wù)器調(diào)用Initiation Dispatcher的事件循環(huán);
- 客戶連接到Web服務(wù)器;
- Initiation Dispatcher將新連接請求通知Acceptor,后者接受新連接;
- Acceptor創(chuàng)建HTTP Handler,以服務(wù)新客戶;
- HTTP Handler將連接登記到Initiation Dispatcher,以讀取客戶請求數(shù)據(jù)(就是說,在連接變得“讀就緒”時);
- HTTP Handler服務(wù)來自新客戶的請求。
圖8-4顯示反應(yīng)式Web服務(wù)器為服務(wù)HTTP GET請求所采取的一系列步驟。該過程描述如下:
- 客戶發(fā)送HTTP GET請求;
- 當(dāng)客戶請求數(shù)據(jù)到達(dá)服務(wù)器時,Initiation Dispatcher通知HTTP Handler;
- 請求以非阻塞方式被讀取,于是如果操作會導(dǎo)致調(diào)用線程阻塞,讀操作就返回EWOULDBLOCK(步驟2和3將重復(fù)直到請求被完全讀取);
- HTTP Handler解析HTTP請求;
- 所請求的文件從文件系統(tǒng)中被同步讀取;
- 為發(fā)送文件數(shù)據(jù)(就是說,當(dāng)連接變得“寫就緒”時),HTTP Handler將連接登記到Initiation Dispatcher;
- 當(dāng)TCP連接變得寫就緒時,Initiation Dispatcher通知HTTP Handler;
- HTTP Handler以非阻塞方式將所請求文件發(fā)送給客戶,于是如果操作會導(dǎo)致調(diào)用線程阻塞,寫操作就返回EWOULDBLOCK(步驟7和8將重復(fù)直到數(shù)據(jù)被完全遞送)。
附錄A.2中有一個將反應(yīng)式事件分派模型應(yīng)用于Web服務(wù)器的C++代碼例子。
因為Initiation Dispatcher運行在單線程中,網(wǎng)絡(luò)I/O操作以非阻塞方式運行在Reactor的控制之下。如果當(dāng)前操作的進度停止了,操作就被轉(zhuǎn)手給Initiation Dispatcher,由它監(jiān)控系統(tǒng)操作的狀態(tài)。當(dāng)操作可以再度前進時,適當(dāng)?shù)腅vent Handler會被通知。
反應(yīng)式模式的主要優(yōu)點是可移植性,粗粒度并發(fā)控制帶來的低開銷(就是說,單線程不需要同步或上下文切換),以及通過使應(yīng)用邏輯與分派機制去耦合所獲得的模塊性。但是,該方法有以下缺點:
復(fù)雜的編程:如從前面的列表所看到的,程序員必須編寫復(fù)雜的邏輯,以保證服務(wù)器不會在服務(wù)一個特定客戶時阻塞。
缺乏多線程的OS
支持:大多數(shù)操作系統(tǒng)通過select系統(tǒng)調(diào)用[7]來實現(xiàn)反應(yīng)式分派模型。但是,select不允許多于一個的線程在同一個描述符集上等待。這使得反應(yīng)式模型不適用于高性能應(yīng)用,因為它沒有有效地利用硬件的并行性。
可運行任務(wù)的調(diào)度:在支持占先式線程的同步多線程體系結(jié)構(gòu)中,將可運行線程調(diào)度并時分(time-slice)到可用CPU上是操作系統(tǒng)的責(zé)任。這樣的調(diào)度支持在反應(yīng)式體系結(jié)構(gòu)中不可用,因為在應(yīng)用中只有一個線程。因此,系統(tǒng)的開發(fā)者必須小心地在所有連接到Web服務(wù)器的客戶之間將線程分時。這只能通過執(zhí)行短持續(xù)時間、非阻塞的操作來完成。
作為這些缺點的結(jié)果,當(dāng)硬件并行可用時,反應(yīng)式事件分派不是最為高效的模型。由于需要避免使用阻塞I/O,該模式還有著相對較高的編程復(fù)雜度。
8.2.3
解決方案:通過前攝式操作實現(xiàn)的并發(fā)
當(dāng)OS平臺支持異步操作時,一種高效而方便的實現(xiàn)高性能Web服務(wù)器的方法是使用前攝式事件分派。使用前攝式事件分派模型設(shè)計的Web服務(wù)器通過一或多個線程控制來處理異步操作的完成。這樣,通過集成完成事件多路分離(completion event demultiplexing)和事件處理器分派,前攝器模式簡化了異步的Web服務(wù)器。
異步的Web服務(wù)器將這樣來利用前攝器模式:首先讓W(xué)eb服務(wù)器向OS發(fā)出異步操作,并將回調(diào)方法登記到Completion Dispatcher(完成分派器),后者將在操作完成時通知Web服務(wù)器。于是OS代表Web服務(wù)器執(zhí)行操作,并隨即在一個周知的地方將結(jié)果排隊。Completion Dispatcher負(fù)責(zé)使完成通知出隊,并執(zhí)行適當(dāng)?shù)摹⒑袘?yīng)用特有的Web服務(wù)器代碼的回調(diào)。
圖8-5 客戶連接到基于前攝器的Web服務(wù)器
圖8-6 客戶發(fā)送請求給基于前攝器的Web服務(wù)器
圖8-5和圖8-6顯示使用前攝式事件分派設(shè)計的Web服務(wù)器怎樣在一或多個線程中并發(fā)地處理多個客戶。圖8-5顯示當(dāng)客戶連接到Web服務(wù)器時所采取的一系列步驟。
- Web服務(wù)器指示Acceptor發(fā)起異步接受;
- 接受器通過OS發(fā)起異步接受,將其自身作為Completion Handler和Completion Dispatcher的引用傳遞;并將用于在異步接受完成時通知Acceptor;
- Web服務(wù)器調(diào)用Completion Dispatcher的事件循環(huán);
- 客戶連接到Web服務(wù)器;
- 當(dāng)異步接受操作完成時,操作系統(tǒng)通知Completion Dispatcher;
- Completion Dispatcher通知接受器;
- Acceptor創(chuàng)建HTTP Handler;
- HTTP Handler發(fā)起異步操作,以讀取來自客戶的請求數(shù)據(jù),并將其自身作為Completion Handler和Completion Dispatcher的引用傳遞;并將用于在異步讀取完成時通知HTTP Handler。
圖8-6 顯示前攝式Web服務(wù)器為服務(wù)HTTP GET請求所采取的步驟。這些步驟解釋如下:
- 客戶發(fā)送HTTP GET請求;
- 讀取操作完成,操作系統(tǒng)通知Completion Dispatcher;
- Completion Dispatcher通知HTTP Handler(步驟2和3將重復(fù)直到整個請求被接收);
- HTTP Handler解析請求;
- HTTP Handler同步地讀取所請求的文件;
- HTTP Handler發(fā)起異步操作,以把文件數(shù)據(jù)寫到客戶連接,并將其自身作為Completion Handler和Completion Dispatcher的引用傳遞;并將用于在異步寫入完成時通知HTTP Handler。
- 當(dāng)寫操作完成時,操作系統(tǒng)通知Completion Dispatcher;
- 隨后Completion Dispatcher通知Completion Handler(步驟6-8將重復(fù)直到文件被完全遞送)。
8.8中有一個將前攝式事件分派模型應(yīng)用于Web服務(wù)器的C++代碼例子。
使用前攝器模式的主要優(yōu)點是可以啟動多個并發(fā)操作,并可并行運行,而不要求應(yīng)用必須擁有多個線程。操作被應(yīng)用異步地啟動,它們在OS的I/O子系統(tǒng)中運行直到完成。發(fā)起操作的線程現(xiàn)在可以服務(wù)另外的請求了。
例如,在上面的例子中,Completion Dispatcher可以是單線程的。當(dāng)HTTP請求到達(dá)時,單個Completion Dispatcher線程解析請求,讀取文件,并發(fā)送響應(yīng)給客戶。因為響應(yīng)是被異步發(fā)送的,多個響應(yīng)就有可能同時被發(fā)送。而且,同步的文件讀取可以被異步的文件讀取取代,以進一步增加并發(fā)的潛力。如果文件讀取是被異步完成的,HTTP Handler所執(zhí)行的唯一的同步操作就只剩下了HTTP協(xié)議請求解析。
前攝式模型的主要缺點是編程邏輯至少和反應(yīng)式模型一樣復(fù)雜。而且,前攝器模式可能會難以調(diào)試,因為異步操作常常有著不可預(yù)測和不可重復(fù)的執(zhí)行序列,這就使分析和調(diào)試復(fù)雜化了。8.7描述怎樣應(yīng)用其他模式(比如異步完成令牌[8])來簡化異步應(yīng)用編程模型。
當(dāng)具有以下一項或多項條件時使用前攝器模式:
- 應(yīng)用需要執(zhí)行一個或多個不阻塞調(diào)用線程的異步操作;
- 當(dāng)異步操作完成時應(yīng)用必須被通知;
- 應(yīng)用需要獨立于它的I/O模型改變它的并發(fā)策略;
- 通過使依賴于應(yīng)用的邏輯與應(yīng)用無關(guān)的底層構(gòu)造去耦合,應(yīng)用將從中獲益;
- 當(dāng)使用多線程方法或反應(yīng)式分派方法時,應(yīng)用的執(zhí)行將很低效,或是不能滿足性能需求。
在圖8-7中使用OMT表示法演示了前攝器模式的結(jié)構(gòu)。
前攝器模式中的關(guān)鍵參與者包括:
前攝發(fā)起器(Proactive Initiator。Web服務(wù)器應(yīng)用的主線程):
- Proactive Initiator是應(yīng)用中任何發(fā)起Asynchronous Operation(異步操作)的實體。它將Completion Handler和Completion Dispatcher登記到Asynchronous Operation Processor(異步操作處理器),此處理器在操作完成時通知前攝發(fā)起器。
完成處理器(Completion Handler。Acceptor和HTTP Handler):
- 前攝器模式將應(yīng)用所實現(xiàn)的Completion Handler接口用于Asynchronous Operation完成通知。
異步操作(Asynchronous Operation。Async_Read、Async_Write和Async_Accept方法):
- Asynchronous Operation被用于代表應(yīng)用執(zhí)行請求(比如I/O和定時器操作)。當(dāng)應(yīng)用調(diào)用Asynchronous Operation時,操作的執(zhí)行沒有借用應(yīng)用的線程控制。因此,從應(yīng)用的角度來看,操作是被異步地執(zhí)行的。當(dāng)Asynchronous Operation完成時,Asynchronous Operation Processor將應(yīng)用通知委托給Completion Dispatcher。
異步操作處理器(Asynchronous Operation Processor。操作系統(tǒng)):
- Asynchronous Operation是由Asynchronous Operation Processor來運行直至完成的。該組件通常由OS實現(xiàn)。
完成分派器(Completion Dispatcher。Notification Queue):
- Completion Dispatcher負(fù)責(zé)在Asynchronous Operation完成時回調(diào)應(yīng)用的Completion Handler。當(dāng)Asynchronous Operation Processor完成異步發(fā)起的操作時,Completion Dispatcher代表應(yīng)用執(zhí)行應(yīng)用回調(diào)。
圖8-7 前攝器模式中的參與者
有若干良好定義的步驟被用于所有Asynchronous Operation。在高水平的抽象上,應(yīng)用異步地發(fā)起操作,并在操作完成時被通知。圖8-8顯示在模式參與者之間必定發(fā)生的下列交互:
-
前攝發(fā)起器發(fā)起操作:為執(zhí)行異步操作,應(yīng)用在Asynchronous Operation Processor上發(fā)起操作。例如,Web服務(wù)器可能要求OS在網(wǎng)絡(luò)上使用特定的socket連接傳輸文件。要請求這樣的操作,Web服務(wù)器必須指定要使用哪一個文件和網(wǎng)絡(luò)連接。而且,Web服務(wù)器必須指定(1)當(dāng)操作完成時通知哪一個Completion Handler,以及(2)一旦文件被傳輸,哪一個Completion Dispatcher應(yīng)該執(zhí)行回調(diào)。
-
異步操作處理器執(zhí)行操作:當(dāng)應(yīng)用在Asynchronous Operation Processor上調(diào)用操作時,它相對于其他應(yīng)用操作異步地運行這些操作。現(xiàn)代操作系統(tǒng)(比如Solaris和Windows NT)在內(nèi)核中提供異步的I/O子系統(tǒng)。
-
異步操作處理器通知完成分派器:當(dāng)操作完成時,Asynchronous Operation Processor取得在操作被發(fā)起時指定的Completion Handler和Completion Dispatcher。隨后Asynchronous Operation Processor將Asynchronous Operation的結(jié)果和Completion Handler傳遞給Completion Dispatcher,以用于回調(diào)。例如,如果文件已被異步傳輸,Asynchronous Operation Processor可以報告完成狀態(tài)(比如成功或失敗),以及寫入網(wǎng)絡(luò)連接的字節(jié)數(shù)。
-
完成分派器通知應(yīng)用:Completion Dispatcher在Completion Handler上調(diào)用完成掛鉤,將由應(yīng)用指定的任何完成數(shù)據(jù)傳遞給它。例如,如果異步讀取完成,通常一個指向新到達(dá)數(shù)據(jù)的指針將會被傳遞給Completion Handler。
圖8-8 前攝器模式的交互圖
這一部分詳述使用前攝器模式的效果。
前攝器模式提供以下好處:
增強事務(wù)分離:前攝器模式使應(yīng)用無關(guān)的異步機制與應(yīng)用特有的功能去耦合。應(yīng)用無關(guān)的機制成為可復(fù)用組件,知道怎樣多路分離與Asynchronous Operation相關(guān)聯(lián)的完成事件,并分派適當(dāng)?shù)挠蒀ompletion Handler定義的回調(diào)方法。同樣地,應(yīng)用特有的功能知道怎樣執(zhí)行特定類型的服務(wù)(比如HTTP處理)。
改善應(yīng)用邏輯可移植性:通過允許接口獨立于執(zhí)行事件多路分離的底層OS調(diào)用而復(fù)用,它改善了應(yīng)用的可移植性。這些系統(tǒng)調(diào)用檢測并報告可能同時發(fā)生在多個事件源之上的事件。事件源可以是I/O端口、定時器、同步對象、信號,等等。在實時POSIX平臺上,異步I/O函數(shù)由aio API族[9]提供。在Windows NT中,I/O完成端口和重疊式(overlapped)I/O被用于實現(xiàn)異步I/O[10]。
完成分派器封裝了并發(fā)機制:使Completion Dispatcher與Asynchronous Operation Processor去耦合的一個好處是應(yīng)用可以通過多種并發(fā)策略來配置Completion Dispatcher,而不會影響其他參與者。如8.7所討論的,Completion Dispatcher可被配置使用包括單線程和線程池方案在內(nèi)的若干并發(fā)策略。
線程策略被與并發(fā)策略去耦合:因為Asynchronous Operation Processor代表Proactive Initiator完成可能長時間運行的操作,應(yīng)用不會被迫派生線程來增加并發(fā)。這使得應(yīng)用可以獨立于它的線程策略改變它的并發(fā)策略。例如,Web服務(wù)器可能只想每個CPU有一個線程,但又想同時服務(wù)更多數(shù)目的客戶。
提高性能:多線程操作系統(tǒng)執(zhí)行上下文切換,以在多個線程控制中輪換。雖然執(zhí)行一次上下文切換的時間保持相當(dāng)?shù)暮愣ǎ绻鸒S上下文要切換到空閑線程的話,在大量線程間輪換的總時間可以顯著地降低應(yīng)用性能。例如,線程可以輪詢OS以查看完成狀態(tài),而這是低效率的。通過只激活那些有事件要處理的合理的線程控制,前攝器模式能夠避免上下文切換的代價。例如,如果沒有待處理的GET請求,Web服務(wù)器不需要啟用HTTP Handler。
應(yīng)用同步的簡化:只要Completion Handler不派生另外的線程控制,可以不考慮、或只考慮少許同步問題而編寫應(yīng)用邏輯。Completion Handler可被編寫為就好像它們存在于一個傳統(tǒng)的單線程環(huán)境中一樣。例如,Web服務(wù)器的HTTP GET處理器可以通過Async Read操作(比如Windows NT TransmitFile函數(shù)[1])來訪問磁盤。
前攝器模式有以下缺點:
難以調(diào)試:以前攝器模式編寫的應(yīng)用可能難以調(diào)試,因為反向的控制流在構(gòu)架基礎(chǔ)結(jié)構(gòu)和應(yīng)用特有的處理器上的回調(diào)方法之間來回振蕩。這增加了在調(diào)試器中對構(gòu)架的運行時行為的“單步跟蹤”的困難度,因為應(yīng)用開發(fā)者可能不了解或不能獲得構(gòu)架的代碼。這與試圖調(diào)試使用LEX和YACC編寫的編譯器的詞法分析器和解析器時所遇到的問題是類似的。在這些應(yīng)用中,當(dāng)線程控制是在用戶定義的動作例程中時,調(diào)試是相當(dāng)直接的。但是一旦線程控制返回到所生成的有限確定自動機(Deterministic Finite Automate,DFA)骨架時,就很難跟住程序邏輯了。
調(diào)度和控制未完成操作:Proactive Initiator可能沒有對Asynchronous Operation的執(zhí)行順序的控制。因此,Asynchronous Operation Processor必須被小心設(shè)計,以支持Asynchronous Operation的優(yōu)先級和取消處理。
前攝器模式可以通過許多方式實現(xiàn)。這一部分討論實現(xiàn)前攝器模式所涉及的步驟。
實現(xiàn)前攝器模式的第一步是構(gòu)建Asynchronous Operation Processor。該組件負(fù)責(zé)代表應(yīng)用異步地執(zhí)行操作。因此,它的兩項主要責(zé)任是輸出Asynchronous Operation API和實現(xiàn)Asynchronous Operation Engine以完成工作。
Asynchronous Operation Processor必須提供API、允許應(yīng)用請求Asynchronous Operation。在設(shè)計這些API時有若干壓力需要考慮:
可移植性:此API不應(yīng)約束應(yīng)用或它的Proactive Initiator使用特定的平臺。
靈活性:常常,異步API可以為許多類型的操作共享。例如,異步I/O操作常常被用于在多種介質(zhì)(比如網(wǎng)絡(luò)和文件)上執(zhí)行I/O。設(shè)計支持這樣的復(fù)用的API可能是有益的。
回調(diào):當(dāng)操作被調(diào)用時,Proactive Initiator必須登記回調(diào)。實現(xiàn)回調(diào)的一種常用方法是讓調(diào)用對象(客戶)輸出接口、讓調(diào)用者知道(服務(wù)器)。因此,Proactive Initiator必須通知Asynchronous Operation Processor,當(dāng)操作完成時,哪一個Completion Handler應(yīng)被回調(diào)。
完成分派器:因為應(yīng)用可以使用多個Completion Dispatcher,Proactive Initiator還必須指示由哪一個Completion Dispatcher來執(zhí)行回調(diào)。
給定所有這些問題,考慮下面的用于異步讀寫的API。Asynch_Stream類是用于發(fā)起異步讀寫的工廠。一旦構(gòu)造,可以使用此類來啟動多個異步讀寫。當(dāng)異步讀取完成時,Asynch_Stream::Read_Result將通過Completion_Handler上的handler_read回調(diào)方法被回傳給handler。類似地,當(dāng)異步寫入完成時,Asynch_Stream::Write_Result將通過Completion_Handler上的handler_write回調(diào)方法被回傳給handler。
class Asynch_Stream
// = TITLE
// A Factory for initiating reads
// and writes asynchronously.
{
// Initializes the factory with information
// which will be used with each asynchronous
// call. <handler> is notified when the
// operation completes. The asynchronous
// operations are performed on the <handle>
// and the results of the operations are
// sent to the <Completion_Dispatcher>.
Asynch_Stream (Completion_Handler &handler,
HANDLE handle,
Completion_Dispatcher *);
// This starts off an asynchronous read.
// Upto <bytes_to_read> will be read and
// stored in the <message_block>.
int read (Message_Block &message_block,
u_long bytes_to_read,
const void *act = 0);
// This starts off an asynchronous write.
// Upto <bytes_to_write> will be written
// from the <message_block>.
int write (Message_Block &message_block,
u_long bytes_to_write,
const void *act = 0);
...
};
Asynchronous Operation Processor必須含有異步執(zhí)行操作的機制。換句話說,當(dāng)應(yīng)用線程調(diào)用Asynchronous Operation時,必須不借用應(yīng)用的線程控制而執(zhí)行此操作。幸好,現(xiàn)代操作系統(tǒng)提供了用于Asynchronous Operation的機制(例如,POSIX 異步I/O和WinNT重疊式I/O)。在這樣的情況下,實現(xiàn)模式的這一部分只需要簡單地將平臺API映射到上面描述的Asynchronous Operation API。
如果OS平臺不提供對Asynchronous Operation的支持,有若干實現(xiàn)技術(shù)可用于構(gòu)建Asynchronous Operation Engine。或許最為直觀的解決方案是使用專用線程來為應(yīng)用執(zhí)行Asynchronous Operation。要實現(xiàn)線程化的Asynchronous Operation Engine,有三個主要步驟:
-
操作調(diào)用:因為操作將在與進行調(diào)用的應(yīng)用線程不同的線程控制中執(zhí)行,必定會發(fā)生某種類型的線程同步。一種方法是為每個操作派生一個線程。更為常用的方法是為Asynchronous Operation Processor而管理一個專用線程池。該方法可能需要應(yīng)用線程在繼續(xù)進行其他應(yīng)用計算之前將操作請求排隊。
-
操作執(zhí)行:既然操作將在專用線程中執(zhí)行,所以它可以執(zhí)行“阻塞”操作,而不會直接阻礙應(yīng)用的進展。例如,在提供異步I/O讀取機制時,專用線程可以在從socket或文件句柄中讀時阻塞。
-
操作完成:當(dāng)操作完成時,應(yīng)用必須被通知到。特別是,專用線程必須將應(yīng)用特有的通知委托給Completion Dispatcher。這要求在線程間進行另外的同步。
當(dāng)Completion Dispatcher從Asynchronous Operation Processor接收到操作完成通知時,它會回調(diào)與應(yīng)用對象相關(guān)聯(lián)的Completion Handler。實現(xiàn)Completion Dispatcher涉及兩個問題:(1)實現(xiàn)回調(diào)以及(2)定義用于執(zhí)行回調(diào)的并發(fā)策略。
Completion Dispatcher必須實現(xiàn)一種機制,Completion Handler通過它被調(diào)用。這要求Proactive Initiator在發(fā)起操作時指定一個回調(diào)。下面是常用的回調(diào)可選方案:
回調(diào)類:Completion Handler輸出接口、讓Completion Dispatcher知道。當(dāng)操作完成時,Completion Dispatcher回調(diào)此接口中的方法,并將已完成操作的有關(guān)信息傳遞給它(比如從網(wǎng)絡(luò)連接中讀取的字節(jié)數(shù))。
函數(shù)指針:Completion Dispatcher通過回調(diào)函數(shù)指針來調(diào)用Completion Handler。該方法有效地打破了Completion Dispatcher和Completion Handler之間的知識依賴。這有兩個好處:
- Completion Handler不會被迫輸出特定的接口;以及
- 在Completion Dispatcher和Completion Handler之間不需要有編譯時依賴。
會合點:Proactive Initiator可以設(shè)立事件對象或條件變量,用作Completion Dispatcher和Completion Handler之間的會合點。這在Completion Handler是Proactive Initiator時最為常見。在Asynchronous Operation運行至完成的同時,Completion Handler處理其他的活動。Completion Handler將在會合點周期性地檢查完成狀態(tài)。
當(dāng)操作完成時,Asynchronous Operation Processor將會通知Completion Dispatcher。在這時,Completion Dispatcher可以利用下面的并發(fā)策略中的一種來執(zhí)行應(yīng)用回調(diào):
動態(tài)線程分派:Completion Dispatcher可為每個Completion Handler動態(tài)分配一個線程。動態(tài)線程分派可通過大多數(shù)多線程操作系統(tǒng)來實現(xiàn)。在有些平臺上,由于創(chuàng)建和銷毀線程資源的開銷,這可能是所列出的Completion Dispatcher實現(xiàn)技術(shù)中最為低效的一種,
后反應(yīng)式分派(Post-reactive dispatching
):Completion Dispatcher可以發(fā)信號給Proactive Initiation所設(shè)立的事件對象或條件變量。盡管輪詢和派生阻塞在事件對象上的子線程都是可選的方案,最為高效的后反應(yīng)式分派方法是將事件登記到Reactor。后反應(yīng)式分派可以通過POSIX實時環(huán)境中的aio_suspend和Win32環(huán)境中的WaitForMultipleObjects來實現(xiàn)。
Call-through
分派:來自Asynchronous Operation Processor的線程控制可被Completion Dispatcher借用,以執(zhí)行Completion Handler。這種“周期偷取”策略可以通過減少空閑線程的影響范圍來提高性能。在一些老操作系統(tǒng)會將上下文切換到空閑線程、又只是從它們切換出去的情況下,這種方法有著收回“失去的”時間的巨大潛力。
Call-through分派在Windows NT中可以使用ReadFileEx和WriteFileEx Win32函數(shù)來實現(xiàn)。例如,線程控制可以使用這些調(diào)用來等待信號量被置位。當(dāng)它等待時,線程通知OS它進入了一種稱為“可報警等待狀態(tài)”(alterable wait state)的特殊狀態(tài)。在這時,OS可以占有對等待中的線程控制的棧和相關(guān)資源的控制,以執(zhí)行Completion Handler。
線程池分派:由Completion Dispatcher擁有的線程池可被用于Completion Handler的執(zhí)行。在池中的每個線程控制已被動態(tài)地分配到可用的CPU。線程池分派可通過Windows NT的I/O完成端口來實現(xiàn)。
在考慮上面描述的Completion Dispatcher技術(shù)的適用性時,考慮表8-1中所示的OS環(huán)境和物理硬件的可能組合? :
線程模型 | 系統(tǒng)類型 |
單處理器 | 多處理器 |
單線程 | A | B |
多線程 | C | D |
表8-1 Completion Dispatcher并發(fā)策略
如果你的OS只支持同步I/O,那就參見反應(yīng)堆模式[5]。但是,大多數(shù)現(xiàn)代操作系統(tǒng)都支持某種類型的異步I/O。
在表8-1的A和B組合中,假定你不等待任何信號量或互斥體,后反應(yīng)方式的異步I/O很可能是最好的。否則,Call-through實現(xiàn)或許更能回應(yīng)你的問題。在C組合中,使用Call-through方法。在D組合中,使用線程池方法。在實踐中,系統(tǒng)化的經(jīng)驗測量對于選擇最為合適的可選方案來說是必需的。
Completion Handler的實現(xiàn)帶來以下考慮。
Completion Handler可能需要維護關(guān)于特定請求的狀態(tài)信息。例如,OS可以通知Web服務(wù)器,只有一部分文件已被寫到網(wǎng)絡(luò)通信端口。作為結(jié)果,Completion Handler可能需要重新發(fā)出請求,直到文件被完全寫出,或連接變得無效。因此,它必須知道原先指定的文件,還剩多少字節(jié)要寫,以及在前一個請求開始時文件指針的位置。
沒有隱含的限制來阻止Proactive Initiator將多個Asynchronous Operation請求分配給單個Completion Handler。因此,Completion Handler必須在完成通知鏈中一一“系上”請求特有的狀態(tài)信息。為完成此工作,Completion Handler可以利用異步完成令牌(Asynchronous Completion Token)模式[8]。
與在任何多線程環(huán)境中一樣,使用前攝器模式的Completion Handler還是要由它自己來確保對共享資源的訪問是線程安全的。但是,Completion Handler不能跨越多個完成通知持有共享資源。否則,就有發(fā)生“用餐哲學(xué)家問題”的危險[11]。
該問題在于一個合理的線程控制永久等待一個信號量被置位時所產(chǎn)生的死鎖。通過設(shè)想一個由一群哲學(xué)家出席的宴會可以演示這一問題。用餐者圍繞一個圓桌就座,在每個哲學(xué)家之間只有一支筷子。當(dāng)哲學(xué)家覺得饑餓時,他必須獲取在他左邊和在他右邊的筷子才能用餐。一旦哲學(xué)家獲得一支筷子,不到吃飽他們就不會放下它。如果所有哲學(xué)家都拿起在他們右邊的筷子,就會發(fā)生死鎖,因為他們將永遠(yuǎn)也不可能拿到左邊的筷子。
8.7.3.3 占先式策略(Preemptive Policy)
Completion Dispatcher類型決定在執(zhí)行時一個Completion Handler是否可占先。當(dāng)與動態(tài)線程和線程池分派器相連時,Completion Handler自然可占先。但是,當(dāng)與后反應(yīng)式Completion Dispatcher相連時,Completion Handler并沒有對其他Completion Handler的占先權(quán)。當(dāng)由Call-through分派器驅(qū)動時,Completion Handler相對于在可報警等待狀態(tài)的線程控制也沒有占先權(quán)。
一般而言,處理器不應(yīng)該執(zhí)行持續(xù)時間長的同步操作,除非使用了多個完成線程,因為應(yīng)用的總體響應(yīng)性將會被顯著地降低。這樣的危險可以通過增強的編程訓(xùn)練來降低。例如,所有Completion Handler被要求用作Proactive Initiator,而不是去執(zhí)行同步操作。
這一部分顯示怎樣使用前攝器模式來開發(fā)Web服務(wù)器。該例子基于ACE構(gòu)架[4]中的前攝器實現(xiàn)。
當(dāng)客戶連接到Web服務(wù)器時,HTTP_Handler的open方法被調(diào)用。于是服務(wù)器就通過在Asynchronous Operation完成時回調(diào)的對象(在此例中是this指針)、用于傳輸數(shù)據(jù)的網(wǎng)絡(luò)連接,以及一旦操作完成時使用的Completion Dispatcher(proactor_)來初始化異步I/O對象。隨后讀操作異步地啟動,而服務(wù)器返回事件循環(huán)。
當(dāng)Async read操作完成時,分派器回調(diào)HTTP_Handler::handle_read_stream。如果有足夠的數(shù)據(jù),客戶請求就被解析。如果整個客戶請求還未完全到達(dá),另一個讀操作就會被異步地發(fā)起。
在對GET請求的響應(yīng)中,服務(wù)器對所請求文件進行內(nèi)存映射,并將文件數(shù)據(jù)異步地寫往客戶。當(dāng)寫操作完成時,分派器回調(diào)HTTP_Handler::handle_write_stream,從而釋放動態(tài)分配的資源。
附錄中含有兩個其他的代碼實例,使用同步的線程模型和同步的(非阻塞)反應(yīng)式模型實現(xiàn)Web服務(wù)器。
class HTTP_Handler
: public Proactor::Event_Handler
// = TITLE
// Implements the HTTP protocol
// (asynchronous version).
//
// = PATTERN PARTICIPANTS
// Proactive Initiator = HTTP_Handler
// Asynch Op = Network I/O
// Asynch Op Processor = OS
// Completion Dispatcher = Proactor
// Completion Handler = HTPP_Handler
{
public:
void open (Socket_Stream *client)
{
// Initialize state for request
request_.state_ = INCOMPLETE;
// Store reference to client.
client_ = client;
// Initialize asynch read stream
stream_.open (*this, client_->handle (), proactor_);
// Start read asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
// This is called by the Proactor
// when the asynch read completes
void handle_read_stream(u_long bytes_transferred)
{
if (request_.enough_data(bytes_transferred))
parse_request ();
else
// Start reading asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
void parse_request (void)
{
// Switch on the HTTP command type.
switch (request_.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
file_.map (request_.filename ());
// Start writing asynchronously.
stream_.write (file_.buffer (), file_.buffer_size ());
break;
// Client is storing a file
// at the server.
case HTTP_Request::PUT:
// ...
}
}
void handle_write_stream(u_long bytes_transferred)
{
if (file_.enough_data(bytes_transferred))
// Success....
else
// Start another asynchronous write
stream_.write (file_.buffer (), file_.buffer_size ());
}
private:
// Set at initialization.
Proactor *proactor_;
// Memory-mapped file_;
Mem_Map file_;
// Socket endpoint.
Socket_Stream *client_;
// HTTP Request holder
HTTP_Request request_;
// Used for Asynch I/O
Asynch_Stream stream_;
};
下面是一些被廣泛記載的前攝器的使用:
Windows NT中的I/O完成端口:Windows NT操作系統(tǒng)實現(xiàn)了前攝器模式。Windows NT支持多種Asynchronous Operation,比如接受新網(wǎng)絡(luò)連接、讀寫文件和socket,以及通過網(wǎng)絡(luò)連接傳輸文件。操作系統(tǒng)就是Asynchronous Operation Processor。操作結(jié)果在I/O完成端口(它扮演Completion Dispatcher的角色)上排隊。
異步I/O操作的UNIX AIO族:在有些實時POSIX平臺上,前攝器模式是由aio API族[9]來實現(xiàn)的。這些OS特性非常類似于上面描述的Windows NT的特性。一個區(qū)別是UNIX信號可用于實現(xiàn)真正異步的Completion Dispatcher(Windows NT API不是真正異步的)。
ACE Proactor:ACE自適配通信環(huán)境 [4]實現(xiàn)了前攝器組件,它封裝Windows NT上的I/O完成端口,以及POSIX平臺上的aio API。ACE前攝器抽象提供Windows NT所支持的標(biāo)準(zhǔn)C API的OO接口。這一實現(xiàn)的源碼可從ACE網(wǎng)站
http://www.cs.wustl.edu/~schmidt/ACE.html獲取。
Windows NT中的異步過程調(diào)用(Asynchronous Procedure Call):有些系統(tǒng)(比如Windows NT)支持異步過程調(diào)用(APC)。APC是在特定線程的上下文中異步執(zhí)行的函數(shù)。當(dāng)APC被排隊到線程時,系統(tǒng)發(fā)出軟件中斷。下一次線程被調(diào)度時,它將運行該APC。操作系統(tǒng)所發(fā)出的APC被稱為內(nèi)核模式APC。應(yīng)用所發(fā)出的APC被稱為用戶模式APC。
圖8-9演示與前攝器相關(guān)的模式。
圖8-9 前攝器模式的相關(guān)模式
異步完成令牌(ACT)模式[8]通常與前攝器模式結(jié)合使用。當(dāng)Asynchronous Operation完成時,應(yīng)用可能需要比簡單的通知更多的信息來適當(dāng)?shù)靥幚硎录.惒酵瓿闪钆颇J皆试S應(yīng)用將狀態(tài)高效地與Asynchronous Operation的完成相關(guān)聯(lián)。
前攝器模式還與觀察者(Observer)模式[12](在其中,當(dāng)單個主題變動時,相關(guān)對象也會自動更新)有關(guān)。在前攝器模式中,當(dāng)來自多個來源的事件發(fā)生時,處理器被自動地通知。一般而言,前攝器模式被用于異步地將多個輸入源多路分離給與它們相關(guān)聯(lián)的事件處理器,而觀察者通常僅與單個事件源相關(guān)聯(lián)。
前攝器模式可被認(rèn)為是同步反應(yīng)堆模式[5]的一種異步的變體。反應(yīng)堆模式負(fù)責(zé)多個事件處理器的多路分離和分派;它們在可以同步地發(fā)起操作而不會阻塞時被觸發(fā)。相反,前攝器模式也支持多個事件處理器的多路分離和分派,但它們是被異步事件的完成觸發(fā)的。
主動對象(Active Object)模式[13]使方法執(zhí)行與方法調(diào)用去耦合。前攝器模式也是類似的,因為Asynchronous Operation Processor代表應(yīng)用的Proactive Initiator來執(zhí)行操作。就是說,兩種模式都可用于實現(xiàn)Asynchronous Operation。前攝器模式常常用于替代主動對象模式,以使系統(tǒng)并發(fā)策略與線程模型去耦合。
前攝器可被實現(xiàn)為單體(Singleton)[12]。這對于在異步應(yīng)用中,將事件多路分離和完成分派集中到單一的地方來說是有用的。
責(zé)任鏈(Chain of Responsibility,COR)模式[12]使事件處理器與事件源去耦合。在Proactive Initiator與Completion Handler的隔離上,前攝器模式也是類似的。但是,在COR中,事件源預(yù)先不知道哪一個處理器將被執(zhí)行(如果有的話)。在前攝器中,Proactive Initiator完全知道目標(biāo)處理器。但是,通過建立一個Completion Handler(它是由外部工廠動態(tài)配置的責(zé)任鏈的入口),這兩種模式可被結(jié)合在一起:。
前攝器模式包含了一種強大的設(shè)計范式,支持高性能并發(fā)應(yīng)用的高效而靈活的事件分派策略。前攝器模式提供并發(fā)執(zhí)行操作的性能助益,而又不強迫開發(fā)者使用同步多線程或反應(yīng)式編程。
[1] J. Hu, I. Pyarali, and D. C. Schmidt, “Measuring the Impact of Event Dispatching and Concurrency Models on Web Server Performance Over High-speed Networks,” in Proceedings of the 2nd Global Internet Conference, IEEE, November 1997.
[2] J. Hu, I. Pyarali, and D. C. Schmidt, “Applying the Proactor Pattern to High-Performance Web Servers,” in Proceedings of the 10th International Conference on Parallel and Distributed Computing and Systems, IASTED, Oct. 1998.
[3] J. C. Mogul, “The Case for Persistent-connection HTTP,” in Proceedings of ACMSIGCOMM ’95 Conference in Computer Communication Review, (Boston, MA, USA), pp. 299–314, ACM Press, August 1995.
[4] D. C. Schmidt, “ACE: an Object-Oriented Framework for Developing Distributed Applications,” in Proceedings of the 6th USENIX C++ Technical Conference, (Cambridge, Massachusetts), USENIX Association, April 1994.
[5] D. C. Schmidt, “Reactor: An Object Behavioral Pattern for Concurrent Event Demultiplexing and Event Handler Dispatching,” in Pattern Languages of Program Design (J. O. Coplien and D. C. Schmidt, eds.), pp. 529–545, Reading, MA: Addison-Wesley, 1995.
[6] D. C. Schmidt, “Acceptor and Connector: Design Patterns for Initializing Communication Services,” in Pattern Languages of Program Design (R. Martin, F. Buschmann, and D. Riehle, eds.), Reading, MA: Addison-Wesley, 1997.
[7] M. K. McKusick, K. Bostic, M. J. Karels, and J. S. Quarterman, The Design and Implementation of the 4.4BSD Operating System. Addison Wesley, 1996.
[8] I. Pyarali, T. H. Harrison, and D. C. Schmidt, “Asynchronous Completion Token: an Object Behavioral Pattern for Efficient Asynchronous Event Handling,” in Pattern Languages of Program Design (R. Martin, F. Buschmann, and D. Riehle, eds.), Reading, MA: Addison-Wesley, 1997.
[9] “Information Technology – Portable Operating System Interface (POSIX) – Part 1: System Application: Program Interface (API) [C Language],” 1995.
[10] Microsoft Developers Studio, Version 4.2 - Software Development Kit, 1996.
[11] E. W. Dijkstra, “Hierarchical Ordering of Sequential Processes,” Acta Informatica, vol. 1, no. 2, pp. 115–138, 1971.
[12] E. Gamma, R. Helm, R. Johnson, and J. Vlissides, Design Patterns: Elements of Reusable Object-Oriented Software. Reading, MA: Addison-Wesley, 1995.
[13] R. G. Lavender and D. C. Schmidt, “Active Object: an Object Behavioral Pattern for Concurrent Programming,” in Proceedings of the 2nd Annual Conference on the Pattern Languages of Programs, (Monticello, Illinois), pp. 1–7, September 1995.
本附錄概述用于開發(fā)前攝器模式的可選實現(xiàn)的代碼。下面,我們檢查使用多線程的同步I/O和使用單線程的反應(yīng)式I/O。
下面的代碼顯示怎樣使用線程池同步I/O來開發(fā)Web服務(wù)器。當(dāng)客戶連接到服務(wù)器時,池中的一個線程接受連接,并調(diào)用HTTP_Handler中的open方法。隨后服務(wù)器同步地從網(wǎng)絡(luò)連接讀取請求。當(dāng)讀操作完成時,客戶請求隨之被解析。在對GET請求的響應(yīng)中,服務(wù)器對所請求文件進行內(nèi)存映射,并將文件數(shù)據(jù)同步地寫往客戶。注意阻塞I/O是怎樣使Web服務(wù)器能夠遵循2.2.1中所概述的步驟的。
class HTTP_Handler
// = TITLE
// Implements the HTTP protocol
// (synchronous threaded version).
//
// = DESCRIPTION
// This class is called by a
// thread in the Thread Pool.
{
public:
void open (Socket_Stream *client)
{
HTTP_Request request;
// Store reference to client.
client_ = client;
// Synchronously read the HTTP request
// from the network connection and
// parse it.
client_->recv (request);
parse_request (request);
}
void parse_request (HTTP_Request &request)
{
// Switch on the HTTP command type.
switch (request.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
Mem_Map input_file;
input_file.map (request.filename());
// Synchronously send the file
// to the client. Block until the
// file is transferred.
client_->send (input_file.data (),
input_file.size ());
break;
// Client is storing a file at
// the server.
case HTTP_Request::PUT:
// ...
}
}
private:
// Socket endpoint.
Socket_Stream *client_;
// ...
};
下面的代碼顯示怎樣將反應(yīng)堆模式用于開發(fā)Web服務(wù)器。當(dāng)客戶連接到服務(wù)器時,HTTP_Handler::open方法被調(diào)用。服務(wù)器登記I/O句柄和在網(wǎng)絡(luò)句柄“讀就緒“時回調(diào)的對象(在此例中是this指針)。然后服務(wù)器返回事件循環(huán)。
當(dāng)請求數(shù)據(jù)到達(dá)服務(wù)器時,reactor_回調(diào)HTTP_Handler::handle_input方法。客戶數(shù)據(jù)以非阻塞方式被讀取。如果有足夠的數(shù)據(jù),客戶請求就被解析。如果整個客戶請求還沒有到達(dá),應(yīng)用就返回反應(yīng)堆事件循環(huán)。
在對GET請求的響應(yīng)中,服務(wù)器對所請求的文件進行內(nèi)存映射;并在反應(yīng)堆上登記,以在網(wǎng)絡(luò)連接變?yōu)椤皩懢途w”時被通知。當(dāng)向連接寫入數(shù)據(jù)不會阻塞調(diào)用線程時,reactor_就回調(diào)HTTP_Handler::handler_output方法。當(dāng)所有數(shù)據(jù)都已發(fā)送給客戶時,網(wǎng)絡(luò)連接被關(guān)閉。
class HTTP_Handler :
public Reactor::Event_Handler
// = TITLE
// Implements the HTTP protocol
// (synchronous reactive version).
//
// = DESCRIPTION
// The Event_Handler base class
// defines the hooks for
// handle_input()/handle_output().
//
// = PATTERN PARTICIPANTS
// Reactor = Reactor
// Event Handler = HTTP_Handler
{
public:
void open (Socket_Stream *client)
{
// Initialize state for request
request_.state_ = INCOMPLETE;
// Store reference to client.
client_ = client;
// Register with the reactor for reading.
reactor_->register_handler
(client_->handle (),
this,
Reactor::READ_MASK);
}
// This is called by the Reactor when
// we can read from the client handle.
void handle_input (void)
{
int result = 0;
// Non-blocking read from the network
// connection.
do
result = request_.recv (client_->handle ());
while (result != SOCKET_ERROR && request_.state_ == INCOMPLETE);
// No more progress possible,
// blocking will occur
if (request_.state_ == INCOMPLETE && errno == EWOULDBLOCK)
reactor_->register_handler
(client_->handle (),
this,
Reactor::READ_MASK);
else
// We now have the entire request
parse_request ();
}
void parse_request (void)
{
// Switch on the HTTP command type.
switch (request_.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
file_.map (request_.filename ());
// Transfer the file using Reactive I/O.
handle_output ();
break;
// Client is storing a file at
// the server.
case HTTP_Request::PUT:
// ...
}
}
void handle_output (void)
{
// Asynchronously send the file
// to the client.
if (client_->send (file_.data (),
file_.size ())
== SOCKET_ERROR
&& errno == EWOULDBLOCK)
// Register with reactor...
else
// Close down and releas
handle_close ();
}
private:
// Set at initialization.
Reactor *reactor_;
// Memory-mapped file_;
Mem_Map file_;
// Socket endpoint.
Socket_Stream *client_;
// HTTP Request holder.
HTTP_Request request_;
};
本文轉(zhuǎn)載至ACE開發(fā)者網(wǎng)站 作者:Irfan Pyarali Tim Harrison
posted @
2007-02-27 21:17 walkspeed 閱讀(3275) |
評論 (0) |
編輯 收藏
??? ACE_INET_Addr類,包裝了網(wǎng)絡(luò)地址
??? ACE_SOCK_Connector類,扮演主動連接角色,發(fā)起通訊連接。連接到遠(yuǎn)端的服務(wù)。
??? ACE_SOCK_Acceptor類,扮演被動連接角色,等待連接。等待遠(yuǎn)端客戶的請求。
??? ACE_SOCK_Stream類,扮演數(shù)據(jù)通訊角色,發(fā)送和接收數(shù)據(jù)。完成客戶與服務(wù)之間的通訊。
???
??? 利用ACE庫來開發(fā)網(wǎng)絡(luò)通訊程序是很簡單的,一個基本程序只用到以上提到的幾個類,就
可以完成一個基于客戶端、服務(wù)器端模型的網(wǎng)絡(luò)應(yīng)用的開發(fā)。開發(fā)者無需了解Socket在不同平
臺上的實現(xiàn),記憶眾多并相互關(guān)聯(lián)的Socket APIs。
??? 一下以一個Hello World程序為演示。
???
??? 客戶端程序。發(fā)送一個Hello World到遠(yuǎn)端的服務(wù)器,并接收服務(wù)器返回的信息,將信息
打印在屏幕上。
#include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
?ACE::init();//初始化ACE庫,在windows下一定要
?std::string str = "hello world";
?//設(shè)置服務(wù)器地址
?//第一個參數(shù)是端口,第二個是ip地址,也可以是域名。
?//可以先定義一個地址對象,再用ACE_INET_Addr的set函數(shù)來設(shè)定。
?//地址的配置很多,具體的參照文檔
?ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
?ACE_SOCK_Stream peer_stream;//定義一個通訊隊形
?ACE_SOCK_Connector peer_connector;//定義一個主動連接對象
?peer_connector.connect( peer_stream, peer_addr );//發(fā)起一個連接
?peer_stream.send( str.c_str(), str.length() );//發(fā)送數(shù)據(jù)到服務(wù)器
?str.erase();
?str.resize( sizeof( "hello world" ) );
?peer_stream.recv( (void*)str.c_str(), str.length() );//接收來自服務(wù)器的信息
?std::cout << "from server message : " << str << std::endl;
??? ACE::fini();
?return 0;
}
??? 服務(wù)器端代碼。接收一個遠(yuǎn)端的連接,將接收到的信息打印在屏幕上,并將接收到的信
息返回給客戶端。
#include <iostream>
#include <string>
#include <ace/ACE.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
int main( int argc, char* argv[] )
{
?ACE::init();
?std::string str;
?str.resize( sizeof( "hello world" ) );
?//設(shè)置服務(wù)器地址
?ACE_INET_Addr peer_addr( 5050, "127.0.0.1" );
?ACE_SOCK_Stream peer_stream;
?
?//創(chuàng)建被動連接角色對象
?ACE_SOCK_Acceptor peer_acceptor;
?//開啟被動連接對象,將對象綁定到一個地址上
?peer_acceptor.open( peer_addr );
?//等待連接
?peer_acceptor.accept( peer_stream );
?//數(shù)據(jù)通訊
?peer_stream.recv( (void*)str.c_str(), str.length() );
?std::cout << "from client message : " << str << std::endl;
?peer_stream.send( str.c_str(), str.length() );
?ACE::fini();
?return 0;
}
××××以上代碼需要ACE庫才能運轉(zhuǎn)××××
???
??? 利用ACE編程的基本框架。
??? 客戶端
??? 1 創(chuàng)建地址對象。(ACE_INET_Addr)
??? 2 創(chuàng)建主動連接對象。(ACE_SOCK_Connector)
??? 3 創(chuàng)建數(shù)據(jù)通訊對象。(ACE_SOCK_Stream)
??? 4 設(shè)置服務(wù)器地址。(ACE_INET_Addr::set)
??? 5 將數(shù)據(jù)通訊對象和地址作為參數(shù)傳給主動連接對象,發(fā)起主動連接(ACE_SOCK_Connector::connect)
??? 6 利用通訊對象接收和發(fā)送數(shù)據(jù)。(ACE_SOCK_Stream::recv和ACE_SOCK_Stream::send)
???
??? 服務(wù)器端
??? 1 創(chuàng)建地址對象。(ACE_INET_Addr)
??? 2 創(chuàng)建被動連接對象。(ACE_SOCK_Connector)
??? 3 創(chuàng)建數(shù)據(jù)通訊對象。(ACE_SOCK_Stream)
??? 4 設(shè)置服務(wù)器地址。(ACE_INET_Addr::set)
??? 5 將地址作為參數(shù)傳給被動連接對象,啟動接收(ACE_SOCK_Acceptor::open)
??? 6 將數(shù)據(jù)通訊對象傳給被動連接對象,啟動接收,接受連接(ACE_SOCK_Connector::accept)
??? 7 利用通訊對象接收和發(fā)送數(shù)據(jù)。(ACE_SOCK_Stream::recv和ACE_SOCK_Stream::send)
posted @
2007-02-25 19:30 walkspeed 閱讀(12198) |
評論 (10) |
編輯 收藏
??? ACE_IPC_SAP類是IPC類族的基類,封裝了句柄,提供了訪問句柄的基本接口,基本結(jié)構(gòu)
如下
class ACE_IPC_SAP
{
public:
? int enable (int value) const;
? int disable (int value) const;
? ACE_HANDLE get_handle (void) const;
? void set_handle (ACE_HANDLE handle);
protected:
? ACE_IPC_SAP (void);
? ~ACE_IPC_SAP (void);
private:
? ACE_HANDLE handle_;
};
??? ACE_SOCK類是使用Socket的基類,所有使用Socket通訊的類都從這個類派生。本類的功
能包括
??? 1 創(chuàng)建和銷毀Socket句柄
??? 2 獲取本地和遠(yuǎn)端的網(wǎng)絡(luò)地址
??? 3 設(shè)置和讀取Socket選項。
基本結(jié)構(gòu)如下
class ACE_SOCK : public ACE_IPC_SAP
{
public:
? //設(shè)置Socket的屬性,包裝了setsockopt系統(tǒng)函數(shù)
? int set_option (int level,
????????????????? int option,
????????????????? void *optval,
????????????????? int optlen) const;
? //獲取Socket的屬性,包裝了getsockopt系統(tǒng)函數(shù)
? int get_option (int level,
????????????????? int option,
????????????????? void *optval,
????????????????? int *optlen) const;
? //獲得本地地址
? int get_local_addr (ACE_Addr &) const;
? //獲取遠(yuǎn)端地址
? int get_remote_addr (ACE_Addr &) const;
? //關(guān)閉Socket
? int close (void);
? //打開一個Socket,沒有Qos
? int open (int type,
??????????? int protocol_family,
??????????? int protocol,
??????????? int reuse_addr);
? //打開一個Socket,有Qos
? int open (int type,
??????????? int protocol_family,
??????????? int protocol,
??????????? ACE_Protocol_Info *protocolinfo,
??????????? ACE_SOCK_GROUP g,
??????????? u_long flags,
??????????? int reuse_addr);
protected:
? ACE_SOCK (int type,
??????????? int protocol_family,
??????????? int protocol = 0,
??????????? int reuse_addr = 0);
? ACE_SOCK (int type,
??????????? int protocol_family,
??????????? int protocol,
??????????? ACE_Protocol_Info *protocolinfo,
??????????? ACE_SOCK_GROUP g,
??????????? u_long flags,
??????????? int reuse_addr);
? ACE_SOCK (void);
? ~ACE_SOCK (void);
};
??? ACE_SOCK_IO類,包裝了Socket數(shù)據(jù)通訊的基本方法。本類提供的功能
??? 1 支持?jǐn)?shù)據(jù)的發(fā)送和接收
??? 2 支持分散讀操作
??? 3 支持集中寫操作
??? 4 支持阻塞,非阻塞,定時 I/O操作
基本結(jié)構(gòu)如下
class ACE_SOCK_IO : public ACE_SOCK
{
public:
? ACE_SOCK_IO (void);
? ~ACE_SOCK_IO (void);
? //接收數(shù)據(jù)
? ssize_t recv (void *buf,
??????????????? size_t n,
??????????????? const ACE_Time_Value *timeout = 0) const;
? //分散讀操作
? ssize_t recv (iovec iov[],
??????????????? size_t n,
??????????????? const ACE_Time_Value *timeout = 0) const;
? //發(fā)送數(shù)據(jù)?????????????
? ssize_t send (const void *buf,
??????????????? size_t n,
??????????????? const ACE_Time_Value *timeout = 0) const;
? //集中寫操作
? ssize_t send (const iovec iov[],
??????????????? size_t n,
??????????????? const ACE_Time_Value *timeout = 0) const;
};
??? ACE_SOCK_Stream類,繼承ACE_SOCK_IO類。在ACE_SOCK_IO類提供的功能上,添加了發(fā)送
和接收剛好n個字節(jié)的能力。基本結(jié)構(gòu)如下
class ACE_Export ACE_SOCK_Stream : public ACE_SOCK_IO
{
public:
? ACE_SOCK_Stream (void);
? ACE_SOCK_Stream (ACE_HANDLE h);
? ~ACE_SOCK_Stream (void);
?
? //剛好讀取n個字節(jié)的數(shù)據(jù)
? ssize_t recv_n (void *buf,
????????????????? size_t len,
????????????????? const ACE_Time_Value *timeout = 0,
????????????????? size_t *bytes_transferred = 0) const;
? //分散讀剛好n個字節(jié)操作
? ssize_t recvv_n (iovec iov[],
?????????????????? int iovcnt,
?????????????????? const ACE_Time_Value *timeout = 0,
?????????????????? size_t *bytes_transferred = 0) const;
? //剛好發(fā)送n個字節(jié)的數(shù)據(jù)
? ssize_t send_n (const void *buf,
????????????????? size_t len,
????????????????? const ACE_Time_Value *timeout = 0,
????????????????? size_t *bytes_transferred = 0) const;
? //集中寫剛好n個字節(jié)操作
? ssize_t sendv_n (const iovec iov[],
?????????????????? int iovcnt,
?????????????????? const ACE_Time_Value *timeout = 0,
?????????????????? size_t *bytes_transferred = 0) const;
? int close_reader (void);
? int close_writer (void);
? int close (void);
? typedef ACE_INET_Addr PEER_ADDR;
};
??? ACE_SOCK_Acceptor類是一個工廠類,用來被動產(chǎn)生一個新的通訊端點。提供如下能力
??? 1 接收對等端的連接
??? 2 連接可以通過阻塞、非阻塞或定時方式接受。
基本結(jié)構(gòu)如下
class ACE_Export ACE_SOCK_Acceptor : public ACE_SOCK
{
public:
? ACE_SOCK_Acceptor (void);
? ACE_SOCK_Acceptor (const ACE_Addr &local_sap,
???????????????????? int reuse_addr = 0,
???????????????????? int protocol_family = PF_UNSPEC,
???????????????????? int backlog = ACE_DEFAULT_BACKLOG,
???????????????????? int protocol = 0);
? ~ACE_SOCK_Acceptor (void);
? //打開一個監(jiān)聽
? int open (const ACE_Addr &local_sap,
??????????? int reuse_addr = 0,
??????????? int protocol_family = PF_UNSPEC,
??????????? int backlog = ACE_DEFAULT_BACKLOG,
??????????? int protocol = 0);
? int close (void);
? //接受一個對等端的連接,產(chǎn)生一個通訊
? int accept (ACE_SOCK_Stream &new_stream,
????????????? ACE_Addr *remote_addr = 0,
????????????? ACE_Time_Value *timeout = 0,
????????????? int restart = 1,
????????????? int reset_new_handle = 0) const;
};
??? ACE_SOCK_Connector類是一個工廠類,用來主動建立一個新的通訊端。提供的功能如下
??? 1 發(fā)起一個到對等接受者的連接,并在連接后產(chǎn)生一個通訊對象
??? 2 連接可以通過阻塞、非阻塞或定時方式發(fā)起
基本結(jié)構(gòu)如下
class ACE_SOCK_Connector
{
public:
? ACE_SOCK_Connector (void);
? ACE_SOCK_Connector (ACE_SOCK_Stream &new_stream,
????????????????????? const ACE_Addr &remote_sap,
????????????????????? const ACE_Time_Value *timeout = 0,
????????????????????? const ACE_Addr &local_sap = ACE_Addr::sap_any,
????????????????????? int reuse_addr = 0,
????????????????????? int flags = 0,
????????????????????? int perms = 0,
????????????????????? int protocol = 0);
? //發(fā)起一個連接
? int connect (ACE_SOCK_Stream &new_stream,
?????????????? const ACE_Addr &remote_sap,
?????????????? const ACE_Time_Value *timeout = 0,
?????????????? const ACE_Addr &local_sap = ACE_Addr::sap_any,
?????????????? int reuse_addr = 0,
?????????????? int flags = 0,
?????????????? int perms = 0,
?????????????? int protocol = 0);
? ~ACE_SOCK_Connector (void);
? int complete (ACE_SOCK_Stream &new_stream,
??????????????? ACE_Addr *remote_sap = 0,
??????????????? const ACE_Time_Value *timeout = 0);
};
??? 以上的類結(jié)構(gòu)是簡化的,以突出重點功能。要完全了解每個類,看源代碼。
posted @
2007-02-24 14:15 walkspeed 閱讀(3872) |
評論 (0) |
編輯 收藏
??? 多路分配的入口函數(shù)handle_events,在框架的實現(xiàn)接口類中定義
??? 多路分配的主體實現(xiàn)函數(shù)event_handling,在ACE_WFMO_Reactor中定義,偽
實現(xiàn)代碼如下
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
????????????????????????????????????? int alertable)
{
? int result = 0;
?
? do
? {
????? //等待在句柄集上發(fā)生的事件
????? //wait_for_multiple_events的具體實現(xiàn)是使用
????? //WaitForMultipleObjectsEx函數(shù)
????? DWORD wait_status = this->wait_for_multiple_events (timeout,
????????????????????????????????????????????????????????? alertable);
????? //分發(fā)事件
????? result = this->safe_dispatch (wait_status);
? }while (result == 0);
? return result;
}
??? 分發(fā)的主體函數(shù)是dispatch_handles,在ACE_WFMO_Reactor中定義,偽實現(xiàn)
代碼如下
int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
? DWORD dispatch_slot = 0;
? //活動的句柄總數(shù)
? DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
? //查找要分發(fā)的句柄的索引
? for (int number_of_handlers_dispatched = 1;;++number_of_handlers_dispatched)
? {
????? //計算有事件發(fā)生,要分發(fā)的句柄索引
????? dispatch_slot += wait_status - WAIT_OBJECT_0;
????? //分發(fā)給相應(yīng)的事件處理對象
????? if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
??????? return -1;
????? ++dispatch_slot;
????? if (dispatch_slot >= max_handlep1)
??????? return number_of_handlers_dispatched;//分發(fā)了幾個事件
????? //檢查剩下的句柄中有沒有有事件發(fā)生的
????? wait_status = this->poll_remaining_handles (dispatch_slot);
????? switch (wait_status)
????? {
??????? case WAIT_FAILED: // Failure.
????????? ACE_OS::set_errno_to_last_error ();
????????? /* FALLTHRU */
??????? case WAIT_TIMEOUT:
????????? // There are no more handles ready, we can return.
????????? return number_of_handlers_dispatched;//分發(fā)了幾個事件
????? }
? }
}
??? 找到具體事件處理對象主體函數(shù)complex_dispatch_hander,在ACE_WFMO_Reactor
中定義,為代碼如下
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
??????????????????????????????????????????????? ACE_HANDLE event_handle)
{
? //找到當(dāng)前的分發(fā)的信息
? ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
??? this->handler_rep_.current_info ()[slot];
? WSANETWORKEVENTS events;
? ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
? if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
????????????????????????????? event_handle,
????????????????????????????? &events) == SOCKET_ERROR)
??? problems = ACE_Event_Handler::ALL_EVENTS_MASK;
? else
? {
????? //發(fā)生的事件于要檢測的事件是否相同,相同就分發(fā)
????? events.lNetworkEvents &= current_info.network_events_;
????? while (events.lNetworkEvents != 0)
????? {
????????? ACE_Event_Handler *event_handler = current_info.event_handler_;
????????? //調(diào)用事件處理對象,進行事件處理
????????? problems |= this->upcall (current_info.event_handler_,
??????????????????????????????????? current_info.io_handle_,
??????????????????????????????????? events);
????????? if (this->handler_rep_.scheduled_for_deletion (slot))
??????????? break;
????? }
? }
? return 0;
}
posted @
2007-02-22 11:46 walkspeed 閱讀(1397) |
評論 (0) |
編輯 收藏
1
int ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
{
? return this->event_handling (&how_long, FALSE);
}
2
// Waits for and dispatches all events.? Returns -1 on error, 0 if
// max_wait_time expired, or the number of events that were dispatched.
int ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
????????????????????????????????????? int alertable)
{
? ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
? // Make sure we are not closed
? if (!this->open_for_business_ || this->deactivated_)
????? return -1;
? // Stash the current time -- the destructor of this object will
? // automatically compute how much time elapsed since this method was
? // called.
? ACE_Countdown_Time countdown (max_wait_time);
? int result;
?
? do
? {
????? // Check to see if it is ok to enter ::WaitForMultipleObjects
????? // This will acquire <this->lock_> on success On failure, the
????? // lock will not be acquired
?????
????? result = this->ok_to_wait (max_wait_time, alertable);
????? if (result != 1)
??????? return result;
????? // Increment the number of active threads
????? ++this->active_threads_;
????? // Release the <lock_>
????? this->lock_.release ();
????? // Update the countdown to reflect time waiting to play with the
????? // mut and event.
?????
????? countdown.update ();
????? // Calculate timeout
????? int timeout = this->calculate_timeout (max_wait_time);
????? // Wait for event to happen
????? DWORD wait_status = this->wait_for_multiple_events (timeout,
????????????????????????????????????????????????????????? alertable);
????? // Upcall
????? result = this->safe_dispatch (wait_status);
????? if (0 == result)
????? {
????????? // wait_for_multiple_events timed out without dispatching
????????? // anything.? Because of rounding and conversion errors and
????????? // such, it could be that the wait loop timed out, but
????????? // the timer queue said it wasn't quite ready to expire a
????????? // timer. In this case, max_wait_time won't have quite been
????????? // reduced to 0, and we need to go around again. If max_wait_time
????????? // is all the way to 0, just return, as the entire time the
????????? // caller wanted to wait has been used up.
????????? countdown.update ();???? // Reflect time waiting for events
?????????
????????? if (0 == max_wait_time || max_wait_time->usec () == 0)
??????????? break;
????? }
? }while (result == 0);
? return result;
}
3
int ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status)
{
? int result = -1;
? ACE_SEH_TRY
? {
????? result = this->dispatch (wait_status);
? }
? ACE_SEH_FINALLY
? {
????? this->update_state ();
? }
? return result;
}
4
int ACE_WFMO_Reactor::dispatch (DWORD wait_status)
{
? int handlers_dispatched = 0;
? // Expire timers
? handlers_dispatched += this->expire_timers ();
? switch (wait_status)
? {
? case WAIT_FAILED: // Failure.
????? ACE_OS::set_errno_to_last_error ();
????? return -1;
? case WAIT_TIMEOUT: // Timeout.
????? errno = ETIME;
????? return handlers_dispatched;
? default:? // Dispatch.
????? // We'll let dispatch worry about abandoned mutes.
????? handlers_dispatched += this->dispatch_handles (wait_status);
????? return handlers_dispatched;
? }
}
5
// Dispatches any active handles from <handles_[slot]> to
// <handles_[max_handlep1_]>, polling through our handle set looking
// for active handles.
int ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
{
? // dispatch_slot is the absolute slot.? Only += is used to
? // increment it.
? DWORD dispatch_slot = 0;
? // Cache this value, this is the absolute value.
? DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
? // nCount starts off at <max_handlep1>, this is a transient count of
? // handles last waited on.
? DWORD nCount = max_handlep1;
? for (int number_of_handlers_dispatched = 1;;++number_of_handlers_dispatched)
? {
????? const bool ok = (wait_status >= WAIT_OBJECT_0 && wait_status <= (WAIT_OBJECT_0 + nCount));
????? if (ok)
??????? dispatch_slot += wait_status - WAIT_OBJECT_0;
????? else
??????? // Otherwise, a handle was abandoned.
??????? dispatch_slot += wait_status - WAIT_ABANDONED_0;
????? // Dispatch handler
????? if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
??????? return -1;
????? // Increment slot
????? ++dispatch_slot;
????? // We're done.
????? if (dispatch_slot >= max_handlep1)
??????? return number_of_handlers_dispatched;
????? // Readjust nCount
????? nCount = max_handlep1 - dispatch_slot;
????? // Check the remaining handles
????? wait_status = this->poll_remaining_handles (dispatch_slot);
????? switch (wait_status)
????? {
??????? case WAIT_FAILED: // Failure.
????????? ACE_OS::set_errno_to_last_error ();
????????? /* FALLTHRU */
??????? case WAIT_TIMEOUT:
????????? // There are no more handles ready, we can return.
????????? return number_of_handlers_dispatched;
????? }
? }
}
6
int ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
??????????????????????????????????????? DWORD max_handlep1)
{
? // Check if there are window messages that need to be dispatched
? if (slot == max_handlep1)
??? return this->dispatch_window_messages ();
? // Dispatch the handler if it has not been scheduled for deletion.
? // Note that this is a very week test if there are multiple threads
? // dispatching this slot as no locks are held here. Generally, you
? // do not want to do something like deleting the this pointer in
? // handle_close() if you have registered multiple times and there is
? // more than one thread in WFMO_Reactor->handle_events().
? else if (!this->handler_rep_.scheduled_for_deletion (slot))
? {
????? ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
????? if (this->handler_rep_.current_info ()[slot].io_entry_)
??????? return this->complex_dispatch_handler (slot,
?????????????????????????????????????????????? event_handle);
????? else
??????? return this->simple_dispatch_handler (slot,
????????????????????????????????????????????? event_handle);
? }
? else
??? // The handle was scheduled for deletion, so we will skip it.
??? return 0;
}
7
int ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
??????????????????????????????????????????????? ACE_HANDLE event_handle)
{
? // This dispatch is used for I/O entires.
? ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
??? this->handler_rep_.current_info ()[slot];
? WSANETWORKEVENTS events;
? ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
? if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
????????????????????????????? event_handle,
????????????????????????????? &events) == SOCKET_ERROR)
??? problems = ACE_Event_Handler::ALL_EVENTS_MASK;
? else
? {
????? // Prepare for upcalls. Clear the bits from <events> representing
????? // events the handler is not interested in. If there are any left,
????? // do the upcall(s). upcall will replace events.lNetworkEvents
????? // with bits representing any functions that requested a repeat
????? // callback before checking handles again. In this case, continue
????? // to call back unless the handler is unregistered as a result of
????? // one of the upcalls. The way this is written, the upcalls will
????? // keep being done even if one or more upcalls reported problems.
????? // In practice this may turn out not so good, but let's see. If any
????? // problems, please notify Steve Huston <shuston@riverace.com>
????? // before or after you change this code.
????? events.lNetworkEvents &= current_info.network_events_;
????? while (events.lNetworkEvents != 0)
????? {
????????? ACE_Event_Handler *event_handler = current_info.event_handler_;
????????? int reference_counting_required =
??????????? event_handler->reference_counting_policy ().value () ==
??????????? ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
????????? // Call add_reference() if needed.
????????? if (reference_counting_required)
????????? {
????????????? event_handler->add_reference ();
????????? }
????????? // Upcall
????????? problems |= this->upcall (current_info.event_handler_,
??????????????????????????????????? current_info.io_handle_,
??????????????????????????????????? events);
????????? // Call remove_reference() if needed.
????????? if (reference_counting_required)
????????? {
????????????? event_handler->remove_reference ();
????????? }
????????? if (this->handler_rep_.scheduled_for_deletion (slot))
??????????? break;
????? }
? }
? if (problems != ACE_Event_Handler::NULL_MASK
????? && !this->handler_rep_.scheduled_for_deletion (slot)? )
??? this->handler_rep_.unbind (event_handle, problems);
? return 0;
}
8
ACE_Reactor_Mask ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
?????????????????????????????????????????? ACE_HANDLE io_handle,
?????????????????????????????????????????? WSANETWORKEVENTS &events)
{
? // This method figures out what exactly has happened to the socket
? // and then calls appropriate methods.
? ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
? // Go through the events and do the indicated upcalls. If the handler
? // doesn't want to be called back, clear the bit for that event.
? // At the end, set the bits back to <events> to request a repeat call.
? long actual_events = events.lNetworkEvents;
? int action;
? if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
? {
????? action = event_handler->handle_output (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_WRITE);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
? {
????? if (events.iErrorCode[FD_CONNECT_BIT] == 0)
????? {
????????? // Successful connect
????????? action = event_handler->handle_output (io_handle);
????????? if (action <= 0)
????????? {
????????????? ACE_CLR_BITS (actual_events, FD_CONNECT);
????????????? if (action == -1)
??????????????? ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
????????? }
????? }
????? // Unsuccessful connect
????? else
????? {
????????? action = event_handler->handle_input (io_handle);
????????? if (action <= 0)
????????? {
????????????? ACE_CLR_BITS (actual_events, FD_CONNECT);
????????????? if (action == -1)
??????????????? ACE_SET_BITS (problems, ACE_Event_Handler::CONNECT_MASK);
????????? }
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_OOB))
? {
????? action = event_handler->handle_exception (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_OOB);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_READ))
? {
????? action = event_handler->handle_input (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_READ);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
????? && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
? {
????? action = event_handler->handle_input (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_CLOSE);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
? {
????? action = event_handler->handle_input (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_ACCEPT);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_QOS))
? {
????? action = event_handler->handle_qos (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_QOS);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
????? }
? }
? if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
? {
????? action = event_handler->handle_group_qos (io_handle);
????? if (action <= 0)
????? {
????????? ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
????????? if (action == -1)
??????????? ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
????? }
? }
? events.lNetworkEvents = actual_events;
? return problems;
}
posted @
2007-02-22 10:54 walkspeed 閱讀(1594) |
評論 (0) |
編輯 收藏
??????? ACE這個開源項目的代碼的量是很大的。對于系統(tǒng)編程、網(wǎng)絡(luò)編程是非常的有用,但是對其介紹的書和資料是非常的少。要想了解和很好的使用,就必須要自己去看源代碼了。(好在還有三本書,庫中的事例還是比較的多)
??????? ACE庫中的代碼大體可以分為三大部分
??????? 1? OS Adaptation(操作系統(tǒng)適配)部分。這部分主要是屏蔽各操作系統(tǒng)的API的不同,將系統(tǒng)調(diào)用接口統(tǒng)一到C++函數(shù)的接口,以實現(xiàn)平臺的可移植。
??????? 2? C++ Wrapper Facade(C++外包)部分。這部分主要是將相互關(guān)聯(lián)的操作和數(shù)據(jù)結(jié)構(gòu)封裝到C++類中,提供統(tǒng)一的接口。提供強類型的檢測,降低不必要的認(rèn)為錯誤。利用C++多態(tài)、繼承等能力,形成一個架構(gòu),使本地的、遠(yuǎn)端的操作統(tǒng)一在一個相同接口和使用策略下。
??????? 3? Framework(框架)部分。為一組相關(guān)的應(yīng)用提供可復(fù)用的架構(gòu)。開發(fā)者在確定了應(yīng)用后,選擇可用的架構(gòu)開發(fā)應(yīng)用程序,不用考慮平臺和底層機制。快速的開發(fā)。
??????? 第一部分面對大量的底層的細(xì)節(jié),而且目標(biāo)是平臺的移植,如果對平臺的移植感興趣的兄弟姐妹可以看看。一般情況下沒什么看的必要。
??????? 第二部分和第三部分對我們的應(yīng)用和學(xué)習(xí)如何利用C++來設(shè)計程序非常的有價值,要多看。細(xì)心揣摩,特別是配上設(shè)計的問題(宏觀的和微觀的)。
posted @
2007-02-20 12:28 walkspeed 閱讀(6982) |
評論 (3) |
編輯 收藏
??????? chown??? 設(shè)置文件或目錄的所有權(quán)用戶。
??????? chown 用戶名.組名 文件或目錄名。
?????? 例 1 chown user.group directory? 2 chown user.group file
??????? chmod??? 改變文件或目錄的用戶權(quán)限。
??????? 一個文件或目錄的用戶權(quán)限由三個部分組成 所有者權(quán)限 所有者所在的組的權(quán)限 其他用戶的權(quán)限。每個權(quán)限又由三種權(quán)力組成,讀權(quán)力 寫權(quán)力 執(zhí)行權(quán)力
??????? 例 1 chmod 711 file 所有者有讀、寫、執(zhí)行權(quán)力
???????????????????????????????????????? 所有者所在組用戶有執(zhí)行權(quán)力
???????????????????????????????????????? 其他用戶有執(zhí)行權(quán)力
posted @
2007-02-04 23:35 walkspeed 閱讀(581) |
評論 (0) |
編輯 收藏
??????? 想學(xué)的時候就開始學(xué)習(xí)。聽過設(shè)計模式,心動了,那就開始學(xué)吧。是的,開始時不能明白那里面寫些什么,不要緊,先把它們背下來。里面的實例代碼用手寫一邊,不要只是看一邊。就看那本叫設(shè)計模式的書,這不是說那些以它為根本的注解的書不好,只是個人認(rèn)為注解帶有注解人的理解和思考,看了反而添加了更多的不理解的概念。將寫過的代碼拿出來,用模式的眼光去重新審視,有了想法就去編碼實現(xiàn),并不但的改進。
posted @
2007-01-30 21:28 walkspeed 閱讀(564) |
評論 (1) |
編輯 收藏