協(xié)程,即協(xié)作式程序,其思想是,一系列互相依賴的協(xié)程間依次使用CPU,每次只有一個協(xié)程工作,而其他協(xié)程處于休眠狀態(tài)。協(xié)程可以在運行期間的某個點上暫停執(zhí)行,并在恢復運行時從暫停的點上繼續(xù)執(zhí)行。 協(xié)程已經(jīng)被證明是一種非常有用的程序組件,不僅被python、lua、ruby等腳本語言廣泛采用,而且被新一代面向多核的編程語言如golang rust-lang等采用作為并發(fā)的基本單位。 協(xié)程可以被認為是一種用戶空間線程,與傳統(tǒng)的線程相比,有2個主要的優(yōu)點:
首先來簡單回顧一下一些常用的網(wǎng)絡(luò)編程模型。網(wǎng)絡(luò)編程模型可以大體的分為同步模型和異步模型兩類。
同步模型使用阻塞IO模式,在阻塞IO模式下調(diào)用read等IO函數(shù)時會阻塞線程直到IO完成或失敗。
同步模型的典型代表是thread per connection模型,每當阻塞在主線程上的accept調(diào)用返回時則創(chuàng)建一個新的線程去服務(wù)于新的socket的讀/寫。這種模型的優(yōu)點是程序簡潔,編寫簡單;缺點是可伸縮性收到線程數(shù)的限制,當連接越來越多時,線程也越來越多,頻繁的線程切換會嚴重拖累性能。
異步模型一般使用非阻塞IO模式,并配合epoll/select/poll等多路復用機制。在非阻塞模式下調(diào)用read,如果沒有數(shù)據(jù)可讀則立即返回并通知用戶沒有可讀(EAGAIN/EWOULDBLOCK),而非阻塞當前線程。異步模型可以使一個線程同時服務(wù)于多個IO對象。
異步模型的典型代表是reactor模型。在reactor模型中,我們將所有要處理的IO事件注冊到一個中心的IO多路復用器中(一般為epoll/select/poll),同時主線程阻塞在多路復用器上。一旦有IO事件到來或者就緒,多路復用器返回并將對應(yīng)的IO事件分發(fā)到對應(yīng)的處理器(即回調(diào)函數(shù))中,最后處理器調(diào)用read/write函數(shù)來進行IO操作。
異步模型的特點是性能和可伸縮性比同步模型要好很多,但是其結(jié)構(gòu)復雜,不易于編寫和維護。在異步模型中,IO之前的代碼(IO任務(wù)的提交者)和IO之后的處理代碼(回調(diào)函數(shù))是割裂開來的。
協(xié)程為克服同步模型和異步模型的缺點,并結(jié)合他們的優(yōu)點提供了可能: 現(xiàn)在假設(shè)我們有3個協(xié)程A,B,C分別要進行數(shù)次IO操作。這3個協(xié)程運行在同一個調(diào)度器或者說線程的上下文中,并依次使用CPU。調(diào)度器在其內(nèi)部維護了一個多路復用器(epoll/select/poll)。
協(xié)程A首先運行,當它執(zhí)行到一個IO操作,但該IO操作并沒有立即就緒時,A將該IO事件注冊到調(diào)度器中,并主動放棄CPU。這時調(diào)度器將B切換到CPU上開始執(zhí)行,同樣,當它碰到一個IO操作的時候?qū)O事件注冊到調(diào)度器中,并主動放棄CPU。調(diào)度器將C切換到cpu上開始執(zhí)行。當所有協(xié)程都被“阻塞”后,調(diào)度器檢查注冊的IO事件是否發(fā)生或就緒。假設(shè)此時協(xié)程B注冊的IO時間已經(jīng)就緒,調(diào)度器將恢復B的執(zhí)行,B將從上次放棄CPU的地方接著向下運行。A和C同理。
這樣,對于每一個協(xié)程來說,是同步的模型;但是對于整個應(yīng)用程序來說,卻是異步的模型。
好了,原理說完了,我們來看一個實際的例子,echo server。
在這個例子中,我們將使用orchid庫來編寫一個echo server。orchid庫是一個構(gòu)建于boost基礎(chǔ)上的 協(xié)程/網(wǎng)絡(luò)IO 庫。
echo server首先必須要處理連接事件,我們創(chuàng)建一個協(xié)程來專門處理連接事件:
typedef boost::shared_ptr<orchid::socket> socket_ptr;
//處理ACCEPT事件的協(xié)程
void handle_accept(orchid::coroutine_handle co) {
try {
orchid::acceptor acceptor(co -> get_scheduler().get_io_service());//構(gòu)建一個acceptor
acceptor.bind_and_listen("5678",true);
for(;;) {
socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service()));
acceptor.accept(*sock,co);
//在調(diào)度器上創(chuàng)建一個協(xié)程來服務(wù)新的socket。第一個參數(shù)是要創(chuàng)建的協(xié)程的main函數(shù),第二個參數(shù)是要創(chuàng)建的協(xié)程的棧的大小。
co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size());
}
} catch(boost::system::system_error& e) {
cerr<<e.code()<<" "<<e.what()<<endl;
}
}
在orchid中,協(xié)程的main函數(shù)必須滿足函數(shù)簽名void(orchid::coroutine_handle),如handle_accept所示,其中參數(shù)co是協(xié)程句柄,代表了當前函數(shù)所位于的協(xié)程。
在上面的代碼中,我們創(chuàng)建了一個acceptor,并讓它監(jiān)聽5678端口,然后在"阻塞"等待連接到來,當連接事件到來時,創(chuàng)建一個新的協(xié)程來服務(wù)新的socket。處理套接字IO的協(xié)程如下:
//處理SOCKET IO事件的協(xié)程
void handle_io(orchid::coroutine_handle co,socket_ptr sock) {
orchid::tcp_ostream out(*sock,co);
orchid::tcp_istream in(*sock,co);
for(std::string str;std::getline(in, str) && out;) {
out<<str<<endl;
}
}
IO處理協(xié)程首先在傳入的套接字上創(chuàng)建了一個輸入流和一個輸出流,分別代表了TCP的輸入和輸出。然后不斷地從輸入流中讀取一行,并輸出到輸出流當中。當socket上的TCP連接斷開時,輸入流和輸出流的eof標志為會被置位,因此循環(huán)結(jié)束,協(xié)程退出。
orchid可以使用戶以流的形式來操作套接字。輸入流和輸出流分別提供了std::istream和std::ostream的接口;輸入流和輸出流是帶緩沖的,如果用戶需要無緩沖的讀寫socket或者自建緩沖,可以直接調(diào)用orchid::socket的read和write函數(shù)。但是需要注意這兩個函數(shù)會拋出boost::system_error異常來表示錯誤。
細心的讀者可能已經(jīng)發(fā)現(xiàn),handle_io的函數(shù)簽名并不滿足void(orchid::coroutine_handle),回到handle_accept中,可以發(fā)現(xiàn),實際上我們使用了boost.bind對handle _ io函數(shù)進行了適配,使之符合函數(shù)簽名的要求。
最后是main函數(shù):
int main() {
orchid::scheduler sche;
sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//創(chuàng)建協(xié)程
sche.run();
}
在上面這個echo server的例子中,我們采用了一種 coroutine per connection 的編程模型,與傳統(tǒng)的 thread per connection 模型一樣的簡潔清晰,但是整個程序?qū)嶋H上運行在同一線程當中。
由于協(xié)程的切換開銷遠遠小于線程,因此我們可以輕易的同時啟動上千協(xié)程來同時服務(wù)上千連接,這是 thread per connection的模型很難做到的;在性能方面,整個底層的IO系統(tǒng)實際上是使用boost.asio這種高性能的異步io庫實現(xiàn)的。而且與IO所費的時間相比,協(xié)程切換的開銷基本可以忽略。
因此通過協(xié)程,我們可以在保持同步IO模型簡潔性的同時,獲得近似于異步IO模型的高性能。
協(xié)程,即協(xié)作式程序,其思想是,一系列互相依賴的協(xié)程間依次使用CPU,每次只有一個協(xié)程工作,而其他協(xié)程處于休眠狀態(tài)。協(xié)程可以在運行期間的某個點上暫停執(zhí)行,并在恢復運行時從暫停的點上繼續(xù)執(zhí)行。 協(xié)程已經(jīng)被證明是一種非常有用的程序組件,不僅被python、lua、ruby等腳本語言廣泛采用,而且被新一代面向多核的編程語言如golang rust-lang等采用作為并發(fā)的基本單位。 協(xié)程可以被認為是一種用戶空間線程,與傳統(tǒng)的線程相比,有2個主要的優(yōu)點:
首先來簡單回顧一下一些常用的網(wǎng)絡(luò)編程模型。網(wǎng)絡(luò)編程模型可以大體的分為同步模型和異步模型兩類。
同步模型使用阻塞IO模式,在阻塞IO模式下調(diào)用read等IO函數(shù)時會阻塞線程直到IO完成或失敗。
同步模型的典型代表是thread per connection模型,每當阻塞在主線程上的accept調(diào)用返回時則創(chuàng)建一個新的線程去服務(wù)于新的socket的讀/寫。這種模型的優(yōu)點是程序簡潔,編寫簡單;缺點是可伸縮性收到線程數(shù)的限制,當連接越來越多時,線程也越來越多,頻繁的線程切換會嚴重拖累性能。
異步模型一般使用非阻塞IO模式,并配合epoll/select/poll等多路復用機制。在非阻塞模式下調(diào)用read,如果沒有數(shù)據(jù)可讀則立即返回并通知用戶沒有可讀(EAGAIN/EWOULDBLOCK),而非阻塞當前線程。異步模型可以使一個線程同時服務(wù)于多個IO對象。
異步模型的典型代表是reactor模型。在reactor模型中,我們將所有要處理的IO事件注冊到一個中心的IO多路復用器中(一般為epoll/select/poll),同時主線程阻塞在多路復用器上。一旦有IO事件到來或者就緒,多路復用器返回并將對應(yīng)的IO事件分發(fā)到對應(yīng)的處理器(即回調(diào)函數(shù))中,最后處理器調(diào)用read/write函數(shù)來進行IO操作。
異步模型的特點是性能和可伸縮性比同步模型要好很多,但是其結(jié)構(gòu)復雜,不易于編寫和維護。在異步模型中,IO之前的代碼(IO任務(wù)的提交者)和IO之后的處理代碼(回調(diào)函數(shù))是割裂開來的。
協(xié)程為克服同步模型和異步模型的缺點,并結(jié)合他們的優(yōu)點提供了可能: 現(xiàn)在假設(shè)我們有3個協(xié)程A,B,C分別要進行數(shù)次IO操作。這3個協(xié)程運行在同一個調(diào)度器或者說線程的上下文中,并依次使用CPU。調(diào)度器在其內(nèi)部維護了一個多路復用器(epoll/select/poll)。
協(xié)程A首先運行,當它執(zhí)行到一個IO操作,但該IO操作并沒有立即就緒時,A將該IO事件注冊到調(diào)度器中,并主動放棄CPU。這時調(diào)度器將B切換到CPU上開始執(zhí)行,同樣,當它碰到一個IO操作的時候?qū)O事件注冊到調(diào)度器中,并主動放棄CPU。調(diào)度器將C切換到cpu上開始執(zhí)行。當所有協(xié)程都被“阻塞”后,調(diào)度器檢查注冊的IO事件是否發(fā)生或就緒。假設(shè)此時協(xié)程B注冊的IO時間已經(jīng)就緒,調(diào)度器將恢復B的執(zhí)行,B將從上次放棄CPU的地方接著向下運行。A和C同理。
這樣,對于每一個協(xié)程來說,是同步的模型;但是對于整個應(yīng)用程序來說,卻是異步的模型。
好了,原理說完了,我們來看一個實際的例子,echo server。
在這個例子中,我們將使用orchid庫來編寫一個echo server。orchid庫是一個構(gòu)建于boost基礎(chǔ)上的 協(xié)程/網(wǎng)絡(luò)IO 庫。
echo server首先必須要處理連接事件,我們創(chuàng)建一個協(xié)程來專門處理連接事件:
typedef boost::shared_ptr<orchid::socket> socket_ptr;
//處理ACCEPT事件的協(xié)程
void handle_accept(orchid::coroutine_handle co) {
try {
orchid::acceptor acceptor(co -> get_scheduler().get_io_service());//構(gòu)建一個acceptor
acceptor.bind_and_listen("5678",true);
for(;;) {
socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service()));
acceptor.accept(*sock,co);
//在調(diào)度器上創(chuàng)建一個協(xié)程來服務(wù)新的socket。第一個參數(shù)是要創(chuàng)建的協(xié)程的main函數(shù),第二個參數(shù)是要創(chuàng)建的協(xié)程的棧的大小。
co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size());
}
} catch(boost::system::system_error& e) {
cerr<<e.code()<<" "<<e.what()<<endl;
}
}
在orchid中,協(xié)程的main函數(shù)必須滿足函數(shù)簽名void(orchid::coroutine_handle),如handle_accept所示,其中參數(shù)co是協(xié)程句柄,代表了當前函數(shù)所位于的協(xié)程。
在上面的代碼中,我們創(chuàng)建了一個acceptor,并讓它監(jiān)聽5678端口,然后在"阻塞"等待連接到來,當連接事件到來時,創(chuàng)建一個新的協(xié)程來服務(wù)新的socket。處理套接字IO的協(xié)程如下:
//處理SOCKET IO事件的協(xié)程
void handle_io(orchid::coroutine_handle co,socket_ptr sock) {
orchid::tcp_ostream out(*sock,co);
orchid::tcp_istream in(*sock,co);
for(std::string str;std::getline(in, str) && out;) {
out<<str<<endl;
}
}
IO處理協(xié)程首先在傳入的套接字上創(chuàng)建了一個輸入流和一個輸出流,分別代表了TCP的輸入和輸出。然后不斷地從輸入流中讀取一行,并輸出到輸出流當中。當socket上的TCP連接斷開時,輸入流和輸出流的eof標志為會被置位,因此循環(huán)結(jié)束,協(xié)程退出。
orchid可以使用戶以流的形式來操作套接字。輸入流和輸出流分別提供了std::istream和std::ostream的接口;輸入流和輸出流是帶緩沖的,如果用戶需要無緩沖的讀寫socket或者自建緩沖,可以直接調(diào)用orchid::socket的read和write函數(shù)。但是需要注意這兩個函數(shù)會拋出boost::system_error異常來表示錯誤。
細心的讀者可能已經(jīng)發(fā)現(xiàn),handle_io的函數(shù)簽名并不滿足void(orchid::coroutine_handle),回到handle_accept中,可以發(fā)現(xiàn),實際上我們使用了boost.bind對handle _ io函數(shù)進行了適配,使之符合函數(shù)簽名的要求。
最后是main函數(shù):
int main() {
orchid::scheduler sche;
sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//創(chuàng)建協(xié)程
sche.run();
}
在上面這個echo server的例子中,我們采用了一種 coroutine per connection 的編程模型,與傳統(tǒng)的 thread per connection 模型一樣的簡潔清晰,但是整個程序?qū)嶋H上運行在同一線程當中。
由于協(xié)程的切換開銷遠遠小于線程,因此我們可以輕易的同時啟動上千協(xié)程來同時服務(wù)上千連接,這是 thread per connection的模型很難做到的;在性能方面,整個底層的IO系統(tǒng)實際上是使用boost.asio這種高性能的異步io庫實現(xiàn)的。而且與IO所費的時間相比,協(xié)程切換的開銷基本可以忽略。
因此通過協(xié)程,我們可以在保持同步IO模型簡潔性的同時,獲得近似于異步IO模型的高性能。