|
Posted on 2009-01-31 19:47 S.l.e!ep.¢% 閱讀(3285) 評(píng)論(8) 編輯 收藏 引用 所屬分類: C++
1. 不使用線程池 2.? 暫時(shí)只封裝了 win32 的 icop 3.? 暫時(shí)還未寫測(cè)試用例,因?yàn)椴恢涝趺磳?-_-!!! //?networksocket.h:?interface?for?the?CNetworkSocket?class.
//
 /**///////////////////////////////////////////////////////////////////////
#if?!defined(NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_)
#define?NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_

#if?_MSC_VER?>?1000
#pragma?once
#endif?//?_MSC_VER?>?1000

#include?<vector>

#include?<winsock2.h>
#include?<process.h>
#pragma??comment(lib,?"ws2_32.lib")

#define?BUFFER_SIZE?4096

enum?OPERATION_TYPE
  {
????OPERATION_TYPE_RECV?=?0,
????OPERATION_TYPE_SEND?=?1
};

//IO操作數(shù)據(jù)
typedef?struct?_PER_IO_OPERATION_DATA
  {
????OVERLAPPED?OverLapped;
????WSABUF?DataBuf;
????char?szBuf[BUFFER_SIZE];
????OPERATION_TYPE?OperationType;??????//?操作類型表示

}PER_IO_OPERATION_DATA,*PPER_IO_OPERATION_DATA;

class?CNetworkSocket??
  {
public:
????CNetworkSocket();
????virtual?~CNetworkSocket();

????bool?init();
????bool?uninit();

????bool?initNetWork(unsigned?short?nPort);
????bool?unitNetWork();

????bool?send(SOCKET?s,?const?char*?pBuf,?int?nLen);

private:
????SOCKET?m_listensock;
????HANDLE?m_hIocpPort;
????std::vector<HANDLE>?m_vectorThreadHandle;

????static?unsigned?__stdcall?_WorkerThreadProc(void*?pVoid);????
????static?unsigned?__stdcall?_AcceptThreadProc(void*?pVoid);
};

#endif?//?!defined(NETWORKSOCKET_H__A63ED8A5_7ED1_463D_A0F0_41F6E9C79441__INCLUDED_)
 //?networksocket.cpp:?implementation?of?the?CNetworkSocket?class.
//
 /**///////////////////////////////////////////////////////////////////////
#include?"networksocket.h"

 /**/////////////////////////////////////////////////////////////////////// //?Construction/Destruction
 /**///////////////////////////////////////////////////////////////////////
CNetworkSocket::CNetworkSocket()
  {
????m_listensock?=?INVALID_SOCKET;
????m_hIocpPort??=?INVALID_HANDLE_VALUE;
}

CNetworkSocket::~CNetworkSocket()
  {

}

bool?CNetworkSocket::init()
  {
????WORD?wVersionRequested;
????WSADATA?wsaData;
????int?nErrCode?=?0;
????
????wVersionRequested?=?MAKEWORD(2,?2);
????
????nErrCode?=?::WSAStartup(wVersionRequested,?&wsaData);
????if?(?nErrCode?!=?0?)?
 ???? {
????????return?false;
????}

????if?(?LOBYTE(wsaData.wVersion)?!=?2?||?HIBYTE(wsaData.wVersion)?!=?2?)?
 ???? {
????????bool?b?=?uninit();
????????return?false;
????}
????else
 ???? {
????????return?true;
????}
}

bool?CNetworkSocket::uninit()
  {
????int?nErrCode?=?0;
????nErrCode?=?::WSACleanup();

????return?true;
}

bool?CNetworkSocket::initNetWork(unsigned?short?nPort)
  {
????int?nErrCode?=?0;

????//?創(chuàng)建?socket
????m_listensock?=?::WSASocket(AF_INET,?SOCK_STREAM,?0,?NULL,?0,?WSA_FLAG_OVERLAPPED);
????
????if(?INVALID_SOCKET?==?m_listensock?)
 ???? {
????????nErrCode?=?::WSAGetLastError();
????????return?false;
????}

????//?綁定端口
????sockaddr_in?InternetAddr;
????InternetAddr.sin_family?=?AF_INET;
????InternetAddr.sin_addr.S_un.S_addr?=?::htonl(INADDR_ANY);
????InternetAddr.sin_port?=?htons(nPort);

????if(?::bind(?m_listensock,?(PSOCKADDR)&InternetAddr,?sizeof(InternetAddr))?==?SOCKET_ERROR?)
 ???? {
????????nErrCode?=?::WSAGetLastError();
????????return?false;
????}

????//?開始監(jiān)聽
????if(????::listen(m_listensock,?5)?==?SOCKET_ERROR?)
 ???? {
????????nErrCode?=?::WSAGetLastError();
????????return?false;
????}

????//?創(chuàng)建完成端口
????m_hIocpPort?=?::CreateIoCompletionPort(INVALID_HANDLE_VALUE,?NULL,?0,?0);
????????
????if(?m_hIocpPort?==?INVALID_HANDLE_VALUE?)
 ???? {
????????nErrCode?=?::WSAGetLastError();
????????return?false;
????}

????//?啟動(dòng)工作線程,線程數(shù)為CPU處理器數(shù)量*2+2
????SYSTEM_INFO?sys_Info;
????::GetSystemInfo(&sys_Info);

????for(int?i?=?0;?i?<?(int(sys_Info.dwNumberOfProcessors)?*?2?+?2);?i++)
 ???? {
????????HANDLE?ThreadHandle?=?INVALID_HANDLE_VALUE;
????????DWORD?ThreadID??????=?0;
????????
????????unsigned?int?nThreadId??=???0?;
????????ThreadHandle?=?(HANDLE)_beginthreadex(NULL,?0,?_WorkerThreadProc,?(void?*)this,??0,?&nThreadId);
????????
????????if?(?ThreadHandle?==?0?)
????????????return?false;

????????m_vectorThreadHandle.push_back(ThreadHandle);
????}

????//?啟動(dòng)偵聽線程
????HANDLE?ThreadHandle?=?INVALID_HANDLE_VALUE;
????unsigned?int?nThreadId?=?0?;

????ThreadHandle?=?(HANDLE)_beginthreadex(NULL,?0,?_AcceptThreadProc,?(void?*)this,??0,?&nThreadId);
????
????if?(?ThreadHandle?==?0?)
????????????return?false;

????m_vectorThreadHandle.push_back(ThreadHandle);

????return?true;
}

bool?CNetworkSocket::unitNetWork()
  {
????//?啟動(dòng)工作線程,線程數(shù)為CPU處理器數(shù)量*2+2
????SYSTEM_INFO?sys_Info;
????::GetSystemInfo(&sys_Info);
????
????for(int?i?=?0;?i?<?(int(sys_Info.dwNumberOfProcessors)?*?2?+?2);?i++)
 ???? {
????????//寄出退出消息
????????::PostQueuedCompletionStatus(m_hIocpPort,????-1,????-1,?NULL);
????}

????std::vector<HANDLE>::iterator?iter_t;
????for?(?iter_t?=?m_vectorThreadHandle.begin();?iter_t?!=?m_vectorThreadHandle.end();?iter_t++?)
 ???? {?
????????DWORD?dwRet?=?::WaitForSingleObject(*iter_t,?INFINITE);
????}

????//關(guān)閉網(wǎng)絡(luò)的偵聽
????::shutdown(m_listensock,?0);
????::closesocket(m_listensock);
????
????return?true;
}

unsigned?__stdcall?CNetworkSocket::_WorkerThreadProc(void*?pVoid)
  {
????CNetworkSocket*?pThis?=?(CNetworkSocket*)pVoid;
????DWORD?dwByteTransferred?=?0;
????unsigned?long?nFlag?=?0;
????DWORD?RecvByte?=?0;

????SOCKET?ClientSock?=?INVALID_SOCKET;
?????PPER_IO_OPERATION_DATA?PerIoData;

?????while?(?true?)
 ????? {
????????BOOL?bSuccess?=?::GetQueuedCompletionStatus(pThis->m_hIocpPort,
???????????????????????????????????????????????????????&dwByteTransferred,
?????????????????????????????????????????????????????(LPDWORD)&ClientSock,
?????????????????????????????????????????????????????(LPOVERLAPPED*)&PerIoData,
?????????????????????????????????????????????????????INFINITE);

????????//退出信號(hào)到達(dá),退出線程
????????if(?dwByteTransferred?==?-1?&&?PerIoData?==?NULL?)
????????????return?1;

????????//客戶機(jī)已經(jīng)斷開連接或者連接出現(xiàn)錯(cuò)誤
????????if(?dwByteTransferred?==?0?&&?(PerIoData->OperationType?==?OPERATION_TYPE_RECV?||?PerIoData->OperationType?==?OPERATION_TYPE_SEND?)?)
 ???????? {
????????????::closesocket(ClientSock);
????????????::GlobalFree(PerIoData);
????????????continue;
????????}

????????//?接收完成
????????if?(?PerIoData->OperationType?==?OPERATION_TYPE_RECV?)
 ???????? {
????????????//?處理接收數(shù)據(jù)
????????????//?pThis->OnRecv(ClientSock,?PerIoData->szBuf,?dwByteTransferred);

????????????//將源數(shù)據(jù)置空
????????????::memset(PerIoData->szBuf,?0,?BUFFER_SIZE);
????????????dwByteTransferred=0;

????????????//重置IO操作數(shù)據(jù)
????????????unsigned?long?Flag=0;
????????????::ZeroMemory(&(PerIoData->OverLapped),?sizeof(OVERLAPPED));
????????????
????????????PerIoData->DataBuf.buf????=?PerIoData->szBuf;
????????????PerIoData->DataBuf.len????=?BUFFER_SIZE;
????????????PerIoData->OperationType?=?OPERATION_TYPE_RECV;

????????????//?再投遞另一個(gè)Recv請(qǐng)求
????????????::WSARecv(ClientSock,?&(PerIoData->DataBuf),?1,?&RecvByte,?&Flag,?&(PerIoData->OverLapped),????NULL);
????????}

????????//?發(fā)送完成,置空緩沖區(qū),釋放緩沖區(qū)
????????if(?PerIoData->OperationType?==?OPERATION_TYPE_SEND?)
 ???????? {
????????????::memset(PerIoData,?0,?sizeof(PER_IO_OPERATION_DATA));
????????????::GlobalFree(PerIoData);
????????????dwByteTransferred?=?0;
????????}
?
?????}//?while?(?true?)

????return?0;
}

unsigned?__stdcall?CNetworkSocket::_AcceptThreadProc(void*?pVoid)
  {
????CNetworkSocket*?pThis?=?(CNetworkSocket*)pVoid;

????SOCKET?AcceptSock?=?INVALID_SOCKET;

????while?(?true?)
 ???? {
????????AcceptSock?=?::WSAAccept(pThis->m_listensock,?NULL,?NULL,?NULL,?0);

????????//關(guān)聯(lián)客戶端口到完成端口,句柄數(shù)據(jù)在此時(shí)被綁定到完成端口
????????::CreateIoCompletionPort((HANDLE)AcceptSock,?pThis->m_hIocpPort,?(DWORD)AcceptSock,?0);

????????PPER_IO_OPERATION_DATA?PerIoData?=?(PPER_IO_OPERATION_DATA)::GlobalAlloc(GPTR,?sizeof(PER_IO_OPERATION_DATA));
????????unsigned?long?nFlag?=?0;
????????DWORD?RecvByte?=?0;
????????::ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
????????
????????PerIoData->DataBuf.buf?????=?PerIoData->szBuf;
????????PerIoData->DataBuf.len?????=?BUFFER_SIZE;
????????PerIoData->OperationType??=?OPERATION_TYPE_RECV;

????????//提交首個(gè)接收數(shù)據(jù)請(qǐng)求
????????//這時(shí)
????????//如果客戶端斷開連接
????????//則也可以以接收數(shù)據(jù)時(shí)得到通知????
????????::WSARecv(AcceptSock,?&(PerIoData->DataBuf),?1,?&RecvByte,?&nFlag,?&(PerIoData->OverLapped),?NULL);

????}//?while?(?true?)

????return?0;
}

bool?CNetworkSocket::send(SOCKET?s,?const?char*?pBuf,?int?nLen)
  {
????if(?s?==?INVALID_SOCKET?||?pBuf?==?NULL?||?nLen?==?0?)
????????return?false;
????
????PPER_IO_OPERATION_DATA?PerIoData?=?(PPER_IO_OPERATION_DATA)::GlobalAlloc(GPTR,?sizeof(PER_IO_OPERATION_DATA));
????
????unsigned?long?nFlag?=?0;
????DWORD?dwSendByte?=?0;
????::ZeroMemory(&(PerIoData->OverLapped),?sizeof(OVERLAPPED));
????::memcpy(PerIoData->szBuf,?pBuf,?nLen);

????PerIoData->DataBuf.buf?=?PerIoData->szBuf;
????PerIoData->DataBuf.len?=?nLen;
????PerIoData->OperationType??=?OPERATION_TYPE_SEND;

????int?nRet?=?::WSASend(s,?&(PerIoData->DataBuf),?1,?&dwSendByte,?nFlag,?&(PerIoData->OverLapped),?NULL);

????if(?nRet==SOCKET_ERROR?&&?GetLastError()?!=?WSA_IO_PENDING?)
 ???? {
????????return?false;
????}
????else
 ???? {
????????return?true;
????}
}

Feedback
在send時(shí),重新分配了一個(gè)IO對(duì)象,這個(gè)對(duì)象也許不會(huì)在你想象中那樣被釋放了。
在GET中的釋放的地方,好像沒問題,但如果達(dá)到上千個(gè)連接,而且每個(gè)連接的數(shù)據(jù)非量密的時(shí)候,會(huì)發(fā)現(xiàn)GET時(shí)總是在判斷Recv的情況而沒有判斷Send的情況。
這種情況,send時(shí)分配的對(duì)象就堆在服務(wù)器了。
現(xiàn)在我一直在懷疑WSASend后,GET一直沒有取得完成的事件。
雖然都說肯定會(huì)有完成事件的。
總這,你這個(gè)類可以應(yīng)付小型的網(wǎng)絡(luò)應(yīng)用,但無法應(yīng)付上規(guī)模的應(yīng)用。
這個(gè)類還有一個(gè)問題,
就是當(dāng)連接斷開后,沒有清理資源。
還有沒有主動(dòng)檢測(cè)無效連接的功能。
在WinSock上使用IOCP
本文章假設(shè)你已經(jīng)理解WindowsNT的I/O模型以及I/O完成端口(IOCP),并且比較熟悉將要用到的API,如果你打算學(xué)習(xí)IOCP,請(qǐng)參考Jeffery Richter的Advanced Windows(第三版),第15章I/O設(shè)備,里面有極好的關(guān)于完成端口的討論以及對(duì)即將使用API的說明。
IOCP提供了一個(gè)用于開發(fā)高效率和易擴(kuò)展程序的模型。Winsock2提供了對(duì)IOCP的支持,并在WindowsNT平臺(tái)得到了完整的實(shí)現(xiàn)。然而IOCP是所有WindowsNT I/O模型中最難理解和實(shí)現(xiàn)的,為了幫助你使用IOCP設(shè)計(jì)一個(gè)更好的Socket服務(wù),本文提供了一些訣竅。
Tip 1:使用Winsock2 IOCP函數(shù)例如WSASend和WSARecv,如同Win32文件I/O函數(shù),例如WriteFile和ReadFile。
微軟提供的Socket句柄是一個(gè)可安裝文件系統(tǒng)(IFS)句柄,因此你可以使用Win32的文件I/O函數(shù)調(diào)用這個(gè)句柄,然而,將Socket句柄和文件系統(tǒng)聯(lián)系起來,你不得不陷入很多的Kernal/User模式轉(zhuǎn)換的問題中,例如線程的上下文轉(zhuǎn)換,花費(fèi)的代價(jià)還包括參數(shù)的重新排列導(dǎo)致的性能降低。
因此你應(yīng)該使用只被Winsock2中IOCP允許的函數(shù)來使用IOCP。在ReadFile和WriteFile中會(huì)發(fā)生的額外的參數(shù)重整以及模式轉(zhuǎn)換只會(huì)發(fā)生在一種情況下,那就是如果句柄的提供者并沒有將自己的WSAPROTOCOL_INFO結(jié)構(gòu)中的DwServiceFlags1設(shè)置為XP1_IFS_HANDLES。
注解:即使使用WSASend和WSARecv,這些提供者仍然具有不可避免的額外的模式轉(zhuǎn)換,當(dāng)然ReadFile和WriteFile需要更多的轉(zhuǎn)換。
TIP 2: 確定并發(fā)工作線程數(shù)量和產(chǎn)生的工作線程總量。
并發(fā)工作線程的數(shù)量和工作線程的數(shù)量并不是同一概念。你可以決定IOCP使用最多2個(gè)的并發(fā)線程以及包括10個(gè)工作線程的線程池。工作線程池?fù)碛械木€程多于或者等于并發(fā)線程的數(shù)量時(shí),工作線程處理隊(duì)列中一個(gè)封包的時(shí)候可以調(diào)用win32的Wait函數(shù),這樣可以無延遲的處理隊(duì)列中另外的封包。
如果隊(duì)列中有正在等待被處理的封包,系統(tǒng)將會(huì)喚醒一個(gè)工作線程處理他,最后,第一個(gè)線程確認(rèn)正在休眠并且可以被再次調(diào)用,此時(shí),可調(diào)用線程數(shù)量會(huì)多于IOCP允許的并發(fā)線程數(shù)量(例如,NumberOFConcurrentThreads)。然而,當(dāng)下一個(gè)線程調(diào)用GetQueueCompletionStatus并且進(jìn)入等待狀態(tài),系統(tǒng)不會(huì)喚醒他。一般來說,系統(tǒng)會(huì)試圖保持你設(shè)定的并發(fā)工作線程數(shù)量。
一般來講,每擁有一個(gè)CPU,在IOCP中你可以使用一個(gè)并發(fā)工作線程,要做到這點(diǎn),當(dāng)你第一次初始化IOCP的時(shí)候,可以在調(diào)用CreateIOCompletionPort的時(shí)候?qū)umberOfConcurrentThreads設(shè)置為0。
TIP 3:將一個(gè)提交的I/O操作和完成封包的出列聯(lián)系起來。
當(dāng)對(duì)一個(gè)封包進(jìn)行出列,可以調(diào)用GetQueuedCompletionStatus返回一個(gè)完成Key和一個(gè)復(fù)合的結(jié)構(gòu)體給I/O。你可以分別的使用這兩個(gè)結(jié)構(gòu)體來返回一個(gè)句柄和一個(gè)I/O操作信息,當(dāng)你將IOCP提供的句柄信息注冊(cè)給Socket,那么你可以將注冊(cè)的Socket句柄當(dāng)做一個(gè)完成Key來使用。為每一個(gè)I/O的"extend"操作提供一個(gè)包含你的應(yīng)用程序IO狀態(tài)信息的復(fù)合結(jié)構(gòu)體。當(dāng)然,必須確定你為每個(gè)的I/O提供的是唯一的復(fù)合結(jié)構(gòu)體。當(dāng)I/O完成的時(shí)候,會(huì)返回一個(gè)指向結(jié)構(gòu)體的指針。
TIP 4:I/O完成封包隊(duì)列的行為
IOCP中完成封包隊(duì)列的等待次序并不決定于Winsock2 I/O調(diào)用產(chǎn)生的順序。如果一個(gè)Winsock2的I/O調(diào)用返回了SUCCESS或者IO_PENDING,那么他保證當(dāng)I/O操作完成后,完成封包會(huì)進(jìn)入IOCP的等待隊(duì)列,而不管Socket句柄是否已經(jīng)關(guān)閉。如果你關(guān)閉了socket句柄,那么將來調(diào)用WSASend,WSASendTo,WSARecv和WSARecvFrom會(huì)失敗并返回一個(gè)不同于SUCCES或者IO_PENDING的代碼,這時(shí)將不會(huì)產(chǎn)生一個(gè)完成封包。而在這種情況下,前一次使用GetQueuedCompletionStatus提交的I/O操作所得到的完成封包,會(huì)顯示一個(gè)失敗的信息。
如果你刪除了IOCP本身,那么不會(huì)有任何I/O請(qǐng)求發(fā)送給IOCP,因?yàn)镮OCP的句柄已經(jīng)不可用,盡管系統(tǒng)底層的IOCP核心結(jié)構(gòu)并不會(huì)在所有已提交I/O請(qǐng)求完成之前被移除。
TIP5:IOCP的清除
很重要的一件事是使用復(fù)合I/O時(shí)候的IOCP清除:如果一個(gè)I/O操作尚未完成,那么千萬不要釋放該操作創(chuàng)建的復(fù)合結(jié)構(gòu)體。HasOverlappedIoCompleted函數(shù)可以幫助你檢查一個(gè)I/O操作是否已經(jīng)完成。
關(guān)閉服務(wù)一般有兩種情況,第一種你并不關(guān)心尚未結(jié)束的I/O操作的完成狀態(tài),你只希望盡可能快的關(guān)閉他。第二種,你打算關(guān)閉服務(wù),但是你需要獲知未結(jié)束I/O操作的完成狀態(tài)。
第一種情況你可以調(diào)用PostQueueCompletionStatus(N次,N等于你的工作線程數(shù)量)來提交一個(gè)特殊的完成封包,他通知所有的工作線程立即退出,關(guān)閉所有socket句柄和他們關(guān)聯(lián)的復(fù)合結(jié)構(gòu)體,然后關(guān)閉完成端口(IOCP)。在關(guān)閉復(fù)合結(jié)構(gòu)體之前使用HasOverlappedIOCompleted檢查他的完成狀態(tài)。如果一個(gè)socket關(guān)閉了,所有基于他的未結(jié)束的I/O操作會(huì)很快的完成。
在第二種情況,你可以延遲工作線程的退出來保證所有的完成封包可以被適當(dāng)?shù)某隽小D憧梢允紫汝P(guān)閉所有的socket句柄和IOCP。可是,你需要維護(hù)一個(gè)未完成I/O的數(shù)字,以便你的線程可以知道可以安全退出的時(shí)間。盡管當(dāng)隊(duì)列中有很多完成封包在等待的時(shí)候,活動(dòng)的工作線程不能立即退出,但是在IOCP服務(wù)中使用全局I/O計(jì)數(shù)器并且使用臨界區(qū)保護(hù)他的代價(jià)并不會(huì)象你想象的那樣昂貴。
因?yàn)槲野l(fā)現(xiàn)當(dāng)大理recv后,就沒機(jī)會(huì)處理send的完成事件了。
如果在recv有同步或有復(fù)雜的邏輯,比如recv后需要send幾個(gè),此時(shí)可能產(chǎn)生死鎖,以致永遠(yuǎn)沒機(jī)會(huì)get到send的完成事件。
而如果IOCP本身就能將這兩個(gè)事件用兩個(gè)不同的GET分開,那業(yè)務(wù)邏輯就也好處理一些。
@苦丁茶
我也在寫iocp,對(duì)于你的評(píng)論,我認(rèn)為非常正確,但是到現(xiàn)在為止,還沒有找到一個(gè)好的辦法處理send的完成事件。你找到了嗎
|