<<3D游戲程序設(shè)計(jì)大師技巧>>這本書(shū).
網(wǎng)上討論完成端口資料很多,大寶收集的最多,有些是錯(cuò)誤的,有些說(shuō)的比較模糊.我就進(jìn)可能清晰說(shuō)明一下完成端口在游戲開(kāi)發(fā)中一般模型.并解
決幾個(gè)難點(diǎn)問(wèn)題.
由于完成端口是多線模型(當(dāng)然可以把工作線程設(shè)定為一個(gè))所以設(shè)計(jì)到資源的時(shí)候要線程安全,所以我開(kāi)始簡(jiǎn)單封裝了一下幾個(gè)stl的容器.
/***************************************以下代碼要與后邊的代碼一起編譯*******************************************/
//memorypool.h
//particle_allocator.h
#pragma once
#include <iostream>
#include <list>
#include <queue>
#include <windows.h>
using namespace std;
//關(guān)鍵區(qū)鎖
class CLock
{
????CRITICAL_SECTION _crisection;
public:
????CLock()????
????{
????InitializeCriticalSection( &_crisection );
????}
????~CLock()??
????{
????DeleteCriticalSection( &_crisection );????
????}
????void Lock()
????{
????EnterCriticalSection( &_crisection );??????
????}
????void Unlock()
????{
????LeaveCriticalSection( &_crisection );??????
????}
};
//鏈表模板
template <class T>
class CList : public list<T>
{
public:
????CList(void){};
????virtual ~CList(void){};
????CLock _guard;
????DWORD Size()
????{
????DWORD dwSize = 0;
????_guard.Lock();
????dwSize = size();
????_guard.Unlock();
????return dwSize;
????};
????void Clear()
????{
????_guard.Lock();
????clear();
????_guard.Unlock();
????}
????//添加數(shù)據(jù)到鏈表末尾,并返回添加后的該數(shù)據(jù)的節(jié)點(diǎn)指針
????iterator Push_Back(T lpData)
????{
????iterator lpNode = NULL;
????_guard.Lock();
????lpNode = insert(end(), lpData);
????_guard.Unlock();
????return lpNode;
????};
????//刪除一個(gè)節(jié)點(diǎn)
????void Erase(iterator lpNode)
????{
????_guard.Lock();
????erase(lpNode);
????_guard.Unlock();
????};
};
template <class T>
class CQueue : private CLock
{
private:
????queue<T> m_Queue;
public:
????CQueue(void) {};
????virtual ~CQueue(void) {};
????DWORD GetSize()
????{
????return (DWORD)m_Queue.size();
????};
????virtual void Push(T lpData)
????{
????Lock();
????m_Queue.push(lpData);
????Unlock();
????};
????virtual T Pop()
????{
????T lpData = NULL;
????Lock();
????if(m_Queue.size())
????{
????????lpData = m_Queue.front();
????????m_Queue.pop();
????}
????Unlock();
????return lpData;
????};
};
template<class ElementClass, int NumElement>
class CMemoryPool
{
????enum
????{
????chunk_size =NumElement
????};
????typedef unsigned char byte;
????list<byte *>??chunks;//內(nèi)存指針
????queue<ElementClass *> free_list;//空閑塊隊(duì)列
????CLock guard;
public:
????CMemoryPool()
????{
????InitMemPool();
????}
????~CMemoryPool()
????{
????DestroyMemPool();
????}
????//初時(shí)化內(nèi)存池
????void InitMemPool()
????{
????guard.Lock();
????used = 0;
????free = 0;
????//byte *memory;
????//memory = new byte[chunk_size*sizeof(ElementClass)];
????//if (!memory)
????//{
????//????cout << "內(nèi)存分配出錯(cuò)....."<< endl;
????//????return ;
????//}
????//chunks.push_front(memory);
????//for(int i =0;i< chunk_size;i++,free++)
????//{
????//????ElementClass *newnode = (ElementClass *)(memory + i*sizeof(ElementClass));
????//????free_list.push(newnode);
????//}
????guard.Unlock();
????}
????//銷(xiāo)毀內(nèi)存池
????void DestroyMemPool()
????{
????while(!free_list.empty())
????{
????????free_list.pop();
????}
????for(std::list<byte *>::iterator all_iter = chunks.begin();all_iter!=chunks.end();++all_iter)
????{
????????delete [](*all_iter);
????}
????chunks.clear();
????used = 0;
????free= 0;
????}
????//從內(nèi)存池里分配一塊內(nèi)存
????ElementClass* MemPoolAlloc()
????{
????guard.Lock();
????byte *memory;
????if(free_list.empty())
????{
????????memory = new byte[chunk_size*sizeof(ElementClass)];
????????if (!memory)
????????{
????????cout << "內(nèi)存分配出錯(cuò)....."<< endl;
????????return NULL;
????????}
????????chunks.push_front(memory);
????????for(int i =0;i< chunk_size;i++,free++)
????????{
????????ElementClass *newnode = (ElementClass *)(memory + i*sizeof(ElementClass));
????????free_list.push(newnode);
????????}
????}
????ElementClass??*redata = NULL;
????redata = free_list.front();
????ZeroMemory(redata,sizeof(ElementClass));
????free_list.pop();
????++used;
????--free;
????guard.Unlock();
????return redata;
????}
????//還原內(nèi)存池
????void MemPoolFree(ElementClass??*p)
????{
????guard.Lock();
????ZeroMemory(p,sizeof(ElementClass));
????free_list.push(p);
????-- used;
????++ free;
????guard.Unlock();
????}
????//內(nèi)存池信息統(tǒng)計(jì).
????void GetMemPoolInfo()
????{
????cout << "used is "
????????<< used
????????<< " ;??free is "
????????<< free << endl;
????}
private:
????UINT used;
????UINT free;
};
/********************************************************************************************************************/
代碼都非常簡(jiǎn)單.相信有點(diǎn)c++基礎(chǔ)的朋友都能看懂.我只是簡(jiǎn)單說(shuō)一下內(nèi)存池代碼.內(nèi)存池里ElementClass類(lèi)型一定要結(jié)構(gòu)形的.也就是那種POD
類(lèi)型還是什么類(lèi)型我剛剛聽(tīng)說(shuō)這個(gè)名詞,所以記不住.內(nèi)存池第一次分派的時(shí)候,分派NumElement>個(gè)ElementClass類(lèi)型節(jié)點(diǎn),這些節(jié)點(diǎn)構(gòu)成一個(gè)隊(duì)
列free_list,這些都是通過(guò)模版參數(shù)傳入的.每次用的時(shí)候就就從頭找出一個(gè)節(jié)點(diǎn),釋放的時(shí)候又在尾部加入一個(gè)節(jié)點(diǎn),很好理解.用完的時(shí)候再
分配N(xiāo)umElement>個(gè)ElementClass類(lèi)型節(jié)點(diǎn),節(jié)點(diǎn)到最后統(tǒng)一釋放.
接下來(lái)我們看看程序用到的頭文件
//iocpsever.cpp
#include <winsock2.h>
#include "memorypool.h"
#include <Mswsock.h>
#include <process.h>
using namespace std;
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
using namespace std;
1.頭文件有一個(gè)郁悶的問(wèn)題就是winsock2.h與windows互相包含問(wèn)題.不知道有什么完美解決辦法,我再項(xiàng)目中用afxsock.h替代winsock2.h.
需要用到庫(kù)兩個(gè)庫(kù)文件我們都通過(guò)編譯器指令連接進(jìn)來(lái).
2.完成端口模型用一句話來(lái)講就是,把所有的socket都綁定到完成端口上,通過(guò)完成端口統(tǒng)一來(lái)處理.綁定以后這些socket上有任何的情況,都通
過(guò)完成端口反映出來(lái),達(dá)到簡(jiǎn)化處理邏輯的作用.完成端口確實(shí)不會(huì)讓你失望,邏輯結(jié)構(gòu)非常簡(jiǎn)單.
3.我們首先為socket進(jìn)行分類(lèi).服務(wù)器的socket可以分為三類(lèi),一類(lèi)就是用于監(jiān)聽(tīng)的socket,監(jiān)聽(tīng)socket綁定時(shí)機(jī)是它成功綁定到某一個(gè)端口上的
時(shí)候我們就可以把它綁定到完成端口上.所以我把它放進(jìn)初始化間斷.第二類(lèi)客戶(hù)連接進(jìn)入的socket,這個(gè)只能放進(jìn)當(dāng)連接進(jìn)入完成端口,accept
事件觸發(fā)的時(shí)候我們把它綁定到完成端口上.第三類(lèi)就是服務(wù)器連接出去的socket,我把它放進(jìn)線程里,在線程內(nèi)部進(jìn)行綁定,因?yàn)檫@類(lèi)socket相
對(duì)比較少.
4.接下來(lái)我們考慮連接問(wèn)題.由于完成端口是一種異步模型,它工作機(jī)制和我們傳統(tǒng)的同步socket連接模型不一樣,我們先投遞許多連接消息,當(dāng)
連接成功的時(shí)候,對(duì)應(yīng)的完成端口上發(fā)生accept事件.這里的accept是我們自己定義的,并且在AcceptEx的時(shí)候通過(guò)重疊結(jié)構(gòu)傳入的,你也可以任
意定義,但是我想你不會(huì)把a(bǔ)ccept定義成send,而send定義成 receive吧.由于我們?cè)贏cceptEx的時(shí)候傳入單io數(shù)據(jù)結(jié)構(gòu),所以一定要關(guān)閉完成端
口的時(shí)候把它釋放掉.關(guān)于資源的釋放我們放在下邊來(lái)考慮.話題不要扯遠(yuǎn),回來(lái)連接上來(lái),我們到底該怎么樣投遞呢.如果我們一下投遞的過(guò)多,
就會(huì)造成資源浪費(fèi),過(guò)少如果用完該怎么辦.這樣客戶(hù)就不能那個(gè)連接了.通用的做法就是每次投遞10個(gè),如果用完繼續(xù)投機(jī)10個(gè).怎么樣才能知道
用完呢?需要在監(jiān)聽(tīng)socket上注冊(cè)一個(gè)FD_ACCEPT事件,當(dāng)投遞用完的時(shí)候事件觸發(fā).這就就可以繼續(xù)投機(jī),我也把它放進(jìn)線程里邊去做這件事.
????g_hAcceptExOverEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
????if(!g_hAcceptExOverEvent)
????{
????return false;
????}
????//幫定事件,由于我們用AcceptEx是一個(gè)異步的投遞,這樣幫定之后,如果投遞的AcceptEx事件全部完成
????//則g_hAcceptExOverEvent事件得到通知,進(jìn)而同步AcceptEx調(diào)用
????if(WSAEventSelect(Listen, g_hAcceptExOverEvent, FD_ACCEPT) == SOCKET_ERROR)
????{
????return false;
????}
????//由于開(kāi)始是復(fù)位,變成置位
????SetEvent(g_hAcceptExOverEvent);
5.最后討論資源釋放問(wèn)題,分析如下代碼
????closesocket(Listen);
????g_session._guard.Lock();
????for(CList<LPPER_HANDLE_DATA>::iterator lpNode = g_session.begin(); lpNode != g_session.end();lpNode++)
????{
????closesocket((*lpNode)->Socket);
????printf(" close socket??= %d\n",(*lpNode)->Socket);
????}
????g_session._guard.Unlock();
????Sleep(1000);
????//向IOCP發(fā)送結(jié)束線程信號(hào)
????for(DWORD i = 0; i < threadcnt; i++)
????PostQueuedCompletionStatus(CompletionPort, 0, NULL, NULL);
????//等待工作線程結(jié)束,等待時(shí)間10秒
????if(WaitForMultipleObjects(threadcnt, m_arrayIOCPThreadHandle, TRUE, 10000) != WAIT_OBJECT_0)
????{
????//如果10秒內(nèi)沒(méi)有結(jié)束所有線程,就強(qiáng)制結(jié)束
????for(DWORD i = 0; i < threadcnt; i++)
????????TerminateThread(m_arrayIOCPThreadHandle[i], 0);
????}
????//關(guān)閉所有工作線程句柄
????for(DWORD i = 0; i < threadcnt; i++)
????CloseHandle(m_arrayIOCPThreadHandle[i]);
????//釋放線程句柄數(shù)組
????delete [] m_arrayIOCPThreadHandle;
????m_arrayIOCPThreadHandle = NULL;
????CloseHandle(CompletionPort);
????g_per_io_data.DestroyMemPool();
????g_per_handle_data.DestroyMemPool();
????g_session.Clear();
我們也分三類(lèi)來(lái)考慮
首先第一類(lèi)監(jiān)聽(tīng)socket,監(jiān)聽(tīng)socket關(guān)閉之后,由于連接事件與監(jiān)聽(tīng)socket有關(guān)系.所以所有未完成的連接都會(huì)從完成端口返回來(lái).這些連接里我
們分配了單io數(shù)據(jù)結(jié)構(gòu)一定要釋放.
釋放邏輯如下
bRet = GetQueuedCompletionStatus(CompletionPort,
????????&BytesTransferred,
????????(PULONG_PTR)
????????&lpCompletionKey,
????????(LPOVERLAPPED*)
????????&lpPerIoData,
????????INFINITE);
????if(!bRet)//沒(méi)有取到任何東西
????{
????????if(!lpPerIoData)
????????continue;
????}
????//收到線程結(jié)束信號(hào)
????if(!lpCompletionKey)
????????return 0;
????LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
????if(!bRet)
????{
????????if(BytesTransferred == 0 )//斷開(kāi)連接
????????{
????????if (lpPerIoData->OperationType == OP_READ)
????????{
????????????printf("client socket %d disconnect\n",lpPerIoData->Socket);
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????continue;
????????}
????????}
????}
????switch (lpPerIoData->OperationType)
????{
????case
OP_ACCEPT://有連接進(jìn)來(lái)????????{
????????if (!bRet)
????????{
????????????closesocket(lpPerIoData->Socket);?? //關(guān)閉socket
????????????g_per_io_data.MemPoolFree(lpPerIoData);????//歸還結(jié)構(gòu)體到內(nèi)存池
????????????printf("關(guān)閉已經(jīng)投遞空閑連接\n");
????????????break;
????????}
.........................后邊代碼省略掉當(dāng)關(guān)閉監(jiān)聽(tīng)socket的時(shí)候GetQueuedCompletionStatus會(huì)返回false并且BytesTransferred等于零,
并且lpPerIoData,不為空l(shuí)pPerIoData->OperationType操作符號(hào)就是我們AcceptEx傳入的自定義操作類(lèi)型accept,這樣就釋放掉了與連接相關(guān)的
資源.所以我們?cè)陉P(guān)掉監(jiān)聽(tīng)socket之后要sleep一下讓資源能充分釋放.
第二類(lèi)就是客戶(hù)socket.
分兩種情況考慮.(1)服務(wù)器主動(dòng)關(guān)閉,這又可以分為兩種情況,第一,當(dāng)你在工作線程里關(guān)掉socket的時(shí)候GetQueuedCompletionStatus是收不到
任何消息的,所以直接釋放資源就是了.這樣產(chǎn)生的一個(gè)問(wèn)題就是如果你把拆包邏輯放進(jìn)完成端口工作線程的時(shí)候,當(dāng)數(shù)據(jù)包發(fā)生錯(cuò)誤的時(shí)候,你
想關(guān)閉socket,一定要記住連釋放掉與這個(gè)socket相關(guān)的資源.
if(WSARecv(lpPerIoData->Socket,
????????????&(lpPerIoData->DataBuf),
????????????1,
????????????&lpPerIoData->RecvBytes,
????????????&lpPerIoData->Flags,
????????????&(lpPerIoData->Overlapped),
????????????NULL)== SOCKET_ERROR)
????????????{
????????????if(WSAGetLastError() != ERROR_IO_PENDING)//讀操作失敗
????????????{
????????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????????
????????????}
????????????}
第二,就是當(dāng)在其他線程里關(guān)閉的時(shí)候,GetQueuedCompletionStatus就會(huì)收到消息.由于我們每次收到自定義recv事件的時(shí)候,接著就投遞了一個(gè)
recv事件.所以這個(gè)事件失敗消息肯定會(huì)從完成端口返回.處理如下
bRet = GetQueuedCompletionStatus(CompletionPort,
????????&BytesTransferred,
????????(PULONG_PTR)
????????&lpCompletionKey,
????????(LPOVERLAPPED*)
????????&lpPerIoData,
????????INFINITE);
????if(!bRet)//沒(méi)有取到任何東西
????{
????????if(!lpPerIoData)
????????continue;
????}
????//收到線程結(jié)束信號(hào)
????if(!lpCompletionKey)
????????return 0;
????LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
????if(!bRet)
????{
????????if(BytesTransferred == 0 )//斷開(kāi)連接
????????{
????????if (lpPerIoData->OperationType == OP_READ)
????????{
????????????printf("client socket %d disconnect\n",lpPerIoData->Socket);
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????continue;
????????}
????????}
????}
返回代碼為false并且BytesTransferred為零且lpPerIoData->OperationType == OP_READ.
(2)用戶(hù)斷開(kāi)的處理邏輯與服務(wù)器斷開(kāi)第二種情況是一樣的.有些資料講服務(wù)器主動(dòng)斷開(kāi)的時(shí)候GetQueuedCompletionStatus會(huì)返回true是錯(cuò)誤的
.
6.ERROR_IO_PENDING消息處理這個(gè)消息說(shuō)明現(xiàn)在這個(gè)重疊操作還沒(méi)有完成,我們必須進(jìn)行等待,所以必須在錯(cuò)誤判斷的時(shí)候忽略掉這個(gè)消息.
????????????if(WSAGetLastError() != ERROR_IO_PENDING)//讀操作失敗
????????????{
????????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????????
????????????}
7.GetQueuedCompletionStatus返回之后的邏輯判斷按下邊的代碼進(jìn)行,我們必須知道為什么這樣做,理由就是上邊六條再加上msdn里
GetQueuedCompletionStatus這個(gè)函數(shù)的說(shuō)明.
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
????DWORD BytesTransferred;
????LPPER_IO_DATA lpPerIoData = NULL;
????LPVOID lpCompletionKey = NULL;
????BOOL bRet = FALSE;
????while (1)
????{
????bRet = GetQueuedCompletionStatus(CompletionPort,
????????&BytesTransferred,
????????(PULONG_PTR)
????????&lpCompletionKey,
????????(LPOVERLAPPED*)
????????&lpPerIoData,
????????INFINITE);
????if(!bRet)//沒(méi)有取到任何東西
????{
????????if(!lpPerIoData)
????????continue;
????}
????//收到線程結(jié)束信號(hào)
????if(!lpCompletionKey)
????????return 0;
????LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
????if(!bRet)
????{
????????if(BytesTransferred == 0 )//斷開(kāi)連接
????????{
????????if (lpPerIoData->OperationType == OP_READ)
????????{
????????????printf("client socket %d disconnect\n",lpPerIoData->Socket);
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????continue;
????????}
????????}
????}
????switch (lpPerIoData->OperationType)
????{
????case
OP_ACCEPT://有連接進(jìn)來(lái)????????{
????????if (!bRet)
????????{
????????????closesocket(lpPerIoData->Socket);?? //關(guān)閉socket
????????????g_per_io_data.MemPoolFree(lpPerIoData);????//歸還結(jié)構(gòu)體到內(nèi)存池
????????????printf("關(guān)閉已經(jīng)投遞空閑連接\n");
????????????break;
????????}
????????LPPER_HANDLE_DATA lpCurSession;
????????lpCurSession=g_per_handle_data.MemPoolAlloc();
????????printf(" connect socket??= %d\n",lpPerIoData->Socket);
????????lpCurSession->Socket = lpPerIoData->Socket;//就是我們AcceptEx時(shí)傳入的socket
????????lpCurSession->node = g_session.Push_Back(lpCurSession);
????????lpPerIoData->lpSession = lpCurSession;
????????if(!CreateIoCompletionPort(
????????????(HANDLE)lpPerIoData->Socket,
????????????CompletionPort,
????????????(ULONG_PTR)lpCurSession,
????????????0))
????????{
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_per_handle_data.MemPoolFree(lpSession);
????????}
????????else
????????{
????????????//g_per_io_data與每次讀寫(xiě)相關(guān)聯(lián)
????????????lpPerIoData->OperationType = OP_READ;
????????????ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
????????????lpPerIoData->Flags = 0;
????????????lpPerIoData->DataBuf.len = 1024;
????????????lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
????????????lpPerIoData->RecvBytes =0;
????????????lpPerIoData->lpSession = lpSession;
????????????ZeroMemory(lpPerIoData->buffer,1024);
????????????if(WSARecv(lpPerIoData->Socket,
????????????&(lpPerIoData->DataBuf),
????????????1,
????????????&lpPerIoData->RecvBytes,
????????????&lpPerIoData->Flags,
????????????&(lpPerIoData->Overlapped),
????????????NULL)== SOCKET_ERROR)
????????????{
????????????if(WSAGetLastError() != ERROR_IO_PENDING)//讀操作失敗
????????????{
????????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????????
????????????}
????????????}
????????}
????????}
????????break;
????case OP_READ:
????????{
????????//cout << lpPerIoData->DataBuf.buf << endl;
????????send(lpPerIoData->Socket,"9876543210",lstrlen("9876543210")+1,0);
????????lpPerIoData->OperationType = OP_READ;
????????ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
????????lpPerIoData->Flags = 0;
????????lpPerIoData->DataBuf.len = 1024;
????????lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
????????lpPerIoData->RecvBytes =0;
????????lpPerIoData->lpSession = lpSession;
????????ZeroMemory(lpPerIoData->buffer,1024);
????????if(WSARecv(lpPerIoData->Socket,
????????????&(lpPerIoData->DataBuf),
????????????1,
????????????&lpPerIoData->RecvBytes,
????????????&lpPerIoData->Flags,
????????????&(lpPerIoData->Overlapped),
????????????NULL)== SOCKET_ERROR)
????????{
????????????if(WSAGetLastError() != ERROR_IO_PENDING)
????????????{
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????}
????????}
????????}
????????break;
????case OP_WRITE:
????????break;
????default:
????????break;
????}
????}
}
下邊就是整個(gè)控制臺(tái)代碼.
/********************************以下代碼要與前邊的容器頭文件一起進(jìn)行編譯***********************************************/
//iocpsever.cpp
#include <winsock2.h>
#include "memorypool.h"
#include <Mswsock.h>
#include <process.h>
using namespace std;
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
using namespace std;
// 單句柄數(shù)據(jù),每個(gè)連接(客戶(hù)端)對(duì)應(yīng)一個(gè)這樣的結(jié)構(gòu)。
//只有連接斷開(kāi),或者服務(wù)器關(guān)閉的時(shí)候才釋放
struct tagPER_HANDLE_DATA;
typedef unsigned (WINAPI *PBEGINTHREADX_THREADFUN)(LPVOID lpThreadParameter); //線程函數(shù)原型
typedef struct tagPER_HANDLE_DATA *LPPER_HANDLE_DATA;
typedef CList<LPPER_HANDLE_DATA>::iterator LISTSESSIONNODE;
typedef struct tagPER_HANDLE_DATA
{
????SOCKET Socket;
????LISTSESSIONNODE node;
????// 將和這個(gè)句柄關(guān)聯(lián)的其他有用信息,盡管放在這里面吧
}PER_HANDLE_DATA;
// 單I/O操作數(shù)據(jù)。每次收發(fā)數(shù)據(jù)的時(shí)候
//收發(fā)數(shù)據(jù)操作完成數(shù)之后釋放。
//需要注意的是OVERLAPPED Overlapped一定要放在最前
typedef struct tagPER_IO_DATA
{
????OVERLAPPED Overlapped;
????WSABUF DataBuf;
????char buffer[1024];
????DWORD RecvBytes;
????DWORD Flags;
????int OperationType;
????SOCKET Socket;
????LPPER_HANDLE_DATA lpSession;
}PER_IO_DATA,*LPPER_IO_DATA;
//操作標(biāo)志
#define OP_READ 0
#define OP_WRITE 1
#define OP_ACCEPT 2
//連接數(shù)據(jù)內(nèi)存池
CMemoryPool<PER_IO_DATA,1024> g_per_io_data;
//數(shù)據(jù)收發(fā)內(nèi)存池
CMemoryPool<PER_HANDLE_DATA,1024> g_per_handle_data;
//完成線程
//保存當(dāng)前連接數(shù)據(jù)指針
CList<LPPER_HANDLE_DATA> g_session;
SOCKET Listen;
HANDLE CompletionPort;
HANDLE *m_arrayIOCPThreadHandle;
DWORD threadcnt;
DWORD WINAPI ServerWorkerThread(LPVOID lpParam);
BOOL b;
HANDLE g_hAcceptExOverEvent;
HANDLE g_hAcceptExThread;
DWORD WINAPI WinSockAcceptEXThread(LPVOID lpParam);
bool StartUpWinSockAcceptEXThread();
void CloseWinSockAcceptEXThread();
void InitCompletionPort();
void CloseCompletionPort();
int main(void)
{
????InitCompletionPort();
????StartUpWinSockAcceptEXThread();
????{
????HANDLE hWaitEvent;
????hWaitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
????WaitForSingleObject(hWaitEvent, 30000/*INFINITE*/);
????CloseHandle(hWaitEvent);
????}
????CloseCompletionPort();
????return 0;
}
DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
{
????DWORD BytesTransferred;
????LPPER_IO_DATA lpPerIoData = NULL;
????LPVOID lpCompletionKey = NULL;
????BOOL bRet = FALSE;
????while (1)
????{
????bRet = GetQueuedCompletionStatus(CompletionPort,
????????&BytesTransferred,
????????(PULONG_PTR)
????????&lpCompletionKey,
????????(LPOVERLAPPED*)
????????&lpPerIoData,
????????INFINITE);
????if(!bRet)//沒(méi)有取到任何東西
????{
????????if(!lpPerIoData)
????????continue;
????}
????//收到線程結(jié)束信號(hào)
????if(!lpCompletionKey)
????????return 0;
????LPPER_HANDLE_DATA lpSession = (LPPER_HANDLE_DATA)lpCompletionKey;
????if(!bRet)
????{
????????if(BytesTransferred == 0 )//斷開(kāi)連接
????????{
????????if (lpPerIoData->OperationType == OP_READ)
????????{
????????????printf("client socket %d disconnect\n",lpPerIoData->Socket);
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????continue;
????????}
????????}
????}
????switch (lpPerIoData->OperationType)
????{
????case
OP_ACCEPT://有連接進(jìn)來(lái)????????{
????????if (!bRet)
????????{
????????????closesocket(lpPerIoData->Socket);?? //關(guān)閉socket
????????????g_per_io_data.MemPoolFree(lpPerIoData);????//歸還結(jié)構(gòu)體到內(nèi)存池
????????????printf("關(guān)閉已經(jīng)投遞空閑連接\n");
????????????break;
????????}
????????LPPER_HANDLE_DATA lpCurSession;
????????lpCurSession=g_per_handle_data.MemPoolAlloc();
????????printf(" connect socket??= %d\n",lpPerIoData->Socket);
????????lpCurSession->Socket = lpPerIoData->Socket;//就是我們AcceptEx時(shí)傳入的socket
????????lpCurSession->node = g_session.Push_Back(lpCurSession);
????????lpPerIoData->lpSession = lpCurSession;
????????if(!CreateIoCompletionPort(
????????????(HANDLE)lpPerIoData->Socket,
????????????CompletionPort,
????????????(ULONG_PTR)lpCurSession,
????????????0))
????????{
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_per_handle_data.MemPoolFree(lpSession);
????????}
????????else
????????{
????????????//g_per_io_data與每次讀寫(xiě)相關(guān)聯(lián)
????????????lpPerIoData->OperationType = OP_READ;
????????????ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
????????????lpPerIoData->Flags = 0;
????????????lpPerIoData->DataBuf.len = 1024;
????????????lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
????????????lpPerIoData->RecvBytes =0;
????????????lpPerIoData->lpSession = lpSession;
????????????ZeroMemory(lpPerIoData->buffer,1024);
????????????if(WSARecv(lpPerIoData->Socket,
????????????&(lpPerIoData->DataBuf),
????????????1,
????????????&lpPerIoData->RecvBytes,
????????????&lpPerIoData->Flags,
????????????&(lpPerIoData->Overlapped),
????????????NULL)== SOCKET_ERROR)
????????????{
????????????if(WSAGetLastError() != ERROR_IO_PENDING)//讀操作失敗
????????????{
????????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????????
????????????}
????????????}
????????}
????????}
????????break;
????case OP_READ:
????????{
????????//cout << lpPerIoData->DataBuf.buf << endl;
????????send(lpPerIoData->Socket,"9876543210",lstrlen("9876543210")+1,0);
????????lpPerIoData->OperationType = OP_READ;
????????ZeroMemory(&(lpPerIoData->Overlapped), sizeof(OVERLAPPED));
????????lpPerIoData->Flags = 0;
????????lpPerIoData->DataBuf.len = 1024;
????????lpPerIoData->DataBuf.buf = lpPerIoData->buffer;
????????lpPerIoData->RecvBytes =0;
????????lpPerIoData->lpSession = lpSession;
????????ZeroMemory(lpPerIoData->buffer,1024);
????????if(WSARecv(lpPerIoData->Socket,
????????????&(lpPerIoData->DataBuf),
????????????1,
????????????&lpPerIoData->RecvBytes,
????????????&lpPerIoData->Flags,
????????????&(lpPerIoData->Overlapped),
????????????NULL)== SOCKET_ERROR)
????????{
????????????if(WSAGetLastError() != ERROR_IO_PENDING)
????????????{
????????????closesocket(lpPerIoData->Socket); //關(guān)閉socket
????????????g_session.Erase(lpSession->node); //從Session鏈表中刪除
????????????g_per_handle_data.MemPoolFree(lpSession); //將Session歸還到池
????????????g_per_io_data.MemPoolFree(lpPerIoData);
????????????}
????????}
????????}
????????break;
????case OP_WRITE:
????????break;
????default:
????????break;
????}
????}
}
DWORD WINAPI WinSockAcceptEXThread(LPVOID lpParam)
{
????LINGER lingerStruct = { 0x01, 0x00 };
????BOOL bNodelay = TRUE;
????//創(chuàng)建事件
????while (b)
????{
????//每次投遞10次,進(jìn)入等待狀態(tài),當(dāng)AcceptEx全部完成之后,繼續(xù)投遞
????if(WaitForSingleObject(g_hAcceptExOverEvent, INFINITE) == WAIT_FAILED)
????????continue;
????for(int i =0;i<10 && b;i++)
????{
????????int zero =0;
????????PER_IO_DATA * pper_io_data = NULL;
????????DWORD dwAddrLen = sizeof(sockaddr_in)+16;
????????pper_io_data = g_per_io_data.MemPoolAlloc();
????????pper_io_data ->Socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
????????if(pper_io_data->Socket == INVALID_SOCKET)
????????{
????????g_per_io_data.MemPoolFree(pper_io_data);
????????continue;
????????}
????????pper_io_data->OperationType = OP_ACCEPT;
????????pper_io_data->lpSession=NULL;
????????ZeroMemory(&pper_io_data->Overlapped,sizeof(OVERLAPPED));//一定要注意清零
????????pper_io_data->RecvBytes =0;
????????pper_io_data->Flags =0;
????????ZeroMemory(pper_io_data->buffer,1024);
????????setsockopt(pper_io_data->Socket, IPPROTO_TCP, TCP_NODELAY, (char*)&bNodelay, sizeof(BOOL));
????????setsockopt(pper_io_data->Socket, SOL_SOCKET, SO_LINGER, (char*)&lingerStruct, sizeof(LINGER));
????????if(!AcceptEx(Listen,pper_io_data ->Socket,pper_io_data ->buffer,0,dwAddrLen,dwAddrLen,&pper_io_data -
>RecvBytes,&pper_io_data->Overlapped))
????????{
????????if(WSAGetLastError() != ERROR_IO_PENDING)//對(duì)于AcceptEx,WSARecv,WSASend一定要有這樣的判斷,因?yàn)槭钱惒降乃?br />
不會(huì)立即完成
????????{
????????????closesocket(pper_io_data->Socket);
????????????g_per_io_data.MemPoolFree(pper_io_data); //歸還結(jié)構(gòu)體到內(nèi)存池
????????????continue;
????????}
????????}
????}
????}
????return 0;
}
bool StartUpWinSockAcceptEXThread()
{
????g_hAcceptExOverEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
????if(!g_hAcceptExOverEvent)
????{
????return false;
????}
????//幫定事件,由于我們用AcceptEx是一個(gè)異步的投遞,這樣幫定之后,如果投遞的AcceptEx事件全部完成
????//則g_hAcceptExOverEvent事件得到通知,進(jìn)而同步AcceptEx調(diào)用
????if(WSAEventSelect(Listen, g_hAcceptExOverEvent, FD_ACCEPT) == SOCKET_ERROR)
????{
????return false;
????}
????//由于開(kāi)始是復(fù)位,變成置位
????SetEvent(g_hAcceptExOverEvent);
????b = TRUE;
????g_hAcceptExThread = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)WinSockAcceptEXThread, NULL, 0, NULL);
????if(!g_hAcceptExThread)
????return false;
????return true;
}
void CloseWinSockAcceptEXThread()
{
????b=false;
????SetEvent(g_hAcceptExOverEvent);
????if(WaitForSingleObject(g_hAcceptExThread,10000)!= WAIT_OBJECT_0)
????TerminateThread(g_hAcceptExThread, 0);
????CloseHandle(g_hAcceptExThread);
????CloseHandle(g_hAcceptExOverEvent);
};
void CloseCompletionPort()
{
????
????closesocket(Listen);
????g_session._guard.Lock();
????for(CList<LPPER_HANDLE_DATA>::iterator lpNode = g_session.begin(); lpNode != g_session.end();lpNode++)
????{
????closesocket((*lpNode)->Socket);
????printf(" close socket??= %d\n",(*lpNode)->Socket);
????}
????g_session._guard.Unlock();
????Sleep(1000);
????//向IOCP發(fā)送結(jié)束線程信號(hào)
????for(DWORD i = 0; i < threadcnt; i++)
????PostQueuedCompletionStatus(CompletionPort, 0, NULL, NULL);
????//等待工作線程結(jié)束,等待時(shí)間10秒
????if(WaitForMultipleObjects(threadcnt, m_arrayIOCPThreadHandle, TRUE, 10000) != WAIT_OBJECT_0)
????{
????//如果10秒內(nèi)沒(méi)有結(jié)束所有線程,就強(qiáng)制結(jié)束
????for(DWORD i = 0; i < threadcnt; i++)
????????TerminateThread(m_arrayIOCPThreadHandle[i], 0);
????}
????//關(guān)閉所有工作線程句柄
????for(DWORD i = 0; i < threadcnt; i++)
????CloseHandle(m_arrayIOCPThreadHandle[i]);
????//釋放線程句柄數(shù)組
????delete [] m_arrayIOCPThreadHandle;
????m_arrayIOCPThreadHandle = NULL;
????CloseHandle(CompletionPort);
????g_per_io_data.DestroyMemPool();
????g_per_handle_data.DestroyMemPool();
????g_session.Clear();
}
void InitCompletionPort()
{
????WSADATA wsd;
????SYSTEM_INFO SystemInfo;
????SOCKADDR_IN InternetAddr;
????WSAStartup(MAKEWORD(2, 2), &wsd);
????//創(chuàng)建完成端口
????CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
????NULL,
????0,
????threadcnt);
????//得到處理器數(shù)量
????GetSystemInfo(&SystemInfo);
????//經(jīng)驗(yàn)公式:一般按公式創(chuàng)建工作線程
????threadcnt = 2*SystemInfo.dwNumberOfProcessors+2;
????m_arrayIOCPThreadHandle = new HANDLE[threadcnt];
????for(DWORD i = 0; i < threadcnt; i++)
????{
????m_arrayIOCPThreadHandle[i] = (HANDLE)_beginthreadex(NULL, 0, (PBEGINTHREADX_THREADFUN)ServerWorkerThread, NULL, 0,
NULL);
????if(!m_arrayIOCPThreadHandle[i])
????????return ;
????}
????//創(chuàng)建監(jiān)聽(tīng)socket
????Listen = WSASocket(AF_INET,
????SOCK_STREAM,
????0,
????NULL,
????0,
????WSA_FLAG_OVERLAPPED);
????InternetAddr.sin_family = PF_INET;
????InternetAddr.sin_port = htons(20000);
????InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
????//幫定到指定端口
????bind(Listen, (SOCKADDR*)&InternetAddr, sizeof(InternetAddr));
????//開(kāi)始監(jiān)聽(tīng)
????listen(Listen, SOMAXCONN);
????//完成端口幫定到監(jiān)聽(tīng)socket
????printf(" listen socket??= %d\n",Listen);
????if (CreateIoCompletionPort((HANDLE) Listen, CompletionPort, (ULONG_PTR)&Listen, threadcnt) == NULL)
????{
????printf("CreateIoCompletionPort failed with error %d\n", GetLastError());
????return ;
????}
}
FROM:http://www.vchelp.net/cndevforum/subject_view.asp?subject_id=176818&forum_id=55
作者:
lustskyboy溝通無(wú)限
posted on 2007-01-31 14:10
我風(fēng) 閱讀(1125)
評(píng)論(2) 編輯 收藏 引用