關于需求
進行程序開發,對于需求的把握是至關重要的。可以說,我之前沒有任何開發服務器程序的經驗,因此首先在對于需求的把握上出現了問題。
本程序的功能:
在Linux環境下實現一個服務器程序,通過管道,從本地的客戶端讀取數據,然后進行解析、組包,之后發送POST給遠程的服務器程序。最后,讀取遠程服務器發送回來的響應,并打印在屏幕上。
我的第一個程序就是僅僅考慮了上述的基本需求,然后編寫了一個單線程同步程序實現。但是,這樣的程序并不能夠滿足一個服務器程序的需求。
主要問題出現在如下方面:
1. 服務器程序代碼要保證絕對的健壯,不能因為程序或數據的異常而退出。
2. 程序要保證盡量減少數據包的丟失。
3. 程序對于管道的監聽應該采用阻塞的模式。
4. 對于管道傳遞過來的大量數據進行快速的處理。
5. 在保證整個服務器程序安全性的基礎上,要盡量增大整個系統的吞吐量。
如果同時的去考慮上述的問題,那么原先的程序結構就要進行調整了。
管道通信
我的程序當中采用了FIFO進行本地程序之間的通信。對于FIFO的應用,我們需要清楚如下事實:
1. open()、read()都可以阻塞程序的運行。到服務器端調用open函數時,如何此時沒有客戶端連接則程序會發生阻塞,直到第一個客戶端程序與服務器程序建立管道連接為止。在建立連接之后,如果管道當中沒有數據可讀,則read函數會發生阻塞直到管道當中有數據可讀為止。如果無客戶端連接,read不會發生阻塞,并且其返回值為0。
上面這一條要特別的注意,因為當read返回值為0的時候,我們就不能以read阻塞的方式來進行監聽了,倘若程序當中不存在其它的阻塞方式,那么整個服務器程序就會陷入循環調用,他會嚴重的浪費處理器資源。具體情景如下:
server_fifo_fd = open(SERVER_FIFO_NAME,O_RDONLY);
while(1)
{
read_res = read(server_fifo_fd, &my_data, sizeof(my_data));
cout << read_res << endl;
if(read_res > 0)
{
...
}
}
一旦出現無客戶端連接的狀態,這個程序將無法進行進入“空轉”狀態,其表現為不斷的打印0.這樣,我們就會因為一個“什么也不做”的循環而白白浪費處理器資源。
那么,我們又應該如何解決上述問題呢?請看下面代碼:
server_fifo_fd = open(SERVER_FIFO_NAME,O_RDONLY);
while(1)
{
read_res = read(server_fifo_fd, &my_data, sizeof(my_data));
cout << read_res << endl;
if(read_res > 0)
{
...
}
else
{
close(server_fifo_fd);
server_fifo_fd = open(SERVER_FIFO_NAME,O_RDONLY);
}
}
如何發現沒有客戶端連接在管道上,我們就關閉管道,然后再打開,通過open來再次讓程序進入阻塞狀態。
io_service
根據我現在的理解,io_service就是一個任務調度機。我們將任務交給io_service,他負責調度現有的系統資源,把這些任務消費掉。對于不同種類的任務,可以將它們分配給不同種類的調度機來分別執行,這樣即便于管理,又有利于增加程序的吞吐量。
在我的程序當中,大體存在兩個獨立的任務:
1. 從管道讀取數據。
2. 與遠程服務器之間的通信。
這樣,可以建立兩個調度機來管理這兩個相對獨立任務。為了運行這兩個調度機,我們首先需要將它們分別綁定在兩個線程上。這里還不得不提到一個問題:boost庫所提供的io_service在沒有任務執行的時候會自動的退出。而boost庫當中標準的線程綁定方式如下:
boost::thread *t_read = new boost::thread(boost::bind(
&boost::asio::io_service::run, io_service_read));
此時,相當于開始運行了io_service。也就是說,如果在它之前,沒有對io_service進行初始的綁定,那么程序就會自行的退出。再有就是如果在運行的過程當中,io_service處理完了其本身的所有任務,而服務器程序又不會新建一個調度機,那么該程序也將死掉。為了解決上述問題,我們需要對于io_service綁定一個資源消耗低而且會永遠執行下去的程序。
boost::asio為我們提供的定時器可以滿足上述的需求,我們可以創建一個循環定時器作為io_service的初始化任務。代碼如下:
class io_clock
{
private:
boost::asio::deadline_timer timer;
public:
io_clock(boost::asio::io_service &io):
timer(io, boost::posix_time::hours(24))
{
timer.async_wait(boost::bind(&io_clock::no_dead,this));
}
void no_dead()
{
timer.expires_at(timer.expires_at()+boost::posix_time::hours(24));
timer.async_wait(boost::bind(&io_clock::no_dead, this));
}
};
這段代碼是一段經典的定時器異步程序。關于異步程序的問題,過一會再討論。
下面繼續討論io_service。現在,我們已經知道什么是調度機了,并且計劃在系統當中運用兩個調度機,一個處理管道讀取,另外一個運行遠程通信。大致流程是,從管道讀取數據,之后進行解析,將解析后的數據傳入另外一個調度機當中實現數據包的組成、發送以及接收等操作。
現在一個新的問題產生了:如何實現兩個調度機之間的數據通信呢?
這里就涉及到io_service當中的post方法了。它可以實現將一個函數綁定到一個正在運行的io_service之上。這樣,只要實現每當io_service1產生了一個數據就可以通過post的方式傳遞給io_service2來進行繼續的執行。
請看下面這段代碼:
while(1)
{
//讀取數據
read_res = read(server_fifo_fd, &my_data, sizeof(my_data));
cout << read_res << endl;
if (read_res > 0)
{
//對于讀取后的數據進行解析
for (int i = 0; i < my_data.number; i++)
{
std::string Furl;
Furl = my_data.package[i].url;
post = my_data.package[i].post;
std::size_t first_sign = Furl.find("http://");
url = Furl.substr(first_sign + 2);
std::size_t second_sign = url.find("/");
url = Furl.substr(first_sign + 2, second_sign);
path = Furl.substr(first_sign + second_sign + 2);
//若為域名,則需要先解析為IP地址。
if (url.find("www") != url.npos)
{
host = gethostbyname(url.c_str());
char **pptr;
char str[32];
pptr = host->h_addr_list;
inet_ntop(host->h_addrtype, *pptr, str, sizeof(str));
url = str;
cout << url;
}
client Client (*socket_io,url, path, post); io->post(boost::bind(&client::process,&Client,url));
}//end for
}//end if
else
{
close(server_fifo_fd);
server_fifo_fd = open(SERVER_FIFO_NAME, O_RDONLY);
}
}
上面程序當中,紅色的部分是不安全的(在異步的情況下)。在掌握了io_service的基本原理之后,這個問題就變得簡單易懂了。上面這段程序是由io_service1運行的,其將接收來的地址解析之后就拋給了io_service2來繼續處理。倘若io_service2還沒有處理完client當中的process方法時本次for循環就結束了,那么Client就會被析構。這時io_service2當中的任務實際上已經被析構掉了,那么io_service2執行到這段任務的時候就會引發非法內存的訪問!!!
異步程序
發生上述問題的一個前提是
io->post(boost::bind(&client::process,&Client,url));的運行不會發生阻塞。
如果process是普通的同步程序,這個問題就不會發生。但同步的阻塞會影響程序并發的行為,這樣就可能降低系統的吞吐量。
為了程序運行的非阻塞實現,我們采用異步程序設計的方式。同步IO與異步IO的本質區別是:同步IO會block當前的調用線程,而異步IO則允許發起IO請求的調用線程繼續執行,等到IO請求被處理后,會通知調用線程。關于異步IO的介紹詳情可以參考http://www.ibm.com/developerworks/cn/linux/l-async/ 。
我總結:寫異步程序的關鍵就是始終提醒自己,這段代碼在什么時候結束你永遠不知道!
這樣,如果我們將網絡任務編寫成異步程序就會實現多個線程并發的執行各自的網絡任務,而這些并發的線程則由io_service進行統一的調度處理。
shared_ptr內存管理機制
從上面的模型可以看出,client實際上是由io_service1創建,然后連同數據一起拋給io_service2去執行。這樣就產生了一個問題,如果通過new的方式來產生一個client對象,那么這個對象應該在什么時候銷毀呢?答案是在process方法執行結束后銷毀。但是process方法執行什么時候執行結束?只有它自己知道。除非用對象調用自己的析構函數來銷毀掉自己。如果采用上述方式,我們就需要在異步程序結束之前,調用這個類本身的析構函數。但是,假如異步程序中途產生異常而沒有執行結束,那么這段內存空間又由誰來釋放呢?為了解決這個問題,我們用shared_ptr來管理client對象。這樣我們先聲明一個shared_ptr指針,然后更改client的聲明 class client:public boost::enable_shared_from_this<client> 將client內部異步調用的this指針變成shared_from_this。
這樣,異步調用結束的時候,他的指針計數也將變為0,這段內存空間就被自動的析構了。
其它問題
Linux操作系統本身限制:打開文件的數量上限為1024,倘若不對網絡并發進行限制,很可能因為打開文件數量達到上限而被系統拒絕進行socket服務。那么,如何控制并發在一定的范圍之內,從而避免數據包的丟失?
另外一個就是:本程序吞吐量限制的瓶頸是網絡并發程序的速度問題,本地管道傳送的速度相比之下要快得多了,這也是為什么在程序當中,我并沒有把管道通信函數寫成異步程序的原因。倘若網絡并發程序的問題可以得到完美的解決,那么這個程序的代碼結構恐怕還要發生如下兩處改變:
1. 程序暫時采用固定長度的數據包發送,這在一定程度上降低了管道通信的速度。
2. 管道數據的讀取與解析在這里是作為一個同步串行程序來執行的,可以進行如下兩種方案的改進:
a. 將解析程序寫入client類當中,交給io_service2執行。
b. 將管道通信函數寫成異步形式。
總結
短短的300多行代碼當中卻集中了管道、操作系統原理、線程池管理、內存管理、智能指針、異步IO、多線程……等思想。作為初學者來說,這個程序使我學會了很多,但是也暴露了很多問題:
1. 對于異步與多線程的概念陌生,導致在編程的過程當中發生了低級的訪問非法內存錯誤。
2. 在編寫多線程異步程序時,要改變思路,不能陷入同步程序的思維模式當中。
3. 在解決問題之前,盡量弄清楚要解決的問題到底是什么,即需求一定要做好!對于服務器程序運行需求的不充分理解,導致了我始終不清楚什么才是符合要求的程序!
4. 如何更有效的解決問題?
所謂“有效”就是用最短的時間以適當的方案解決問題。
如果沒有足夠的基礎,那么無疑會浪費解決問題的時間。在寫這個程序之前,我的腦海當中對于多線程異步調用的概念理解很模糊,更別說按照這個思想來寫程序了。但是,如果等到了解了所有知識之后再去解決問題,時間成本可能又會很高。那么,如何在兩者之間做一個權衡就是快速解決問題的關鍵所在。
我覺得,首先應該明確問題到底是什么。要盡量考慮一切可能出現的問題,然后再考慮如何簡化問題,先做什么,后做什么。這樣,就不至于由于需求不明確而導致對于問題本身的曲解。例如,假設我的服務器程序僅僅用來提供對幾臺主機的服務,那么這個程序的結構就用不著這么復雜了。所以,在這種情況下,數據通信量的因素就必須納入到問題需求的考慮范圍之內。
總結一句話:在做需求的時候,一定要弄清所有決定問題本質的關鍵性因素后再開始制定解決問題的計劃,因為忽略了這些因素后,問題的本質就發生了變化,就不再是原來的問題了。
對于解決問題可能會用到的知識至少要有一個概念上的清晰認識,然后再開始解決問題。否則,解決的過程就會變成一種盲目的探路,雖然最終問題也會得到解決,但是會因為盲目性而導致解決問題中走彎路,浪費不必要的時間。