早在兩年前我就已經能很熟練的運用完成端口這種技術了,只是一直沒有機會將它用在什么項目中,這段時間見到這種技術被過分炒作,過分的神秘化,就想寫一篇解釋它如何工作的文章.想告訴大家它沒有傳說中的那么高深難懂!有什么錯誤的地方還請高人指正.轉載請注明出處及作者,謝謝!

以一個文件傳輸服務端為例,在我的機器上它只起兩個線程就可以為很多個個客戶端同時提供文件下載服務,程序的性能會隨機器內CPU個數的增加而線性增長,我盡可能做到使它清晰易懂,雖然程序很小卻用到了NT 5的一些新特性,重疊IO,完成端口以及線程池,基于這種模型的服務端程序應該是NT系統上性能最好的了.

首先.做為完成端口的基礎,我們應該理解重疊IO,這需要你已經理解了內核對象及操作系統的一些概念概念,什么是信號/非信號態,什么是等待函數,什么是成功等待的副作用,什么是線程掛起等,如果這些概令還沒有理解,你應該先看一下Windows 核心編程中的相關內容.如果已經理解這些,那么重疊IO對你來說并不難.

你可以這樣認為重疊IO,現在你已經進入一個服務器/客戶機環境,請不要混淆概念,這里的服務器是指操作系統,而客戶機是指你的程序(它進行IO操作),是當你進行IO操作(send,recv,writefile,readfile....)時你發送一個IO請求給服務器(操作系統),由服務器來完成你需要的操作,然后你什么事都沒有了,當服務器完成IO請求時它會通知你,當然在這期間你可以做任何事,一個常用的技巧是在發送重疊IO請求后,程序在一個循環中一邊調用PeekMessage,TranslateMessage和DispatchMessage更新界面,同時調用GetOverlappedResult等待服務器完成IO操作,更高效一點的做法是使用IO完成例程來處理服務器(操作系統)返回的結果,但并不是每個支持重疊IO操作的函數都支持完成例程如TransmitFile函數.

例1.一次重疊寫操作過程(GetOverlappedResult方法):
1.填寫一個OVERLAPPED結構
2.進行一次寫操作,并指定重疊操作參數(上面的OVERLAPPED結構變量的指針)
3.做其它事(如更新界面)
4.GetOverlappedResult取操作結果
5.如果IO請求沒有完成,并且沒有出錯則回到期3
6.處理IO操作結果

例2.一次重疊寫操作過程(完成例程方法):
1.填寫一個OVERLAPPED結構
2.進行一次寫操作,并指定重疊操作參數(上面的OVERLAPPED結構變量的指針),并指定完成例程
3.做其它事(如更新界面)
4.當完成例程被調用說明IO操作已經完成或出錯,現在可以對操作結果進行處理了


如果你已經理解上面的概念,就已經很接近IO完成端口了,當然這只是很常規的重疊操作它已經非常高效,但如果再結合多線程對一個File或是Socket進行重疊IO操作就會非常復雜,通常程序員很難把握這種復雜度.完成端口可以說就是為了充分發揮多線程和重疊IO操作相結合的性能而設計的.很多人都說它復雜,其實如果你自己實現一個多線程的對一個File或是Socket進行重疊IO操作的程序(注意是多個線程對一個HANDLE或SOCKET進行重疊IO操作,而不是啟一個線程對一個HANDLE進行重疊IO操作)就會發現完成端口實際上簡化了多線程里使用重疊IO的復雜度,并且性能更高,性能高在哪?下面進行說明.

我們可能寫過這樣的服務端程序:

例3.主程序:
1.監聽一個端口
2.等待連接
3.當有連接來時
4.啟一個線程對這個客戶端進行處理
5.回到2

服務線程:
1.讀客戶端請求
2.如果客戶端不再有請求,執行6
3.處理請求
4.返回操作結果
5.回到1
6.退出線程

這是一種最簡單的網絡服務器模型,我們把它優化一下

例4.主程序:
1.開一個線程池,里面有機器能承受的最大線程數個線程,線程都處于掛起(suspend)狀態
1.監聽一個端口
2.等待連接
3.當有連接來時
4.從線程池里Resume一個線程對這個客戶端進行處理
5.回到2

服務線程與例3模型里的相同,只是當線程處理完客戶端所有請求后,不是退出而是回到線程池,再次掛起讓出CPU時間,并等待為下一個客戶機服務.當然在此期間線程會因為IO操作(服務線程的第1,5操作,也許還有其它阻塞操作)掛起自己,但不會回到線程池,也就是說它一次只能為一個客戶端服務.

這可能是你能想到的最高效的服務端模型了吧!它與第一個服務端模型相比少了很多個用戶態到內核態的CONTEXT Switch,反映也更加快速,也許你可能覺得這很微不足道,這說明你缺少對大規模高性能服務器程序(比如網游服務端)的認識,如果你的服務端程序要對幾千萬個客戶端進行服務呢?這也是微軟Windows NT開發組在NT 5以上的系統中添加線程池的原因.

思考一下什么樣的模型可以讓一個線程為多個客戶端服務呢!那就要跳出每來一個連接啟線程為其服務的固定思維模式,我們把線程服務的最小單元分割為單獨的讀或寫操作(注意是讀或寫不是讀和寫),而不是一個客戶端從連接到斷開期間的所有讀寫操作.每個線程都使用重疊IO進行讀寫操作,投遞了讀寫請求后線程回到線程池,等待為其它客戶機服務,當操作完成或出錯時再回來處理操作結果,然后再回到線程池.

看看這樣的服務器模型:
例5.主程序:
1.開一個線程池,里面有機器內CPU個數兩倍的線程,線程都處于掛起(suspend)狀態,它們在都等處理一次重疊IO操作的完成結果
1.監聽一個端口
2.等待連接
3.當有連接來時
4.投遞一個重疊讀操作讀取命令
5.回到2

服務線程:
1.如果讀完成,則處理讀取的內容(如HTTP GET命令),否則執行3
2.投遞一個重疊寫操作(如返回HTTP GET命令需要的網頁)
3.如果是一個寫操作完成,可以再投遞一個重疊讀操作,讀取客戶機的下一個請求,或者是關閉連接(如HTTP協議里每發完一個網頁就斷開)
4.取得下一個重疊IO操作結果,如果IO操作沒有完成或沒有IO操作則回到線程池

假設這是一個WEB服務器程序,可以看到工作者線程是以讀或寫為最小的工作單元運行的,在主程序里面進行了一次重疊讀操作

當讀操作完成時一個線程池中的一個工作者線程被激活取得了操作結果,處理GET或POST命令,然后發送一個網頁內容,發送也是一個重疊操作,然后處理對其它客戶機的IO操作結果,如果沒有其它的東西需要處理時回到線程池等待.可以看到使用這種模型發送和接收可以是也可以不是一個線程.

當發送操作完成時,線程池中的一個工作者線程池激活,它關閉連接(HTTP協議),然后處理其它的IO操作結果,如果沒有其它的東西需要處理時回到線程池等待.

看看在這樣的模型中一個線程怎么為多個客戶端服務,同樣是模擬一個WEB服務器例子:

假如現在系統中有兩個線程,ThreadA,ThreadB它們在都等處理一次重疊IO操作的完成結果

當一個客戶機ClientA連接來時主程序投遞一個重疊讀操作,然后等待下一個客戶機連接,當讀操作完成時ThreadA被激活,它收到一個HTTP GET命令,然后ThreadA使用重疊寫操作發送一個網頁給ClientA,然后立即回到線程池等待處理下一個IO操作結果,這時發送操作還沒有完成,又有一個客戶機ClientB連接來,主程序再投遞一個重疊讀操作,當讀操作完成時ThreadA(當然也可能是ThreadB)再次被激活,它重復同樣步驟,收到一個GET命令,使用重疊寫操作發送一個網頁給ClientB,這次它沒有來得及回到線程池時,又有一個連接ClientC連入,主程序再投遞一個重疊讀操作,讀操作完成時ThreadB被激活(因為ThreadA還沒有回到線程池)它收到一個HTTP GET命令,然后ThreadB使用重疊寫操作發送一個網頁給ClientC,然后ThreadB回到線程池,這時ThreadA也回到了線程池.

可以想象現在有三個掛起的發送操作分別是ThreadA發送給ClientA和ClientB的網頁,以及ThreadB發送給ClientC的網頁,它們由操作系統內核來處理.ThreadA和ThreadB現在已經回到線程池,可以繼續為其它任何客戶端服務.

當對ClientA的重疊寫操作已經完成,ThreadA(也可以是ThreadB)又被激活它關閉與ClientA連接,但還沒有回到線程池,與此同時發送給ClientB的重疊寫操作也完成,ThreadB被激活(因為ThreadA還沒有回到線程池)它關閉與ClientB的連接,然后回到線程池,這時ClientC的寫操作也完成,ThreadB再次被激活(因為ThreadA還是沒有回到線程池),它再關閉與ClientC的連接,這時ThreadA回到線程池,ThreadB也回到線程池.這時對三個客戶端的服務全部完成.可以看到在整個服務過程中,"建立連接","讀數據","寫數據"和"關閉連接"等操作是邏輯上連續而實際上分開的.

到現在為止兩個線程處理了三次讀操作和三次寫操作,在這些讀寫操作過程中所出現的狀態機(state machine)是比較復雜的,我們模擬的是經過我簡化過的,實際上的狀態要比這個還要復雜很多,然而這樣的服務端模型在客戶端請求越多時與前兩個模型相比的性能越高.而使用完成端口我們可以很容易實現這樣的服務器模型.

微軟的IIS WEB服務器就是使用這樣的服務端模型,很多人說什么阿帕奇服務器比IIS的性能好什么什么的我表示懷疑,除非阿帕奇服務器可以將線程分割成,為更小的單元服務,我覺得不太可能!這種完成端口模型已經將單個讀或寫操作作為最小的服務單元,我覺得在相同機器配置的情況下IIS的性能要遠遠高于其它WEB服務器,這也是從實現機理上來分析的,如果出現性能上的差別可能是在不同的操作系統上,也許Linux的內核比Windows的要好,有人真的研究過嗎?還是大家一起在炒作啊.

對于狀態機概念,在很多方面都用到,TCPIP中有,編譯原理中有,OpengGL中有等等,我的離散數學不好(我是會計專業不學這個),不過還是搞懂了些,我想如果你多花些時間看,還是可以搞懂的.最后是一個簡單的文件傳輸服務器程序代碼,只用了兩個線程(我的機器里只有一塊CPU)就可以服務多個客戶端.我調試時用它同時為6個nc客戶端提供文件下載服務都沒有問題,當然更多也不會有問題,只是略為使用了一下NT 5的線程池和完成端口技術就可以有這樣高的性能,更不用說IIS的性能咯!

希望大家不要陷在這個程序的框架中,Ctrl+C,Ctrl+V沒有什么意義,要理解它的實質.程序使用Visual C++ 6.0 SP5+2003 Platform SDK編譯通過,在Windows XP Professional下調試運行通過.程序運行的最低要求是Windows 2000操作系統.

/********************************************************************
  created:   2005/12/24
  created:   24:12:2005   20:25
  modified:   2005/12/24
  filename:   d:\vcwork\iocomp\iocomp.cpp
  file path:   d:\vcwork\iocomp
  file base:   iocomp
  file ext:   cpp
  author:     kruglinski(kruglinski_at_gmail_dot_com)
 
  purpose:   利用完成端口技術實現的高性能文件下載服務程序
*********************************************************************/

#define _WIN32_WINNT   0x0500

#include <cstdlib>
#include <clocale>
#include <ctime>
#include <iostream>//一使用輸入輸出流程序頓時增大70K
#include <vector>
#include <algorithm>
#include <winsock2.h>
#include <mswsock.h>

using namespace std;

#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"mswsock.lib")

const int MAX_BUFFER_SIZE=1024;
const int PRE_SEND_SIZE=1024;
const int QUIT_TIME_OUT=3000;
const int PRE_DOT_TIMER=QUIT_TIME_OUT/80;

typedef enum{IoTransFile,IoSend,IoRecv,IoQuit} IO_TYPE;

typedef struct
{
  SOCKET hSocket;
  SOCKADDR_IN ClientAddr;
}PRE_SOCKET_DATA,*PPRE_SOCKET_DATA;

typedef struct
{
  OVERLAPPED   oa;
  WSABUF     DataBuf;
  char     Buffer[MAX_BUFFER_SIZE];
  IO_TYPE     IoType;
}PRE_IO_DATA,*PPRE_IO_DATA;

typedef vector<PPRE_SOCKET_DATA>   SocketDataVector;
typedef vector<PPRE_IO_DATA>     IoDataVector;

SocketDataVector   gSockDataVec;
IoDataVector     gIoDataVec;

CRITICAL_SECTION   csProtection;

char* TimeNow(void)
{
  time_t t=time(NULL);
  tm *localtm=localtime(&t);
  static char timemsg[512]={0};
 
  strftime(timemsg,512,"%Z: %B %d %X,%Y",localtm);
  return timemsg;
}

BOOL TransFile(PPRE_IO_DATA pIoData,PPRE_SOCKET_DATA pSocketData,DWORD dwNameLen)
{
  //這一句是為nc做的,你可以修改它
  pIoData->Buffer[dwNameLen-1]='\0';
 
  HANDLE hFile=CreateFile(pIoData->Buffer,GENERIC_READ,0,NULL,OPEN_EXISTING,0,NULL);
  BOOL bRet=FALSE;

  if(hFile!=INVALID_HANDLE_VALUE)
  {
    cout<<"Transmit File "<<pIoData->Buffer<<" to client"<<endl;
    pIoData->IoType=IoTransFile;
    memset(&pIoData->oa,0,sizeof(OVERLAPPED));
    *reinterpret_cast<HANDLE*>(pIoData->Buffer)=hFile;
    TransmitFile(pSocketData->hSocket,hFile,GetFileSize(hFile,NULL),PRE_SEND_SIZE,reinterpret_cast<LPOVERLAPPED>(pIoData),NULL,TF_USE_SYSTEM_THREAD);
    bRet=WSAGetLastError()==WSA_IO_PENDING;
  }
  else
    cout<<"Transmit File "<<"Error:"<<GetLastError()<<endl;

  return bRet;
}

DWORD WINAPI ThreadProc(LPVOID IocpHandle)
{
  DWORD dwRecv=0;
  DWORD dwFlags=0;
 
  HANDLE hIocp=reinterpret_cast<HANDLE>(IocpHandle);
  DWORD dwTransCount=0;
  PPRE_IO_DATA pPreIoData=NULL;
  PPRE_SOCKET_DATA pPreHandleData=NULL;

  while(TRUE)
  {
    if(GetQueuedCompletionStatus(hIocp,&dwTransCount,
        reinterpret_cast<LPDWORD>(&pPreHandleData),
        reinterpret_cast<LPOVERLAPPED*>(&pPreIoData),INFINITE))
    {
        if(0==dwTransCount&&IoQuit!=pPreIoData->IoType)
        {
          cout<<"Client:"
            <<inet_ntoa(pPreHandleData->ClientAddr.sin_addr)
            <<":"<<ntohs(pPreHandleData->ClientAddr.sin_port)
            <<" is closed"<<endl;

          closesocket(pPreHandleData->hSocket);

          EnterCriticalSection(&csProtection);
            IoDataVector::iterator itrIoDelete=find(gIoDataVec.begin(),gIoDataVec.end(),pPreIoData);
            gIoDataVec.erase(itrIoDelete);
            SocketDataVector::iterator itrSockDelete=find(gSockDataVec.begin(),gSockDataVec.end(),pPreHandleData);
            gSockDataVec.erase(itrSockDelete);
          LeaveCriticalSection(&csProtection);

          delete *itrIoDelete;
          delete *itrSockDelete;
         
          continue;
        }
       
        switch(pPreIoData->IoType){
        case IoTransFile:
          cout<<"Client:"
            <<inet_ntoa(pPreHandleData->ClientAddr.sin_addr)
            <<":"<<ntohs(pPreHandleData->ClientAddr.sin_port)
            <<" Transmit finished"<<endl;
          CloseHandle(*reinterpret_cast<HANDLE*>(pPreIoData->Buffer));
          goto LRERECV;
         
        case IoSend:
          cout<<"Client:"
            <<inet_ntoa(pPreHandleData->ClientAddr.sin_addr)
            <<":"<<ntohs(pPreHandleData->ClientAddr.sin_port)
            <<" Send finished"<<endl;

LRERECV:
          pPreIoData->IoType=IoRecv;
          pPreIoData->DataBuf.len=MAX_BUFFER_SIZE;
          memset(&pPreIoData->oa,0,sizeof(OVERLAPPED));

          WSARecv(pPreHandleData->hSocket,&pPreIoData->DataBuf,1,
            &dwRecv,&dwFlags,
            reinterpret_cast<LPWSAOVERLAPPED>(pPreIoData),NULL);

          break;

        case IoRecv:
          cout<<"Client:"
            <<inet_ntoa(pPreHandleData->ClientAddr.sin_addr)
            <<":"<<ntohs(pPreHandleData->ClientAddr.sin_port)
            <<" recv finished"<<endl;
          pPreIoData->IoType=IoSend;
         
          if(!TransFile(pPreIoData,pPreHandleData,dwTransCount))
          {
            memset(&pPreIoData->oa,0,sizeof(OVERLAPPED));
            strcpy(pPreIoData->DataBuf.buf,"File transmit error!\r\n");
            pPreIoData->DataBuf.len=strlen(pPreIoData->DataBuf.buf);
           
            WSASend(pPreHandleData->hSocket,&pPreIoData->DataBuf,1,
                &dwRecv,dwFlags,
                reinterpret_cast<LPWSAOVERLAPPED>(pPreIoData),NULL);
          }
          break;
         
        case IoQuit:
          goto LQUIT;
         
        default:
          ;
        }
    }  
  }
 
LQUIT:
  return 0;
}

HANDLE hIocp=NULL;
SOCKET hListen=NULL;

BOOL WINAPI ShutdownHandler(DWORD dwCtrlType)
{
  PRE_SOCKET_DATA PreSockData={0};
  PRE_IO_DATA PreIoData={0};

  PreIoData.IoType=IoQuit;

  if(hIocp)
  {
    PostQueuedCompletionStatus(hIocp,1,
        reinterpret_cast<ULONG_PTR>(&PreSockData),
        reinterpret_cast<LPOVERLAPPED>(&PreIoData));

    cout<<"Shutdown at "<<TimeNow()<<endl<<"wait for a moment please"<<endl;
   
    //讓出CPU時間,讓線程退出
    for(int t=0;t<80;t+=1)
    {
        Sleep(PRE_DOT_TIMER);
        cout<<".";
    }
   
    CloseHandle(hIocp);
  }
 
  int i=0;

  for(;i<gSockDataVec.size();i++)
  {
    PPRE_SOCKET_DATA pSockData=gSockDataVec[i];
    closesocket(pSockData->hSocket);
    delete pSockData;
  }

  for(i=0;i<gIoDataVec.size();i++)
  {
    PPRE_IO_DATA pIoData=gIoDataVec[i];
    delete pIoData;
  }

  DeleteCriticalSection(&csProtection);
  if(hListen)
    closesocket(hListen);

  WSACleanup();
  exit(0);
  return TRUE;
}

LONG WINAPI MyExceptionFilter(struct _EXCEPTION_POINTERS *ExceptionInfo)
{
  ShutdownHandler(0);
  return EXCEPTION_EXECUTE_HANDLER;
}

u_short DefPort=8182;

int main(int argc,char **argv)
{
  if(argc==2)
    DefPort=atoi(argv[1]);

  InitializeCriticalSection(&csProtection);
  SetUnhandledExceptionFilter(MyExceptionFilter);
  SetConsoleCtrlHandler(ShutdownHandler,TRUE);

  hIocp=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);

  WSADATA data={0};
  WSAStartup(0x0202,&data);

  hListen=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
  if(INVALID_SOCKET==hListen)
  {
    ShutdownHandler(0);
  }
 
  SOCKADDR_IN addr={0};
  addr.sin_family=AF_INET;
  addr.sin_port=htons(DefPort);
 
  if(bind(hListen,reinterpret_cast<PSOCKADDR>(&addr),
    sizeof(addr))==SOCKET_ERROR)
  {
    ShutdownHandler(0);
  }
 
  if(listen(hListen,256)==SOCKET_ERROR)
    ShutdownHandler(0);

  SYSTEM_INFO si={0};
  GetSystemInfo(&si);
  si.dwNumberOfProcessors<<=1;

  for(int i=0;i<si.dwNumberOfProcessors;i++)
  {
   
    QueueUserWorkItem(ThreadProc,hIocp,WT_EXECUTELONGFUNCTION);
  }
 
  cout<<"Startup at "<<TimeNow()<<endl
    <<"work on port "<<DefPort<<endl
    <<"press CTRL+C to shutdown"<<endl<<endl<<endl;

  while(TRUE)
  {
    int namelen=sizeof(addr);
    memset(&addr,0,sizeof(addr));
    SOCKET hAccept=accept(hListen,reinterpret_cast<PSOCKADDR>(&addr),&namelen);

    if(hAccept!=INVALID_SOCKET)
    {
        cout<<"accept a client:"<<inet_ntoa(addr.sin_addr)<<":"<<ntohs(addr.sin_port)<<endl;

        PPRE_SOCKET_DATA pPreHandleData=new PRE_SOCKET_DATA;
        pPreHandleData->hSocket=hAccept;
        memcpy(&pPreHandleData->ClientAddr,&addr,sizeof(addr));
       
        CreateIoCompletionPort(reinterpret_cast<HANDLE>(hAccept),
          hIocp,reinterpret_cast<DWORD>(pPreHandleData),0);
       
        PPRE_IO_DATA pPreIoData=new(nothrow) PRE_IO_DATA;

        if(pPreIoData)
        {
          EnterCriticalSection(&csProtection);
            gSockDataVec.push_back(pPreHandleData);
            gIoDataVec.push_back(pPreIoData);
          LeaveCriticalSection(&csProtection);

          memset(pPreIoData,0,sizeof(PRE_IO_DATA));
          pPreIoData->IoType=IoRecv;
          pPreIoData->DataBuf.len=MAX_BUFFER_SIZE;
          pPreIoData->DataBuf.buf=pPreIoData->Buffer;
          DWORD dwRecv=0;
          DWORD dwFlags=0;
          WSARecv(hAccept,&pPreIoData->DataBuf,1,
            &dwRecv,&dwFlags,
            reinterpret_cast<WSAOVERLAPPED*>(pPreIoData),NULL);
        }
        else
        {
          delete pPreHandleData;
          closesocket(hAccept);
        }
    }
  }
 
  return 0;
}

參考資料:
《MSDN 2001》
《Windows 網絡編程》
《Windows 核心編程》
《TCP/IP詳解》