我的IOCP網絡模塊設計
為了設計一個穩定易用高效的iocp網絡模塊,我前前后后花了好幾個月的時間,也曾閱讀過網上很多資料和代碼,但是非常遺憾,能找到的資料一般都說得很含糊,很少有具體的,能找到的代碼離真正能商用的網絡模塊差得太遠,大多只是演示一下最基本的功能,而且大多是有很多問題的,主要問題如下:
1、 很多代碼沒有處理一次僅發送成功部分數據的情況。
2、 幾乎沒有找到能正確管理所有資源的代碼。
3、 大多沒有采用用pool,有的甚至畫蛇添足用什么map查找對應客戶端,沒有充分使用perhandle, perio。
4、 接收發送數據大多拷貝太多次數。
5、 接收管理大多很低效,沒有充分發揮iocp能力。
6、 幾乎都沒有涉及上層如何處理邏輯,也沒有提供相應解決方案(如合并io線程處理或單獨邏輯線程)。
7、 大多沒有分離流數據和包數據。
…
問題還有很多,就不一一列出來了,有一定設計經驗的人應該有同感。要真正解決這些問題也不是那么容易的,特別是在win下用iocp的時候資源釋放是個麻煩的問題,我在資源管理上花了很多時間,起初也犯了很多錯誤,后來在減少同步對象上又花了不少時間(起初client用了兩個同步對象,后來減少為1個)。下面我就我所設計的網絡模塊的各個部分進行簡單的講解
一、內存管理。
內存管理是采用池模式,設計了一個基礎池類,可以管理某固定大小的池
class CBufferPool
{
…
void *newobj();
void delobj(void *pbuf);
…
};
在基礎池類上提供了一個模板的對象池
template <class T>
class CObjPool : public CBufferPool
{
public:
T *newobj()
{
void *p = CBufferPool::newobj();
T *pt = new(p) T;
return pt;
}
void delobj(T* pt)
{
pt->~T();
CBufferPool::delobj(pt);
}
};
在基礎池的基礎上定義了一個簡單的通用池
class CMemoryPool
{
private:
CBufferPool bp[N];
…
};
通用池是由N個不同大小的基礎池組成的,分配的時候圓整到合適的相近基礎池并由基礎池分配。
最后還提供了一個內存分配適配器類,從該類派生的類都支持內存池分配。
class t_alloc
{
public:
static void *operator new(size_t size)
{
return CMemoryPool::instance().newobj(size);
}
static void operator delete(void *p, size_t size)
{
CMemoryPool::instance().delobj(p, size);
}
};
根據測試CMempool分配速度比CObjpool<>稍微慢一點點,所以我在用的時候就直接用t_alloc類派生,而不是用對象池,這是個風格問題,也許有很多人喜歡用更高效一點的objpool方式,但這個并不大礙。
在網絡模塊中OVERLAPPED派生類就要用池進行分配,還有CIocpClient也要用池分配,再就是CBlockBuffer也是從池分配的。
如下定義:
struct IOCP_ACCEPTDATA : public IOCP_RECVDATA, public t_alloc
class CIocpClient : public t_alloc
二、數據緩沖區。
數據緩沖區CBlockBuffer為環形,大小不固定,隨便分配多少,主要有以下幾個元素:
Char *pbase; //環形首部
Char *pread; //當前讀指針
Char *pwrite; //當前寫指針
Int nCapacity; //緩沖區大小
Long nRef; //關聯計數器
用這種形式管理緩沖區有很多好處,發送數據的時候如果只發送了部分數據只要修改pread指針即可,不用移動數據,接收數據并處理的時候如果只處理了部分數據也只要修改pread指針即可,有新數據到達后直接寫到pwrite并修改pwrite指針,不用多次拷貝數據。nRef關聯計數還可處理一個包發給N個人的問題,如果要給N個人發送相同的包,只要分配一個緩沖區,并設置nRef為N就可以不用復制N份。
三、收發緩沖區管理
發送緩沖區
我把CIocpClient的發送數據設計為一個CBlockBuffer 的隊列,如果隊列內有多個則WSASend的時候一次發送多個,如果只有一個則僅發送一個,CIocpClient發送函數提供了兩個,分別是:
Bool SendData(char *pdata, int len);
Bool SendData(CBlockBuffer *pbuffer);
第一個函數會檢測發送鏈的最后一個數據塊能否容納發送數據,如果能復制到最后一個塊,如果不能則分配一個CBlockBuffer掛到發送鏈最后面,當然這個里面要處理同步。
接收緩沖區
接收管理是比較簡單的,只有一個CBlockBuffer,WSARecv的時候直接指向CBlockBuffer->pwrite,所以如果塊大小合適的話基本上是不用拼包的,如果一次沒有收到一個完整的數據包,并且塊還有足夠空間容納剩余空間,那么再提交一個WSARecv讓起始緩沖指向CBlockBuffer->pwrite如此則收到一個完整數據包的過程都不用重新拼包,收到一個完整數據包之后可以調用虛函數讓上層進行處理。
在IocpClient層其實是不支持數據包的,在這個層次只有流的概念,這個后面會專門講解。
四、IocpServer的接入部分管理
我把IocpServer設計為可以支持打開多個監聽端口,對每個監聽端口接入用戶后調用IocpServer的虛函數分配客戶端:
virtual CIocpClient *CreateNewClient(int nServerPort)
分配客戶端之后會調用IocpClient的函數 virtual void OnInitialize();分配內部接收和發送緩沖區,這樣就可以根據來自不同監聽端口的客戶端分配不同的緩沖區和其他資源。
Accept其實是個可以有很多選擇的,最簡單的做法可以用一個線程+accept,當然這個不是高效的,也可以采用多個線程的領導者-追隨者模式+accept實現,還可以是一個線程+WSAAccept,或者多個線程的領導者-追隨者模式+WSAAccept模式,也可以采用AcceptEx模式,我是采用AcceptEx模式做的,做法是有接入后投遞一個AcceptEx,接入后重復利用此OVERLAPPED再投遞,這樣即使管理大量連接也只有起初的幾十個連接會分配 OVERLAPPED后面的都是重復利用前面分配的結構,不會導致再度分配。
IocpServer還提供了一個虛函數
virtual bool CanAccept(const char *pip, int port){return true;}
來管理是否接入某個ip:port 的連接,如果不接入直接會關閉該連接并重復利用此前分配的WSASocket。
五、資源管理
Iocp網絡模塊最難的就是這個了,什么時候客戶端關閉或服務器主動關閉某個連接并收回資源,這是最難處理的問題,我嘗試了幾種做法,最后是采用計數器管理模式,具體做法是這樣的:
CIocpClient有2個計數變量
volatile long m_nSending; //是否正發送中
volatile long m_nRef; //發送接收關聯字
m_nSending表示是否有數據已WSASend中沒有返回
m_nRef表示WSASend和WSARecv有效調用未返回和
在合適的位置調用
inline void AddRef(const char *psource);
inline void Release(const char *psource);
增引用計數和釋放引用計數
if(InterlockedDecrement(&m_nRef)<=0)
{
//glog.print("iocpclient %p Release %s ref %d\r\n", this, psource, m_nRef);
m_server->DelClient(this);
}
當引用計數減少到0的時候刪除客戶端(其實是將內存返回給內存池)。
六、鎖使用
鎖的使用至關重要,多了效率低下,少了不能解決問題,用多少個鎖在什么粒度上用鎖也是這個模塊的關鍵所在。
IocpClient有一個鎖 DECLARE_SIGNEDLOCK_NAME(send); //發送同步鎖
這個鎖是用來控制發送數據鏈管理的,該鎖和前面提到的volatile long m_nSending;共同配合管理發送數據鏈。
可能有人會說recv怎么沒有鎖同步,是的,recv的確沒有鎖,recv不用鎖是為了最大限度提高效率,如果和發送共一個鎖則很多問題可以簡化,但沒有充分發揮iocp的效率。Recv接收數據后就調用OnReceive虛函數進行處理。可以直接io線程內部處理,也可以提交到某個隊列由獨立的邏輯線程處理。具體如何使用完全由使用者決定,底層不做任何限制。
七、服務器定時器管理
服務器定義了如下定時器函數,利用系統提供的時鐘隊列進行管理。
bool AddTimer(int uniqueid, DWORD dueTime, DWORD period, ULONG nflags=WT_EXECUTEINTIMERTHREAD);
bool ChangeTimer(int uniqueid, DWORD dueTime, DWORD period);
bool DelTimer(int uniqueid);
//獲取Timers數量
int GetTimerCount() const;
TimerIterator GetFirstTimerIterator();
TimerNode *GetNextTimer(TimerIterator &it);
bool IsValidTimer(TimerIterator it)
設計思路是給每個定時器分配一個獨立的id,根據id可修改定時器的首次觸發時間和后續每次觸發時間,可根據id刪除定時器,也可遍歷定時器。定時器時間單位為毫秒。
八、模塊類結構
模塊中最重要的就是兩個類CIocpClient和CIocpServer,其他有幾個類從這兩個類派生,圖示如下:
圖表 1
|
圖表 2
|
CIocpClient是完全流式的,沒有包概念。CIocpMsgClient從CIocpClient派生,內部支持包概念:
class CIocpMsgClient : public CIocpClient
{
…
virtual void OnDataTooLong(){};
virtual void OnMsg(PKHEAD *ph){};
bool SendMsg(WORD mtype, WORD stype, const char *pdata, int length);
…
};
template <class TYPE>
class CIocpMsgClientT : public CIocpMsgClient
{
…
void AddMsg(DWORD id, CBFN pfn);
BOOL DelMsg(DWORD id);
…
};
CIocpMsgClientT模板類支持內嵌入式定義,如在
CMyDoc中可這樣定義
CIocpMsgClientT<CMyDoc> client;
后面可以調用client.AddMsg(UMSG_LOGIN, OnLogin);關聯一個類成員函數作為消息處理函數,使用很方便。
CIocpServerT定義很簡單,從CIocpServer派生,重載了CreateNewClient函數
template <class TClient>
class CIocpServerT : public CIocpServer
{
public:
//如果CIocpClient派生了則也需要重載下面的函數,這里可以根據nServerPort分配不同的CIocpClient派生類
virtual CIocpClient *CreateNewClient(int nServerPort)
{
CIocpClient *pclient = new TClient;
…
return pclient;
}
};
八、應用舉例
class CMyClient : public CIocpMsgClient
{
public:
CMyClient() : CIocpMsgClient()
{
}
virtual ~CMyClient()
{
}
virtual void OnConnect()
{
Printf(“用戶連接%s:%d連接到服務器\r\n”, GetPeerAddr().ip(),GetPeerAddr().port());
}
virtual void OnClose()
{
Printf(“用戶%s:%d關閉連接\r\n”, GetPeerAddr().ip(),GetPeerAddr().port());
}
virtual void OnMsg(PKHEAD *phead)
{
SendData((const char *)phead, phead->len+PKHEADLEN);
}
virtual void OnSend(DWORD dwbyte)
{
Printf(“成功發送%d個字符\r\n”, dwbyte);
}
virtual void OnInitialize()
{
m_sendbuf = newbuf(1024);
m_recvbuf = newbuf(4096);
}
friend class CMyServer;
};
class CMyServer : public CIocpServer
{
public:
CMyServer() : CIocpServer
{
}
virtual void OnConnect(CIocpClient *pclient)
{
printf("%p : %d 遠端用戶%s:%d連接到本服務器.\r\n", pclient, pclient->m_socket,
pclient->GetPeerAddr().ip(), pclient->GetPeerAddr().port());
}
virtual void OnClose(CIocpClient *pclient)
{
printf("%p : %d 遠端用戶%s:%d退出.\r\n", pclient, pclient->m_socket,
pclient->GetPeerAddr().ip(), pclient->GetPeerAddr().port());
}
virtual void OnTimer(int uniqueid)
{
If(uniqueid == 10)
{
}
Else if(uniqueid == 60)
{
}
}
//這里可以根據nServerPort分配不同的CIocpClient派生類
virtual CIocpClient *CreateNewClient(int nServerPort)
{
// If(nServerPort == ?)
// …
CIocpClient *pclient = new CMyClient;
…
return pclient;
}
};
Int main(int argc, char *argv[])
{
CMyServer server;
server.AddTimer(60, 10000, 60000);
server.AddTimer(10, 10000, 60000);
//第二個參數為0表示使用默認cpu*2個io線程,>0表示使用該數目的io線程。
//第三個參數為0表示使用默認cpu*4個邏輯線程,如果為-1表示不使用邏輯線程,邏輯在io線程內計算。>0則表示使用該數目的邏輯線程
server.StartServer("1000;2000;4000", 0, 0);
}
從示例可看出,對使用該網絡模塊的人來說非常簡單,只要派生兩個類,集中精力處理消息函數即可,其他內容內部全部包裝了。
九、后記
我研究iocp大概在2005年初,前一個版本的網絡模塊是用多線程+異步事件來做的,iocp網絡模塊基本成型在2005年中,后來又持續進行了一些改進,2005底進入穩定期,2006年又做了一些大的改動,后來又持續進行了一些小的改進,目前該模塊作為服務程序框架已經在很多項目中穩定運行了1年半左右的時間。在此感謝大寶、Chost Cheng、Sunway等眾多網友,是你們的討論給了我靈感和持續改進的動力,也是你們的討論給了我把這些寫出來的決心。若此文能給后來者們一點點啟示我將甚感欣慰,若有錯誤歡迎批評指正。
oldworm
oldworm@21cn.com
2007.9.24