本文簡單介紹了當前Windows支持的各種Socket I/O模型,如果你發現其中存在什么錯誤請務必賜教。
一:select模型
二:WSAAsyncSelect模型
三:WSAEventSelect模型
四:Overlapped I/O 事件通知模型
五:Overlapped I/O 完成例程模型
六:IOCP模型
老陳有一個在外地工作的女兒,不能經常回來,老陳和她通過信件聯系。他們的信會被郵遞員投遞到他們的信箱里。
這和Socket模型非常類似。下面我就以老陳接收信件為例講解Socket I/O模型~~~
一:Select模型
老陳非常想看到女兒的信。以至于他每隔10分鐘就下樓檢查信箱,看是否有女兒的信~~~~~
在這種情況下,“下樓檢查信箱”然后回到樓上耽誤了老陳太多的時間,以至于老陳無法做其他工作。
select模型和老陳的這種情況非常相似:周而復始地去檢查......如果有數據......接收/發送.......
使用線程來select應該是通用的做法:
// Listener thread body (select model, simplified illustration).
// Creates a TCP socket bound to port 5678 on INADDR_ANY, then polls it with
// select() until the thread is terminated, calling accept() when it is readable.
// NOTE(review): the results of socket/bind/listen are not checked in this snippet.
procedure TListenThread.Execute;
var
addr : TSockAddrIn;
fd_read : TFDSet;
timeout : TTimeVal;
ASock,
MainSock : TSocket;
len, i : Integer;
begin
MainSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
addr.sin_family := AF_INET;
addr.sin_port := htons(5678);
addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( MainSock, @addr, sizeof(addr) );
listen( MainSock, 5 );
while (not Terminated) do
begin
// The read set and the timeout are re-initialised before every select() call.
FD_ZERO( fd_read );
FD_SET( MainSock, fd_read );
timeout.tv_sec := 0;
// NOTE(review): tv_usec is in microseconds; 500 may have been meant as milliseconds - confirm.
timeout.tv_usec := 500;
if select( 0, @fd_read, nil, nil, @timeout ) > 0 then //at least one connection is waiting to be accepted
begin
if FD_ISSET( MainSock, fd_read ) then
begin
for i:=0 to fd_read.fd_count-1 do //note: fd_count <= 64, i.e. one select call can watch at most 64 sockets
begin
len := sizeof(addr);
ASock := accept( MainSock, addr, len );
if ASock <> INVALID_SOCKET then
....//create a new thread for ASock; that thread keeps calling select on it
end;
end;
end;
end; //while (not self.Terminated)
shutdown( MainSock, SD_BOTH );
closesocket( MainSock );
end;
select模型
select已經是老掉牙的東西了,windows下很少用了,不過既然叫“全接觸”,還是寫出來吧!!!
首先創建一個listen線程(thrListen)負責監聽遠程機器的連接請求, 和遠程機器建立連接后,為此連接專門創建一個線程(thrReadWrite)進行read/write。 注意,要使用“臨界區”保證線程對共享數據的安全訪問。
代碼很簡單,不多說了~~~~~~~~~~~~~~~~~~~~~~~~
unit thrListen;
interface
uses Windows, Classes, SysUtils, Winsock2, thrReadWrite;
type
  { Bookkeeping record for one accepted client connection. }
  YConnection = record
    thrRW  : TRWThread;  // thread object created for this client
    hsock  : TSocket;    // socket returned by accept()
    dwIP   : DWORD;      // peer IPv4 address (stored after ntohl)
    dwPort : DWORD;      // peer TCP port (stored after ntohs)
  end;
  PConnection = ^YConnection;
type
  { Thread that owns the listening socket and the list of client connections.
    The declaration is split over several lines so that the trailing "//"
    comments no longer swallow the members that follow them. }
  TListenThread = class(TThread)
  private
    { Private declarations }
    FSock : TSocket;  // listening (main) socket
    FList : TList;    // PConnection entries, one per accepted client
  protected
    procedure Execute; override;
  end;
implementation
uses frmMain;
{ TListenThread }
{ Listener thread body.
  Opens a TCP listening socket on LISTEN_PORT, polls it with select() and
  starts one TRWThread per accepted client. On termination every client
  thread is stopped and every connection record is released.

  Changes against the collapsed original:
  - line structure restored ("//" comments used to swallow the code after them);
  - socket/bind/listen results are checked;
  - worker threads are owned by this thread (FreeOnTerminate = False), so the
    shutdown loop no longer calls Terminate on objects that may already have
    been freed;
  - the inner "for 0..fd_count-1" loop is gone: only FSock is ever placed in
    the set, so at most one accept() is due per wake-up. }
procedure TListenThread.Execute;
var
  addr     : TSockAddrIn;
  fd_read  : TFDSet;
  timeout  : TTimeVal;
  AConnect : PConnection;
  len, i   : Integer;
begin
  FList := TList.Create;
  try
    FSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
    if FSock = INVALID_SOCKET then
    begin
      MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR );
      Exit;
    end;

    addr.sin_family      := AF_INET;
    addr.sin_port        := htons(LISTEN_PORT);
    addr.sin_addr.S_addr := htonl(INADDR_ANY);

    // backlog: at most 5 connections waiting to be accepted
    if (bind( FSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR) or
       (listen( FSock, 5 ) = SOCKET_ERROR) then
    begin
      MessageBox( 0, 'Call bind/listen failed.', 'Error', MB_ICONERROR );
      closesocket( FSock );
      Exit;
    end;

    while not Terminated do
    begin
      FD_ZERO( fd_read );
      FD_SET( FSock, fd_read );
      timeout.tv_sec  := 0;
      timeout.tv_usec := 500;

      // <= 0 means timeout or error: nothing to accept on this pass
      if select( 0, @fd_read, nil, nil, @timeout ) <= 0 then
        Continue;
      if not FD_ISSET( FSock, fd_read ) then
        Continue;

      New( AConnect );
      len := sizeof(addr);
      AConnect^.hsock := accept( FSock, addr, len );
      if AConnect^.hsock <> INVALID_SOCKET then
      begin
        AConnect^.dwIP   := ntohl( addr.sin_addr.S_addr );
        AConnect^.dwPort := ntohs( addr.sin_port );
        AConnect^.thrRW  := TRWThread.Create( True );
        AConnect^.thrRW.m_sock   := AConnect^.hsock;
        AConnect^.thrRW.m_ip     := AConnect^.dwIP;
        AConnect^.thrRW.m_port   := AConnect^.dwPort;
        AConnect^.thrRW.m_itemid := AConnect;
        // Owned by FList: freed in the shutdown loop below, never by itself.
        AConnect^.thrRW.FreeOnTerminate := False;
        AConnect^.thrRW.Resume;

        // register the client in the connection list
        FList.Add( AConnect );
      end
      else
      begin
        len := WSAGetLastError();
        MessageBox( 0, PChar(IntToStr(len)), 'accept error', MB_ICONERROR );
        Dispose( AConnect );
      end;
    end; //while not Terminated

    shutdown( FSock, SD_BOTH );
    closesocket( FSock );

    // stop every thread that serves a client connection
    for i := 0 to FList.Count-1 do
    begin
      AConnect := PConnection(FList.Items[i]);
      AConnect^.thrRW.Terminate;
      shutdown( AConnect^.hsock, SD_BOTH );
      AConnect^.thrRW.WaitFor;
      AConnect^.thrRW.Free;
      // NOTE(review): the worker normally closes its own socket; this call
      // is kept from the original as a safety net and then simply fails.
      closesocket( AConnect^.hsock );
      Dispose( AConnect );
    end;
  finally
    FList.Free;
  end;
end;
end.
unit thrReadWrite;
interface
uses Windows, Classes, SysUtils, Winsock2;
const PACK_SIZE_RECEIVE = 4096;
type
  { Per-client worker thread: receives data from one connected socket.
    The public m_* fields are filled in by the creator before Resume. }
  TRWThread = class(TThread)
  public
    m_sock   : THandle;  // connected socket (assigned from a TSocket by the listener)
    m_ip     : DWORD;    // peer address; Execute passes it through htonl before inet_ntoa
    m_port   : DWORD;    // peer port, shown with IntToStr
    m_itemid : Pointer;  // back-pointer to the creator's connection record
  private
    // Raw receive buffer. AnsiChar keeps it at PACK_SIZE_RECEIVE bytes on
    // Unicode Delphi as well, where plain Char is two bytes wide.
    FRecvBuf : Array [0..PACK_SIZE_RECEIVE-1] of AnsiChar;
  protected
    procedure Execute; override;
  end;
implementation
uses frmMain;
{ TRWThread }
{ Receive loop for one client.
  Polls m_sock with select(); every chunk returned by recv() is appended to
  the main form's list box, prefixed with the peer's address and port. The
  loop ends when the thread is terminated, select() fails, or recv() reports
  a closed or broken connection.

  The socket is now closed on every exit path (the select() failure path
  used to leak it), and the critical section is released through
  try/finally.
  NOTE(review): the list box is updated from a worker thread; gCSListBox only
  serialises the workers among themselves - consider TThread.Synchronize. }
procedure TRWThread.Execute;
var
  sTitle  : String;
  fd_read : TFDSet;
  timeout : TTimeVal;
  ret     : Integer;
begin
  sTitle := inet_ntoa( TInAddr(htonl(m_ip)) );
  sTitle := 'IP: ' + sTitle + ' Port: ' + IntToStr(m_port) + ' Msg: ';

  try
    while not Terminated do
    begin
      FD_ZERO( fd_read );
      FD_SET( m_sock, fd_read );
      timeout.tv_sec  := 0;
      timeout.tv_usec := 500;

      ret := select( 0, @fd_read, nil, nil, @timeout );
      if ret = SOCKET_ERROR then
      begin
        MessageBox( 0, 'Call select() failed.', 'Error', MB_ICONERROR );
        Exit;
      end;

      // timeout, or the socket is not in the ready set
      if (ret = 0) or (not FD_ISSET( m_sock, fd_read )) then
        Continue;

      FillChar( FRecvBuf[0], PACK_SIZE_RECEIVE, 0 );
      ret := recv( m_sock, FRecvBuf[0], PACK_SIZE_RECEIVE, 0 );
      // 0 = peer closed the connection, SOCKET_ERROR = connection broken
      if (ret = 0) or (ret = SOCKET_ERROR) then
        Exit;

      EnterCriticalSection( gCSListBox );
      try
        fmMain.ListBox1.Items.Add( sTitle + FRecvBuf );
      finally
        LeaveCriticalSection( gCSListBox );
      end;
    end; //while not Terminated
  finally
    closesocket( m_sock );
  end;
end;
end. |
二:WSAAsyncSelect模型
后來,老陳使用了微軟公司的新式信箱。這種信箱非常先進,一旦信箱里有新的信件,蓋茨就會給老陳打電話:喂,大爺,你有新的信件了!從此,老陳再也不必頻繁上下樓檢查信箱了,牙也不疼了,你瞅準了,藍天......不是,微軟~~~~~~~~
微軟提供的WSAAsyncSelect模型就是這個意思。
WSAAsyncSelect模型是Windows下最簡單易用的一種Socket I/O模型。使用這種模型時,Windows會把網絡事件以消息的形式通知應用程序。
首先定義一個消息標識常量:
const WM_SOCKET = WM_USER + 55;
再在主Form的private域添加一個處理此消息的函數聲明:
private
procedure WMSocket(var Msg: TMessage); message WM_SOCKET;
然后就可以使用WSAAsyncSelect了:
// Listening-socket setup for the WSAAsyncSelect model.
// The original fragment assigned the new socket to a local "sock" but then
// bound and listened on m_sock; the form field m_sock is now used throughout.
var
addr : TSockAddr;
m_sock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
addr.sin_family := AF_INET;
addr.sin_port := htons(5678);
addr.sin_addr.S_addr := htonl(INADDR_ANY);
bind( m_sock, @addr, sizeof(SOCKADDR) );
// Request WM_SOCKET messages on this form for accept and close events.
WSAAsyncSelect( m_sock, Handle, WM_SOCKET, FD_ACCEPT or FD_CLOSE );
listen( m_sock, 5 );
....
應用程序可以對收到的WM_SOCKET消息進行分析,判斷是哪一個socket產生了網絡事件以及事件類型:
{ WM_SOCKET handler (illustrative version).
  Msg.WParam carries the socket that raised the event; the event type is
  extracted from Msg.LParam with WSAGetSelectEvent. }
procedure TfmMain.WMSocket(var Msg: TMessage);
var
  client  : TSocket;
  peer    : TSockAddrIn;
  peerLen : Integer;
  data    : Array [0..4095] of Char;
begin
  case WSAGetSelectEvent( Msg.LParam ) of
    FD_ACCEPT :
      begin
        // Accept the pending connection and subscribe it to the same message.
        peerLen := sizeof(peer);
        client := accept( Msg.WParam, peer, peerLen );
        if client <> INVALID_SOCKET then
          WSAAsyncSelect( client, Handle, WM_SOCKET, FD_READ or FD_WRITE or FD_CLOSE );
      end;
    FD_CLOSE :
      closesocket( Msg.WParam );
    FD_READ :
      recv( Msg.WParam, data[0], 4096, 0 );
    FD_WRITE :
      ; // nothing to do on write notifications
  end;
end;
1。WSAAsyncSelect模型
這個很簡單,貼個源碼了事。。。。。。。。。。。。
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, ComCtrls;
const LISTEN_PORT = 5005; WM_SOCKET = WM_USER + 55;
type
  { Main form of the WSAAsyncSelect demo.
    Split over several lines so that the "//" comments no longer swallow
    m_connect_list and the closing "end;". }
  TfmMain = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    ListBox1: TListBox;
    StatusBar1: TStatusBar;
    procedure FormCreate(Sender: TObject);
    procedure FormClose(Sender: TObject; var Action: TCloseAction);
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    { Private declarations }
    procedure WMSocket(var Msg: TMessage); message WM_SOCKET;
    procedure SendBuf( hsock: TSocket );
    procedure RecvBuf( hsock: TSocket );
  public
    { Public declarations }
    m_sock : TSocket;        // listening (main) socket
    m_connect_list : TList;  // client socket handles, stored as Pointer(TSocket)
  end;
var fmMain : TfmMain;
implementation
{$R *.dfm}
{ WM_SOCKET handler: dispatches the network event encoded in Msg.LParam.
  Msg.WParam is the socket that raised the event.
  - FD_ACCEPT: accept on m_sock, subscribe the client to WM_SOCKET, track it.
  - FD_CLOSE : drop the socket from the list (if tracked) and close it.
  - FD_READ / FD_WRITE: forwarded to RecvBuf / SendBuf.
  Line structure restored: the trailing "//case..." comment used to swallow
  the procedure's final "end;". }
procedure TfmMain.WMSocket(var Msg: TMessage);
var
  s       : TSocket;
  addr    : TSockAddrIn;
  addrlen : Integer;
begin
  case WSAGetSelectEvent( Msg.LParam ) of
    FD_ACCEPT :
      begin
        addrlen := sizeof(addr);
        s := accept( m_sock, addr, addrlen );
        if s <> INVALID_SOCKET then
        begin
          WSAAsyncSelect( s, Handle, WM_SOCKET, FD_READ or FD_WRITE or FD_CLOSE );
          m_connect_list.Add( Pointer(s) );
          StatusBar1.Panels[0].Text := 'Connection count: ' + IntToStr(m_connect_list.Count);
        end;
      end;

    FD_CLOSE :
      begin
        if m_connect_list.IndexOf( Pointer(Msg.WParam) ) > -1 then
        begin
          m_connect_list.Remove( Pointer(Msg.WParam) );
          StatusBar1.Panels[0].Text := 'Connection count: ' + IntToStr(m_connect_list.Count);
        end;
        closesocket( Msg.WParam );
      end;

    FD_READ :
      RecvBuf( Msg.WParam );

    FD_WRITE :
      SendBuf( Msg.WParam );
  end; //case
end;
procedure TfmMain.SendBuf( hsock: TSocket );
begin
{ Intentionally empty; the author's note on FD_WRITE follows.

  An FD_WRITE notification is only issued in three situations:
  - a socket has just been connected with connect or WSAConnect;
  - a socket has just been accepted with accept or WSAAccept;
  - a send, WSASend, sendto or WSASendTo call failed with WSAEWOULDBLOCK
    and buffer space has become available again.

  An application should therefore assume that, from the first FD_WRITE
  message on, it can always send on the socket until one of the send calls
  returns WSAEWOULDBLOCK. After such a failure it must wait for the next
  FD_WRITE notification before sending again.

  In short: do not wait for FD_WRITE, just keep calling send until
  WSAEWOULDBLOCK is reported. }
end;
{ Reads pending data from hsock and appends it to ListBox1, prefixed with the
  peer's IP address and port.
  The result of recv() is now checked: when nothing was received (connection
  closed or an error) the procedure returns instead of logging an empty
  message. }
procedure TfmMain.RecvBuf( hsock: TSocket );
var
  buf : Array [0..4095] of Char;
  adr : TSockAddrIn;
  len : Integer;
  s   : String;
begin
  FillChar( buf[0], 4096, 0 );
  len := recv( hsock, buf[0], 4096, 0 );
  if (len = 0) or (len = SOCKET_ERROR) then
    Exit;

  // Look up the sender so the log line shows where the data came from.
  len := sizeof(adr);
  getpeername( hsock, adr, len );
  s := inet_ntoa( adr.sin_addr );
  s := 'IP: ' + s + ' Port: ' + IntToStr(ntohs(adr.sin_port)) + ' Msg: ';
  ListBox1.Items.Add( s + buf );
end;
{ Form initialisation: creates the connection list and starts Winsock 2.2.
  Fixes:
  - line structure restored (the "//" comment swallowed the failure branch);
  - after a failed WSAStartup both buttons stay disabled; the original fell
    through and re-enabled Start;
  - m_sock starts as INVALID_SOCKET so closing the form before Start never
    touches an arbitrary handle. }
procedure TfmMain.FormCreate(Sender: TObject);
var
  wsa : TWSAData;
begin
  m_connect_list := TList.Create;
  m_sock := INVALID_SOCKET;

  // WSAStartup returns zero if successful.
  if WSAStartup( $0202, wsa ) <> 0 then
  begin
    MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR );
    btnStart.Enabled := False;
    btnStop.Enabled := False;
    Exit;
  end;

  btnStart.Enabled := True;
  btnStop.Enabled := False;
end;
{ Form shutdown: closes the listening socket and every tracked client socket,
  frees the connection list and shuts Winsock down.
  Line structure restored: the client clean-up loop used to sit inside a "//"
  comment and was never executed. The Count > 0 guard is dropped because
  TList.Count is an Integer and an empty for-range simply does not run. }
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction);
var
  i : Integer;
begin
  shutdown( m_sock, SD_BOTH );
  closesocket( m_sock );

  // close every client connection that is still tracked
  for i := 0 to m_connect_list.Count-1 do
  begin
    shutdown( TSocket(m_connect_list.Items[i]), SD_BOTH );
    closesocket( TSocket(m_connect_list.Items[i]) );
  end;

  m_connect_list.Free;

  WSACleanup();
end;
{ Start button: creates the listening socket on LISTEN_PORT, registers it for
  WM_SOCKET notifications (FD_ACCEPT / FD_CLOSE) and starts listening.
  When bind, WSAAsyncSelect or listen fails, the socket is now closed instead
  of being leaked. }
procedure TfmMain.btnStartClick(Sender: TObject);
var
  addr : TSockAddr;

  // Reports the failure and releases the half-configured listening socket.
  procedure Fail( const msg: String );
  begin
    MessageBox( 0, PChar(msg), 'Error', MB_ICONERROR );
    closesocket( m_sock );
    m_sock := INVALID_SOCKET;
  end;

begin
  m_sock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if m_sock = INVALID_SOCKET then
  begin
    MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  addr.sin_family      := AF_INET;
  addr.sin_port        := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( m_sock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then
  begin
    Fail( 'Call bind failed.' );
    Exit;
  end;

  if WSAAsyncSelect( m_sock, Handle, WM_SOCKET, FD_ACCEPT or FD_CLOSE ) = SOCKET_ERROR then
  begin
    Fail( 'Call WSAAsyncSelect failed.' );
    Exit;
  end;

  if listen( m_sock, 5 ) = SOCKET_ERROR then
  begin
    Fail( 'Call listen failed.' );
    Exit;
  end;

  btnStart.Enabled := False;
  btnStop.Enabled := True;
end;
{ Stop button: closes the listening socket and all client sockets, empties
  the connection list and flips the button states.
  Line structure restored: the client clean-up loop used to sit inside a "//"
  comment. m_sock is reset so a later FormClose does not close a stale
  handle value. }
procedure TfmMain.btnStopClick(Sender: TObject);
var
  i : Integer;
begin
  shutdown( m_sock, SD_BOTH );
  closesocket( m_sock );
  m_sock := INVALID_SOCKET;

  // close every client connection that is still tracked
  for i := 0 to m_connect_list.Count-1 do
  begin
    shutdown( TSocket(m_connect_list.Items[i]), SD_BOTH );
    closesocket( TSocket(m_connect_list.Items[i]) );
  end;

  m_connect_list.Clear;

  btnStart.Enabled := True;
  btnStop.Enabled := False;
end;
end.
|
三:WSAEventSelect模型
后來,微軟的信箱非常暢銷,購買微軟信箱的人以百萬計數......以至于蓋茨每天24小時給客戶打電話,累得腰酸背痛,喝蟻力神都不好使~~~~~~
微軟改進了他們的信箱:在客戶的家中添加一個附加裝置,這個裝置會監視客戶的信箱,每當新的信件來臨,此裝置會發出“新信件到達”聲,提醒老陳去收信。蓋茨終于可以睡覺了。
同樣要使用線程:
// Thread body for the WSAEventSelect model (abridged illustration).
// Waits on an array of event objects; the signalled index selects the socket
// whose network events are then enumerated and handled.
// NOTE(review): SockArray, RecvBuf and ListenSock are not declared in this
// fragment, and the "..." lines are placeholders for omitted code.
procedure TListenThread.Execute;
var
hEvent : WSAEvent;
ret : Integer;
ne : TWSANetworkEvents;
sock : TSocket;
adr : TSockAddrIn;
sMsg : String;
Index,
EventTotal : DWORD;
EventArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT;
begin
...socket...bind...
hEvent := WSACreateEvent();
// Associate the listening socket with the event for accept/close notifications.
WSAEventSelect( ListenSock, hEvent, FD_ACCEPT or FD_CLOSE );
...listen...
while ( not Terminated ) do
begin
// Blocks without timeout until one of the first EventTotal events is signalled.
Index := WSAWaitForMultipleEvents( EventTotal, @EventArray[0], FALSE, WSA_INFINITE, FALSE );
FillChar( ne, sizeof(ne), 0 );
WSAEnumNetworkEvents( SockArray[Index-WSA_WAIT_EVENT_0], EventArray[Index-WSA_WAIT_EVENT_0], @ne );
if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then
begin
if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then
continue;
ret := sizeof(adr);
sock := accept( SockArray[Index-WSA_WAIT_EVENT_0], adr, ret );
if EventTotal > WSA_MAXIMUM_WAIT_EVENTS-1 then//WSA_MAXIMUM_WAIT_EVENTS is likewise 64
begin
// No free slot left: refuse the new connection.
closesocket( sock );
continue;
end;
// Give the new client its own event and append both to the parallel arrays.
hEvent := WSACreateEvent();
WSAEventSelect( sock, hEvent, FD_READ or FD_WRITE or FD_CLOSE );
SockArray[EventTotal] := sock;
EventArray[EventTotal] := hEvent;
Inc( EventTotal );
end;
if ( ne.lNetworkEvents and FD_READ ) > 0 then
begin
if ne.iErrorCode[FD_READ_BIT] <> 0 then
continue;
FillChar( RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
ret := recv( SockArray[Index-WSA_WAIT_EVENT_0], RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
......
end;
end;
end;
WSAEventSelect模型
看來大家不感興趣啊,呵呵,沒有信心了,把代碼貼完拉倒。。。。。。
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, ComCtrls, thrEvent;
const LISTEN_PORT = 5005;
type
  { Main form of the WSAEventSelect demo: two buttons start and stop the
    event thread; the list box and status bar are updated by that thread. }
  TfmMain = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    ListBox1: TListBox;
    StatusBar1: TStatusBar;
    procedure FormCreate(Sender: TObject);
    procedure FormClose(Sender: TObject; var Action: TCloseAction);
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
    EventThread : TEventThread;  // created in btnStartClick
  end;
var fmMain : TfmMain;
implementation
{$R *.dfm}
{ Form initialisation: starts Winsock 2.2 and sets the initial button states.
  Line structure restored (the "//" comment swallowed the failure branch).
  After a failed WSAStartup both buttons now stay disabled; the original fell
  through and re-enabled Start. }
procedure TfmMain.FormCreate(Sender: TObject);
var
  wsa : TWSAData;
begin
  // WSAStartup returns zero if successful.
  if WSAStartup( $0202, wsa ) <> 0 then
  begin
    MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR );
    btnStart.Enabled := False;
    btnStop.Enabled := False;
    Exit;
  end;

  btnStart.Enabled := True;
  btnStop.Enabled := False;
end;
{ Form shutdown: releases the Winsock library. }
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction);
begin
  WSACleanup();
end;
{ Start button: creates the event thread suspended, configures it to free
  itself and to run WhileTerminate when it finishes, then starts it. }
procedure TfmMain.btnStartClick(Sender: TObject);
begin
  EventThread := TEventThread.Create( True );
  with EventThread do
  begin
    FreeOnTerminate := True;
    OnTerminate := WhileTerminate;
    Resume;
  end;

  btnStop.Enabled := True;
  btnStart.Enabled := False;
end;
{ Stop button: asks the event thread to terminate and swaps the button states.
  NOTE(review): EventThread is created with FreeOnTerminate = True, so this
  reference may be stale if the thread has already ended - confirm. }
procedure TfmMain.btnStopClick(Sender: TObject);
begin
  EventThread.Terminate;

  btnStop.Enabled := False;
  btnStart.Enabled := True;
end;
end. //--------------------------------------------------------------------------------------
unit thrEvent;
interface
uses Windows, SysUtils, Classes, Winsock2;
const PACK_SIZE_RECEIVE = 4096;
type
  { Thread that serves the listening socket and all client sockets through
    WSAEventSelect. SockArray and EventArray are parallel arrays whose first
    EventTotal slots are in use. }
  TEventThread = class(TThread)
  public
    procedure WhileTerminate(Sender: TObject);
  private
    ListenSock : TSocket;
    SockArray  : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket;
    EventArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT;
    EventTotal : DWORD;  // number of slots in use
    Index      : DWORD;  // last result of WSAWaitForMultipleEvents
    RecvBuf    : Array [0..PACK_SIZE_RECEIVE-1] of Char;

    procedure InitSock;
    procedure CompressArray(idx: DWORD);
  protected
    procedure Execute; override;
  end;
implementation
uses frmMain;
{ TEventThread }
{ Event loop of the WSAEventSelect server.
  Slot 0 holds the listening socket (set up by InitSock); every accepted
  client gets its own slot and event. For the signalled slot the pending
  network events are enumerated and handled:
    FD_ACCEPT - accept the client and add a slot;
    FD_READ   - receive and show the data;
    FD_CLOSE  - release the slot.

  Fixes against the original:
  - the wait uses a finite time slice, so Terminate is noticed without
    network traffic (WSA_INFINITE blocked until the next event);
  - accept() and WSACreateEvent() results are checked;
  - a failed or empty read no longer skips FD_CLOSE handling in the same pass;
  - FD_CLOSE releases the slot even when it carries an error code (abortive
    close), otherwise the slot would never be reclaimed.
  NOTE(review): VCL controls are updated from this thread without Synchronize. }
procedure TEventThread.Execute;
const
  WAIT_SLICE_MS = 500;  // upper bound on how long a Terminate request waits
var
  hEvent : WSAEvent;
  ret    : Integer;
  ne     : TWSANetworkEvents;
  sock   : TSocket;
  adr    : TSockAddrIn;
  sMsg   : String;
  slot   : DWORD;
begin
  InitSock();
  if EventTotal = 0 then
    Exit;

  while not Terminated do
  begin
    Index := WSAWaitForMultipleEvents( EventTotal, @EventArray[0], FALSE, WAIT_SLICE_MS, FALSE );
    if Index = WSA_WAIT_FAILED then
    begin
      MessageBox( 0,'Call WSAWaitForMultipleEvents failed.','Error',MB_ICONERROR );
      Exit;
    end;
    // WSA_WAIT_TIMEOUT has the same value as WAIT_TIMEOUT from the Windows unit.
    if Index = WAIT_TIMEOUT then
      Continue;

    slot := Index - WSA_WAIT_EVENT_0;

    FillChar( ne, sizeof(ne), 0 );
    WSAEnumNetworkEvents( SockArray[slot], EventArray[slot], @ne );

    if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then
    begin
      if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then
        Continue;

      ret := sizeof(adr);
      sock := accept( SockArray[slot], adr, ret );
      if sock = INVALID_SOCKET then
        Continue;

      // all slots taken: refuse the connection
      if EventTotal > WSA_MAXIMUM_WAIT_EVENTS-1 then
      begin
        closesocket( sock );
        Continue;
      end;

      hEvent := WSACreateEvent();
      if hEvent = WSA_INVALID_EVENT then
      begin
        closesocket( sock );
        Continue;
      end;

      WSAEventSelect( sock, hEvent, FD_READ or FD_WRITE or FD_CLOSE );
      SockArray[EventTotal]  := sock;
      EventArray[EventTotal] := hEvent;
      Inc( EventTotal );

      // slot 0 is the listener, so clients = EventTotal-1
      fmMain.StatusBar1.Panels[0].Text := 'Connection: ' +IntToStr(EventTotal-1);
    end;

    if ( ( ne.lNetworkEvents and FD_READ ) > 0 ) and ( ne.iErrorCode[FD_READ_BIT] = 0 ) then
    begin
      FillChar( RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
      ret := recv( SockArray[slot], RecvBuf[0], PACK_SIZE_RECEIVE, 0 );
      if ret > 0 then  // 0 = closed, SOCKET_ERROR = failure
      begin
        ret := sizeof(adr);
        getpeername( SockArray[slot], adr, ret );
        sMsg := inet_ntoa( adr.sin_addr );
        sMsg := 'IP: ' +sMsg +' Port: ' +IntToStr(ntohs(adr.sin_port)) +' Msg: ';
        fmMain.ListBox1.Items.Add( sMsg + RecvBuf );
      end;
    end;

    { FD_WRITE is not handled in this demo:
      if ( ne.lNetworkEvents and FD_WRITE ) > 0 then ... }

    if ( ne.lNetworkEvents and FD_CLOSE ) > 0 then
    begin
      WSACloseEvent( EventArray[slot] );
      closesocket( SockArray[slot] );
      CompressArray( slot );

      fmMain.StatusBar1.Panels[0].Text := 'Connection: ' +IntToStr(EventTotal-1);
    end;
  end;
end;
{ Creates the listening socket on LISTEN_PORT, binds an event object to it
  for FD_ACCEPT / FD_CLOSE and stores both in slot 0.
  On success EventTotal becomes 1; on any failure a message box is shown and
  EventTotal stays 0, which Execute treats as "do not run".
  A failed step now releases the socket and event created so far; they used
  to leak because the clean-up code only walks the first EventTotal slots. }
procedure TEventThread.InitSock;
var
  addr   : TSockAddr;
  hEvent : WSAEvent;

  // Shows the error and releases whatever has been created so far.
  procedure Fail( const msg: String );
  begin
    MessageBox( 0, PChar(msg), 'Error', MB_ICONERROR );
    if hEvent <> WSA_INVALID_EVENT then
      WSACloseEvent( hEvent );
    if ListenSock <> INVALID_SOCKET then
    begin
      closesocket( ListenSock );
      ListenSock := INVALID_SOCKET;
    end;
  end;

begin
  EventTotal := 0;
  ListenSock := INVALID_SOCKET;
  hEvent     := WSA_INVALID_EVENT;

  ListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if ListenSock = INVALID_SOCKET then
  begin
    Fail( 'Call socket() failed.' );
    Exit;
  end;

  addr.sin_family      := AF_INET;
  addr.sin_port        := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( ListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then
  begin
    Fail( 'Call bind failed.' );
    Exit;
  end;

  hEvent := WSACreateEvent();
  if hEvent = WSA_INVALID_EVENT then
  begin
    Fail( 'Call WSACreateEvent failed.' );
    Exit;
  end;

  if WSAEventSelect( ListenSock,hEvent,FD_ACCEPT or FD_CLOSE )=SOCKET_ERROR then
  begin
    Fail( 'Call WSAEventSelect failed.' );
    Exit;
  end;

  if listen( ListenSock, 5 ) = SOCKET_ERROR then
  begin
    Fail( 'Call listen failed.' );
    Exit;
  end;

  SockArray[EventTotal]  := ListenSock;
  EventArray[EventTotal] := hEvent;
  Inc( EventTotal );
end;
{ Removes slot idx from the parallel SockArray/EventArray by shifting every
  later slot one position towards the front, then shrinks EventTotal by one.
  When idx is the last used slot nothing has to be moved. }
procedure TEventThread.CompressArray(idx: DWORD);
var
  pos : DWORD;
begin
  pos := idx;
  while pos + 1 < EventTotal do
  begin
    SockArray[pos]  := SockArray[pos+1];
    EventArray[pos] := EventArray[pos+1];
    Inc( pos );
  end;
  Dec( EventTotal );
end;
{ OnTerminate handler: closes the event object and the socket of every slot
  that is still in use (the listening socket included). }
procedure TEventThread.WhileTerminate(Sender: TObject);
var
  slot : DWORD;
begin
  slot := 0;
  while slot < EventTotal do
  begin
    WSACloseEvent( EventArray[slot] );
    shutdown( SockArray[slot], SD_BOTH );
    closesocket( SockArray[slot] );
    Inc( slot );
  end;
end;
end.
|
四:Overlapped I/O 事件通知模型
后來,微軟通過調查發現,老陳不喜歡上下樓收發信件,因為上下樓其實很浪費時間。于是微軟再次改進他們的信箱。新式的信箱采用了更為先進的技術,只要用戶告訴微軟自己的家在幾樓幾號,新式信箱會把信件直接傳送到用戶的家中,然后告訴用戶,你的信件已經放到你的家中了!老陳很高興,因為他不必再親自收發信件了!
Overlapped I/O 事件通知模型和WSAEventSelect模型在實現上非常相似,主要區別在“Overlapped”,Overlapped模型是讓應用程序使用重疊數據結構(WSAOVERLAPPED),一次投遞一個或多個Winsock I/O請求。這些提交的請求完成后,應用程序會收到通知。什么意思呢?就是說,如果你想從socket上接收數據,只需要告訴系統,由系統為你接收數據,而你需要做的只是為系統提供一個緩沖區~~~~~
Listen線程和WSAEventSelect模型一模一樣,Recv/Send線程則完全不同:
// Receive thread for the overlapped I/O event-notification model (abridged).
// Waits for one of the per-connection events, fetches the result of the
// completed overlapped receive, shows the data and posts the next receive.
// The "......" lines are placeholders for omitted code.
procedure TOverlapThread.Execute;
var
dwTemp : DWORD;
ret : Integer;
Index : DWORD;
begin
......
while ( not Terminated ) do
begin
Index := WSAWaitForMultipleEvents( FLinks.Count, @FLinks.Events[0], FALSE, RECV_TIME_OUT, FALSE );
Dec( Index, WSA_WAIT_EVENT_0 );
if Index > WSA_MAXIMUM_WAIT_EVENTS-1 then //timeout or some other error
continue;
WSAResetEvent( FLinks.Events[Index] );
// dwTemp receives the number of bytes transferred by the completed request.
WSAGetOverlappedResult( FLinks.Sockets[Index], FLinks.pOverlaps[Index], @dwTemp, FALSE, FLinks.pdwFlags[Index]^ );
if dwTemp = 0 then //the connection has been closed
begin
......
continue;
end else
begin
fmMain.ListBox1.Items.Add( FLinks.pBufs[Index]^.buf );
end;
//re-initialise the per-connection state and buffer
FLinks.pdwFlags[Index]^ := 0;
FillChar( FLinks.pOverlaps[Index]^, sizeof(WSAOVERLAPPED), 0 );
FLinks.pOverlaps[Index]^.hEvent := FLinks.Events[Index];
FillChar( FLinks.pBufs[Index]^.buf^, BUFFER_SIZE, 0 );
//post the next receive request
WSARecv( FLinks.Sockets[Index], FLinks.pBufs[Index], 1, FLinks.pdwRecvd[Index]^, FLinks.pdwFlags[Index]^, FLinks.pOverlaps[Index], nil );
end;
end;
Overlapped I/O 事件通知
unit thrAccept;
interface
uses Windows, SysUtils, Classes, Winsock2, thrOverlap;
type
  { Accept thread of the overlapped-I/O (event notification) demo. It owns
    the listening socket and its event, and starts the TOverlapThread that
    performs the receives. }
  TEventThread = class(TThread)
  private
    FListenSock  : TSocket;
    FListenEvent : WSAEVENT;
    FRWThread    : TOverlapThread;
  protected
    procedure Execute; override;
    function InitSock():BOOL;
    procedure FreeResource;
  end;
{ State shared between the accept thread and the overlap thread.
  Declarations are on separate lines so that the "//" comment no longer
  swallows gSockTotal and gSockArray. }
var
  gCS1       : TRTLCriticalSection;  // critical section guarding the two variables below
  gSockTotal : DWORD;                // number of valid entries in gSockArray
  gSockArray : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket;  // accepted client sockets
implementation
uses frmMain;
{ TEventThread }
{ Accept loop of the overlapped-I/O (event notification) demo.
  Starts the receive thread, then waits for FD_ACCEPT on the listening socket
  and appends every accepted socket to gSockArray, from where the receive
  thread picks it up.

  Fixes against the original:
  - line structure restored (a "//" comment swallowed the loop's "end;");
  - the capacity check and the insertion happen inside one critical-section
    entry;
  - the receive thread is owned by this thread (FreeOnTerminate = False) and
    is waited for in FreeResource before gCS1 is deleted; previously the
    critical section could be deleted while that thread was still using it.
  NOTE(review): the status bar is updated from this thread without Synchronize. }
procedure TEventThread.Execute;
var
  ret    : Integer;
  ne     : TWSANetworkEvents;
  sock   : TSocket;
  adr    : TSockAddrIn;
  stored : Boolean;
begin
  if not InitSock() then
    Exit;

  InitializeCriticalSection( gCS1 );
  gSockTotal := 0;

  FRWThread := TOverlapThread.Create( True );
  FRWThread.FreeOnTerminate := False;  // freed by FreeResource after WaitFor
  FRWThread.Resume;

  while not Terminated do
  begin
    WSAWaitForMultipleEvents( 1, @FListenEvent, FALSE, ACCEPT_TIME_OUT, FALSE );
    FillChar( ne, sizeof(ne), 0 );
    WSAEnumNetworkEvents( FListenSock, FListenEvent, @ne );

    if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then
    begin
      if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then
        Continue;

      ret := sizeof(adr);
      sock := accept( FListenSock, adr, ret );
      if sock = INVALID_SOCKET then
        Continue;

      EnterCriticalSection( gCS1 );
      try
        stored := gSockTotal <= WSA_MAXIMUM_WAIT_EVENTS-1;
        if stored then
        begin
          gSockArray[gSockTotal] := sock;
          Inc( gSockTotal );
        end;
        ret := gSockTotal;
      finally
        LeaveCriticalSection( gCS1 );
      end;

      // table full: refuse the connection
      if not stored then
      begin
        closesocket( sock );
        Continue;
      end;

      fmMain.StatusBar1.Panels[0].Text := 'Connection: ' + IntToStr(ret);
    end;

    // Other events are ignored: this thread is only responsible for accept.
  end;

  FreeResource;
end;

{ Creates the listening socket on LISTEN_PORT and binds FListenEvent to it
  for FD_ACCEPT. Returns True only when every step succeeded; on failure
  everything created so far is released again (the original never checked
  any result and always returned True). }
function TEventThread.InitSock: BOOL;
var
  addr : TSockAddr;
begin
  Result := False;
  FListenEvent := WSA_INVALID_EVENT;

  FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if FListenSock = INVALID_SOCKET then
    Exit;

  addr.sin_family      := AF_INET;
  addr.sin_port        := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( FListenSock, @addr, sizeof(SOCKADDR) ) <> SOCKET_ERROR then
  begin
    FListenEvent := WSACreateEvent();
    if (FListenEvent <> WSA_INVALID_EVENT) and
       (WSAEventSelect( FListenSock, FListenEvent, FD_ACCEPT ) <> SOCKET_ERROR) and
       (listen( FListenSock, 5 ) <> SOCKET_ERROR) then
    begin
      Result := True;
      Exit;
    end;
  end;

  // failure: undo the partial set-up
  if FListenEvent <> WSA_INVALID_EVENT then
    WSACloseEvent( FListenEvent );
  closesocket( FListenSock );
  FListenSock := INVALID_SOCKET;
end;

{ Stops the receive thread, waits until it has finished, and only then
  deletes the shared critical section and closes the listening socket and
  its event. }
procedure TEventThread.FreeResource;
begin
  if FRWThread <> nil then
  begin
    FRWThread.Terminate;
    FRWThread.WaitFor;
    FRWThread.Free;
    FRWThread := nil;
  end;

  DeleteCriticalSection( gCS1 );

  closesocket( FListenSock );
  WSACloseEvent( FListenEvent );
end;
end. --------------------------------------------------------------------------------- unit thrOverlap;
interface
uses Windows, SysUtils, Classes, Winsock2;
const BUFFER_SIZE = 4096; ACCEPT_TIME_OUT = 550; RECV_TIME_OUT = 550;
type
  { Per-connection state of the overlap thread, kept as parallel arrays;
    the first Count slots are in use. }
  YOverlappedSockets = record
    Count     : DWORD;
    Sockets   : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of TSocket;
    Events    : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of WSAEVENT;
    pOverlaps : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PWSAOVERLAPPED;
    pBufs     : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PWSABUF;
    pdwRecvd  : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PDWORD;
    pdwFlags  : Array [0..WSA_MAXIMUM_WAIT_EVENTS-1] of PDWORD;
  end;
type
  { Receive thread of the overlapped-I/O (event notification) demo. }
  TOverlapThread = class(TThread)
  private
    FLinks : YOverlappedSockets;  // state of every connection served here
  protected
    procedure Execute; override;
    procedure CompressArray(idx: DWORD);
    procedure DoNewConnection(dwCount: DWORD);
    procedure FreeResource;
  end;
implementation
uses thrAccept, frmMain;
{ TOverlapThread }
{ Receive loop of the overlapped-I/O (event notification) demo.
  Allocates one overlapped structure and one buffer per possible slot, then
  repeatedly: adopts sockets newly published in gSockArray, waits for one of
  the per-connection events, reads the result of the finished receive, shows
  the data and posts the next receive.

  Fixes against the original:
  - line structure restored ("//" comments swallowed LeaveCriticalSection,
    the "continue" and the DoNewConnection call);
  - the thread sleeps while there are no connections instead of spinning;
  - a failed WSAGetOverlappedResult is handled like a closed connection
    instead of showing whatever dwTemp happened to contain.
  NOTE(review): VCL controls are updated from this thread without Synchronize. }
procedure TOverlapThread.Execute;
var
  dwTemp : DWORD;
  ret    : Integer;
  Index  : DWORD;
begin
  for ret := 0 to WSA_MAXIMUM_WAIT_EVENTS-1 do
  begin
    New( FLinks.pdwRecvd[ret] );
    FLinks.pdwRecvd[ret]^ := 0;
    New( FLinks.pdwFlags[ret] );
    FLinks.pdwFlags[ret]^ := 0;
    New( FLinks.pOverlaps[ret] );
    New( FLinks.pBufs[ret] );

    FLinks.pBufs[ret]^.len := BUFFER_SIZE;
    FLinks.pBufs[ret]^.buf := AllocMem( BUFFER_SIZE );
  end;

  while not Terminated do
  begin
    EnterCriticalSection( gCS1 );
    dwTemp := gSockTotal;  // current number of connections
    LeaveCriticalSection( gCS1 );

    // no client connected yet
    if dwTemp = 0 then
    begin
      Sleep( RECV_TIME_OUT );
      Continue;
    end;

    // dwTemp = FLinks.Count means nothing new; dwTemp < FLinks.Count cannot
    // happen because only this thread removes entries.
    if dwTemp > FLinks.Count then  // the accept thread published new sockets
      DoNewConnection( dwTemp );

    Index := WSAWaitForMultipleEvents( FLinks.Count, @FLinks.Events[0], FALSE, RECV_TIME_OUT, FALSE );
    Dec( Index, WSA_WAIT_EVENT_0 );
    if Index > WSA_MAXIMUM_WAIT_EVENTS-1 then  // timeout or some other error
      Continue;

    WSAResetEvent( FLinks.Events[Index] );
    dwTemp := 0;
    if not WSAGetOverlappedResult( FLinks.Sockets[Index], FLinks.pOverlaps[Index], @dwTemp, FALSE, FLinks.pdwFlags[Index]^ ) then
      dwTemp := 0;

    if dwTemp = 0 then  // connection closed (or the receive failed)
    begin
      closesocket( FLinks.Sockets[Index] );
      WSACloseEvent( FLinks.Events[Index] );
      CompressArray( Index );
      fmMain.StatusBar1.Panels[0].Text := 'Connection: '+IntToStr(FLinks.Count);
      Continue;
    end;

    fmMain.ListBox1.Items.Add( FLinks.pBufs[Index]^.buf );

    // reset the slot and post the next receive
    FLinks.pdwFlags[Index]^ := 0;
    FillChar( FLinks.pOverlaps[Index]^, sizeof(WSAOVERLAPPED), 0 );
    FLinks.pOverlaps[Index]^.hEvent := FLinks.Events[Index];
    FillChar( FLinks.pBufs[Index]^.buf^, BUFFER_SIZE, 0 );
    WSARecv( FLinks.Sockets[Index], FLinks.pBufs[Index], 1, FLinks.pdwRecvd[Index]^, FLinks.pdwFlags[Index]^, FLinks.pOverlaps[Index], nil );
  end;

  FreeResource;
end;
{ Removes slot idx from both the shared socket table and this thread's
  FLinks arrays.
  The shared gSockArray is compacted under gCS1. In FLinks the later slots
  move one position forward, and the pointers that belonged to slot idx are
  parked in the last used slot so their memory stays available for reuse. }
procedure TOverlapThread.CompressArray(idx: DWORD);
var
  pos          : DWORD;
  savedOverlap : Pointer;
  savedBuf     : Pointer;
  savedRecvd   : Pointer;
  savedFlags   : Pointer;
begin
  // 1) shared table
  EnterCriticalSection( gCS1 );
  pos := idx;
  while pos + 1 < gSockTotal do
  begin
    gSockArray[pos] := gSockArray[pos+1];
    Inc( pos );
  end;
  Dec( gSockTotal );
  LeaveCriticalSection( gCS1 );

  // 2) thread-local table
  savedOverlap := FLinks.pOverlaps[idx];
  savedBuf     := FLinks.pBufs[idx];
  savedRecvd   := FLinks.pdwRecvd[idx];
  savedFlags   := FLinks.pdwFlags[idx];

  pos := idx;
  while pos + 1 < FLinks.Count do
  begin
    FLinks.Sockets[pos]   := FLinks.Sockets[pos+1];
    FLinks.Events[pos]    := FLinks.Events[pos+1];
    FLinks.pOverlaps[pos] := FLinks.pOverlaps[pos+1];
    FLinks.pBufs[pos]     := FLinks.pBufs[pos+1];
    FLinks.pdwRecvd[pos]  := FLinks.pdwRecvd[pos+1];
    FLinks.pdwFlags[pos]  := FLinks.pdwFlags[pos+1];
    Inc( pos );
  end;

  // When idx was already the last slot this stores the same pointers back.
  FLinks.pOverlaps[FLinks.Count-1] := savedOverlap;
  FLinks.pBufs[FLinks.Count-1]     := savedBuf;
  FLinks.pdwRecvd[FLinks.Count-1]  := savedRecvd;
  FLinks.pdwFlags[FLinks.Count-1]  := savedFlags;
  Dec( FLinks.Count );
end;
{ Adopts the sockets in gSockArray[FLinks.Count .. dwCount-1]: copies them
  into FLinks under gCS1, creates an event for each and posts the first
  overlapped receive.
  The flags and byte counter of each slot are now reset before WSARecv,
  because a slot recycled by CompressArray still holds the values written by
  the previous connection. }
procedure TOverlapThread.DoNewConnection(dwCount: DWORD);
var
  ret : Integer;
begin
  EnterCriticalSection( gCS1 );
  for ret := dwCount-1 downto FLinks.Count do
    FLinks.Sockets[ret] := gSockArray[ret];
  LeaveCriticalSection( gCS1 );

  for ret := dwCount-1 downto FLinks.Count do
  begin
    FLinks.Events[ret] := WSACreateEvent();
    FLinks.pdwFlags[ret]^ := 0;
    FLinks.pdwRecvd[ret]^ := 0;
    FillChar( FLinks.pOverlaps[ret]^, sizeof(WSAOVERLAPPED), 0 );
    FLinks.pOverlaps[ret]^.hEvent := FLinks.Events[ret];
    WSARecv( FLinks.Sockets[ret], FLinks.pBufs[ret], 1, FLinks.pdwRecvd[ret]^, FLinks.pdwFlags[ret]^, FLinks.pOverlaps[ret], nil );
  end;

  FLinks.Count := dwCount;
end;
{ Closes the socket and event of every connection still in use, then
  releases the per-slot memory allocated at the start of Execute. }
procedure TOverlapThread.FreeResource;
var
  slot : Integer;
begin
  // live connections only
  if FLinks.Count > 0 then
    for slot := 0 to FLinks.Count-1 do
    begin
      closesocket( FLinks.Sockets[slot] );
      WSACloseEvent( FLinks.Events[slot] );
    end;

  // every slot owns a buffer, counters and an overlapped record
  for slot := 0 to WSA_MAXIMUM_WAIT_EVENTS-1 do
  begin
    FreeMem( FLinks.pBufs[slot]^.buf );  // data buffer first, then its descriptor
    Dispose( FLinks.pdwRecvd[slot] );
    Dispose( FLinks.pdwFlags[slot] );
    Dispose( FLinks.pOverlaps[slot] );
    Dispose( FLinks.pBufs[slot] );
  end;
end;
end.
|
五:Overlapped I/O 完成例程模型
老陳接收到新的信件后,一般的程序是:打開信封----掏出信紙----閱讀信件----回復信件......為了進一步減輕用戶負擔,微軟又開發了一種新的技術:用戶只要告訴微軟對信件的操作步驟,微軟信箱將按照這些步驟去處理信件,不再需要用戶親自拆信/閱讀/回復了!老陳終于過上了小資生活!
Overlapped I/O 完成例程要求用戶提供一個回調函數,發生新的網絡事件的時候系統將執行這個函數:
procedure WorkerRoutine( const dwError, cbTransferred : DWORD; const
lpOverlapped : LPWSAOVERLAPPED; const dwFlags : DWORD ); stdcall;
然后告訴系統用WorkerRoutine函數處理接收到的數據:
WSARecv( m_socket, @FBuf, 1, dwTemp, dwFlag, @m_overlap, WorkerRoutine );
然后......沒有什么然后了,系統什么都給你做了!微軟真是體貼!
// This is all a Recv/Send thread has to do in this model: stay in an
// alertable wait. Both outcomes of SleepEx (WAIT_IO_COMPLETION or timeout)
// simply lead to the next iteration.
while not Terminated do
SleepEx( RECV_TIME_OUT, True );
Overlapped I/O 完成例程
據說,“重疊I/O(Overlapped I/O)模型使應用程序能達到更佳的系統性能。”,不過性能到底“更佳”了多少,沒有做過測試,不清楚。。。道理網上有很多,不講了,還是直接貼代碼。。。
unit thrAccept;
interface
uses Windows, SysUtils, Classes, Winsock2, thrOverlap;
type
  { Accept thread of the overlapped-I/O (completion routine) demo; owns the
    listening socket and the event bound to it. }
  TEventThread = class(TThread)
  private
    FListenSock  : TSocket;
    FListenEvent : WSAEVENT;
    FRWThread    : TOverlapThread;
  protected
    procedure Execute; override;
    function InitSock: BOOL;
    procedure FreeResource;
  end;
implementation
uses frmMain;
{ TEventThread }
{ Accept loop of the overlapped-I/O (completion routine) demo.
  Waits for FD_ACCEPT on the listening socket and starts one TOverlapThread
  per accepted connection.

  Fixes against the original:
  - line structure restored ("//" comments swallowed two "end;" keywords);
  - each accepted socket is handed to a worker. The original created a
    single worker up front without ever assigning m_socket and then dropped
    every accepted socket, so no connection was served and the handles leaked.
  The FRWThread field is no longer assigned: workers free themselves, so a
    stored reference could go stale. }
procedure TEventThread.Execute;
var
  ret    : Integer;
  ne     : TWSANetworkEvents;
  sock   : TSocket;
  adr    : TSockAddrIn;
  worker : TOverlapThread;
begin
  if not InitSock() then
    Exit;

  while not Terminated do
  begin
    WSAWaitForMultipleEvents( 1, @FListenEvent, FALSE, ACCEPT_TIME_OUT, FALSE );

    FillChar( ne, sizeof(ne), 0 );
    // WSAEnumNetworkEvents also resets FListenEvent to non-signalled, so
    // WSAResetEvent is not needed.
    WSAEnumNetworkEvents( FListenSock, FListenEvent, @ne );

    if ( ne.lNetworkEvents and FD_ACCEPT ) > 0 then
    begin
      if ne.iErrorCode[FD_ACCEPT_BIT] <> 0 then
        Continue;

      ret := sizeof(adr);
      sock := accept( FListenSock, adr, ret );
      if sock = INVALID_SOCKET then
        Continue;

      // one receive thread per connection; m_socket must be set before Resume
      worker := TOverlapThread.Create( True );
      worker.FreeOnTerminate := True;
      worker.m_socket := sock;
      worker.Resume;
    end;

    // Other events are ignored: this thread is only responsible for accept.
  end;

  FreeResource;
end;
{ Creates the listening socket on LISTEN_PORT and binds FListenEvent to it
  for FD_ACCEPT. Returns True only when every step succeeded; on failure
  everything created so far is released again (the original never checked
  any result and always returned True). }
function TEventThread.InitSock: BOOL;
var
  addr : TSockAddr;
begin
  Result := False;
  FListenEvent := WSA_INVALID_EVENT;

  FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if FListenSock = INVALID_SOCKET then
    Exit;

  addr.sin_family      := AF_INET;
  addr.sin_port        := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( FListenSock, @addr, sizeof(SOCKADDR) ) <> SOCKET_ERROR then
  begin
    FListenEvent := WSACreateEvent();
    if (FListenEvent <> WSA_INVALID_EVENT) and
       (WSAEventSelect( FListenSock, FListenEvent, FD_ACCEPT ) <> SOCKET_ERROR) and
       (listen( FListenSock, 5 ) <> SOCKET_ERROR) then
    begin
      Result := True;
      Exit;
    end;
  end;

  // failure: undo the partial set-up
  if FListenEvent <> WSA_INVALID_EVENT then
    WSACloseEvent( FListenEvent );
  closesocket( FListenSock );
  FListenSock := INVALID_SOCKET;
end;
{ Closes the listening socket and then the event object that was bound to it. }
procedure TEventThread.FreeResource;
begin
  closesocket( FListenSock );
  WSACloseEvent( FListenEvent );
end;
end.
unit thrOverlap;
interface
uses Windows, SysUtils, Classes, Winsock2;
const
  BUFFER_SIZE     = 4096;  // size in bytes of the receive buffer
  ACCEPT_TIME_OUT = 550;   // timeout passed to WSAWaitForMultipleEvents by the accept thread
  RECV_TIME_OUT   = 550;   // timeout passed to SleepEx by TOverlapThread

type
  { Reader thread: posts an overlapped WSARecv on m_socket and then sleeps
    alertably so that WorkerRoutine can run as the completion routine. }
  TOverlapThread = class(TThread)
  private
    FBuf : WSABUF;               // receive buffer descriptor (len/buf)
  public
    m_socket  : TSocket;         // connection to read from
    m_overlap : WSAOVERLAPPED;   // its hEvent field carries the thread pointer
  protected
    procedure Execute; override;
  end;

{ Completion routine passed to WSARecv. }
procedure WorkerRoutine( const dwError, cbTransferred : DWORD;
                         const lpOverlapped : LPWSAOVERLAPPED;
                         const dwFlags : DWORD ); stdcall;
implementation
uses frmMain;
{ TOverlapThread }
{ Thread body.
  Allocates the receive buffer, posts the first overlapped WSARecv on
  m_socket with WorkerRoutine as completion routine, then stays in an
  alertable wait until the thread is terminated. The buffer is released when
  the thread leaves. }
procedure TOverlapThread.Execute;
var
  dwTemp, dwFlag : DWORD;
begin
  FBuf.len := BUFFER_SIZE;
  FBuf.buf := AllocMem( BUFFER_SIZE );
  try
    dwFlag := 0;
    FillChar( m_overlap, sizeof(WSAOVERLAPPED), 0 );
    // When a completion routine is supplied the hEvent field is ignored by
    // Winsock, so it is used to pass this thread object to WorkerRoutine.
    m_overlap.hEvent := DWORD(self);

    // SOCKET_ERROR with WSA_IO_PENDING means the receive was queued; any
    // other error means no completion will ever be delivered.
    if ( WSARecv( m_socket, @FBuf, 1, dwTemp, dwFlag, @m_overlap, WorkerRoutine ) = SOCKET_ERROR )
       and ( WSAGetLastError() <> WSA_IO_PENDING ) then
    begin
      closesocket( m_socket );
      Exit;
    end;

    // The alertable sleep is what lets WorkerRoutine run on this thread;
    // the timeout only exists so Terminated is re-checked periodically.
    while not Terminated do
      SleepEx( RECV_TIME_OUT, True );
  finally
    // Cancel a receive this thread may still have in flight and give its
    // completion routine one bounded chance to run before the buffer it
    // writes into is released.
    CancelIo( THandle(m_socket) );
    SleepEx( RECV_TIME_OUT, True );
    FreeMem( FBuf.buf );
    FBuf.buf := nil;
  end;
end;
{ Completion routine for the overlapped WSARecv posted by TOverlapThread.
  dwError       - completion status of the receive (0 on success)
  cbTransferred - number of bytes received; 0 means the peer closed
  lpOverlapped  - the WSAOVERLAPPED of the request; its hEvent field holds
                  the owning TOverlapThread
  dwFlags       - not used here
  On error or disconnect the socket is closed and the owning thread is asked
  to terminate; otherwise the received bytes are added to the list box and
  the next receive is posted.
  NOTE(review): fmMain.ListBox1 is touched from a worker thread; VCL controls
  are normally only safe from the main thread - confirm. }
procedure WorkerRoutine( const dwError, cbTransferred : DWORD; const lpOverlapped : LPWSAOVERLAPPED; const dwFlags : DWORD );
var
  owner         : TOverlapThread;
  dwTemp, Flags : DWORD;
  received      : string;
begin
  // Fetch the thread pointer first: the WSAOVERLAPPED is zeroed further down
  // and hEvent would otherwise be nil by the time it is needed.
  owner := TOverlapThread( lpOverlapped^.hEvent );

  if ( dwError <> 0 ) or ( cbTransferred = 0 ) then
  begin
    closesocket( owner.m_socket );
    owner.Terminate;   // nothing left to read; let the thread end
    Exit;
  end;

  // Copy exactly the bytes received; the buffer is not guaranteed to be
  // null-terminated when a full BUFFER_SIZE chunk arrives.
  SetString( received, owner.FBuf.buf, cbTransferred );
  fmMain.ListBox1.Items.Add( received );
  FillChar( owner.FBuf.buf^, BUFFER_SIZE, 0 );

  Flags := 0;
  FillChar( lpOverlapped^, sizeof(WSAOVERLAPPED), 0 );
  lpOverlapped^.hEvent := DWORD(owner);   // restore the context wiped by FillChar

  if WSARecv( owner.m_socket, @owner.FBuf, 1, dwTemp, Flags, @owner.m_overlap, WorkerRoutine ) = SOCKET_ERROR then
    if WSAGetLastError() <> WSA_IO_PENDING then
    begin
      // The receive could not be queued, so no further completion will come.
      closesocket( owner.m_socket );
      owner.Terminate;
    end;
end;
end. |
六:IOCP模型
微軟信箱似乎很完美,老陳也很滿意。但是在一些大公司情況卻完全不同!這些大公司有數以萬計的信箱,每秒鐘都有數以百計的信件需要處理,以至于微軟信箱經常因超負荷運轉而崩潰!需要重新啟動!微軟不得不使出殺手锏......
微軟給每個大公司派了一名名叫“Completion Port”的超級機器人,讓這個機器人去處理那些信件!
“Windows NT小組注意到這些應用程序的性能沒有預料的那么高。特別的,處理很多同時的客戶請求意味著很多線程并發地運行在系統中。因為所有這些線程都是可運行的[沒有被掛起和等待發生什么事],Microsoft意識到NT內核花費了太多的時間來轉換運行線程的上下文[Context],線程就沒有得到很多CPU時間來做它們的工作。大家可能也都感覺到并行模型的瓶頸在于它為每一個客戶請求都創建了一個新線程。創建線程比起創建進程開銷要小,但也遠不是沒有開銷的。我們不妨設想一下:如果事先開好N個線程,讓它們在那hold[堵塞],然后可以將所有用戶的請求都投遞到一個消息隊列中去。然后那N個線程逐一從消息隊列中去取出消息并加以處理。就可以避免針對每一個用戶請求都開線程。不僅減少了線程的資源,也提高了線程的利用率。理論上很不錯,你想我等泛泛之輩都能想出來的問題,Microsoft又怎會沒有考慮到呢?”-----摘自nonocast的《理解I/O Completion Port》
先看一下IOCP模型的實現:
// Create a completion port.
FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0 );
// Accept a remote connection and bind its socket handle to the IOCP created
// above. The socket itself is passed as the completion key, because the
// worker threads hand the key they dequeue straight to WSARecv.
AConnect := accept( FListenSock, addr, len );
CreateIoCompletionPort( AConnect, FCompletPort, AConnect, 0 );
// Create (number of CPUs * 2 + 2) worker threads.
for i := 1 to si.dwNumberOfProcessors*2+2 do
begin
  // Create suspended so CompletPort is set before Execute starts running.
  AThread := TRecvSendThread.Create( True );
  AThread.CompletPort := FCompletPort; // tell the thread which IOCP to service
  AThread.Resume;
end;
OK,就這么簡單,我們要做的就是建立一個IOCP,把遠程連接的socket句柄綁定到剛才創建的IOCP上,最后創建n個線程,并告訴這n個線程到這個IOCP上去訪問數據就可以了。
再看一下TRecvSendThread線程都干些什么:
// Simplified outline of the worker thread body; the "......" / "...." parts
// are elided by the article. Each pass dequeues one completion from the IOCP
// and then posts another read on the socket carried in the completion key.
procedure TRecvSendThread.Execute;
var
......
begin
while (not self.Terminated) do
begin
// Query the IOCP status (has a data read/write operation completed?)
GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), TIME_OUT );
if BytesTransd <> 0 then
....;// a data read/write operation has completed
// Post another read request
WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil );
end;
end;
讀寫線程只是簡單地檢查IOCP是否完成了我們投遞的讀寫操作,如果完成了則再投遞一個新的讀寫請求。
應該注意到,我們創建的所有TRecvSendThread都在訪問同一個IOCP(因為我們只創建了一個IOCP),并且我們沒有使用臨界區!難道不會產生沖突嗎?不用考慮同步問題嗎?
呵呵,這正是IOCP的奧妙所在。IOCP不是一個普通的對象,不需要考慮線程安全問題。它會自動調配訪問它的線程:如果某個socket上有一個線程A正在訪問,那么線程B的訪問請求會被分配到另外一個socket。這一切都是由系統自動調配的,我們無需過問。
完成端口
unit frmMain;
interface
uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, Winsock2, StdCtrls, thrListen;
type
  { Main form: Start/Stop buttons for the listener thread and a list box
    that the worker code adds received text to. }
  TfmMain = class(TForm)
    btnStart: TButton;
    ListBox1: TListBox;
    btnStop: TButton;
    procedure btnStartClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure FormClose(Sender: TObject; var Action: TCloseAction);
    procedure btnStopClick(Sender: TObject);
  private
    { Private declarations }
    FListenThread : TListenThread;  // listener started by btnStartClick
  public
    { Public declarations }
  end;

const
  LISTEN_PORT = 5005;  // TCP port the listener binds to

var
  fmMain: TfmMain;
implementation
{$R *.dfm}
{ Start button: creates the listener thread suspended, marks it as
  self-freeing, starts it and flips the Start/Stop button states. }
procedure TfmMain.btnStartClick(Sender: TObject);
var
  listener : TListenThread;
begin
  listener := TListenThread.Create( True );
  listener.FreeOnTerminate := True;
  FListenThread := listener;
  listener.Resume;

  btnStop.Enabled := True;
  btnStart.Enabled := False;
end;
{ Stop button: asks the listener thread to terminate and flips the
  Start/Stop button states back. }
procedure TfmMain.btnStopClick(Sender: TObject);
begin
  FListenThread.Terminate;

  btnStop.Enabled := False;
  btnStart.Enabled := True;
end;
{ Form set-up: initialises Winsock 2.2.
  When WSAStartup fails an error box is shown and both buttons stay disabled;
  otherwise Start is enabled and Stop is disabled. }
procedure TfmMain.FormCreate(Sender: TObject);
var
  wsa : TWSAData;
begin
  if WSAStartup( $0202, wsa ) <> 0 then //WSAStartup returns zero if successful.
  begin
    MessageBox( 0, 'WSAStartup failed', 'Error', MB_ICONERROR );
    btnStart.Enabled := False;
    btnStop.Enabled := False;
    Exit;   // do not fall through and re-enable Start without Winsock
  end;

  btnStart.Enabled := True;
  btnStop.Enabled := False;
end;
{ Form tear-down: releases Winsock. }
procedure TfmMain.FormClose(Sender: TObject; var Action: TCloseAction);
begin
  WSACleanup();
end;
end. //---------------------------------------------------------------------
unit thrListen;
interface
uses Windows, Classes, Winsock2;
const
  RECV_POSTED = 0;     // OprtType value: a receive was posted
  SEND_POSTED = 1;     // OprtType value: a send was posted
  TIME_OUT    = 110;
  BUFFER_SIZE = 4096;  // size in bytes of each per-operation buffer

type
  { Per-operation context. Overlap must stay the first field: the worker code
    casts a PPER_OPERATION_DATA variable to POVERLAPPED and back. }
  YPER_OPERATION_DATA = record
    Overlap  : OVERLAPPED;
    BufData  : WSABUF;                            // descriptor pointing at Buf
    Buf      : Array [0..BUFFER_SIZE-1] of Char;  // receive storage
    OprtType : Integer;                           // RECV_POSTED or SEND_POSTED
  end;
  PPER_OPERATION_DATA = ^YPER_OPERATION_DATA;

  { Per-connection record; not referenced by the code in this unit. }
  YPER_HANDLE_DATA = record
    Sock     : TSocket;
    Ip       : Array [0..15] of Char;
    Port     : DWORD;
    OprtType : Integer;
  end;
  PPER_HANDLE_DATA = ^YPER_HANDLE_DATA;
type
  { Listener thread: creates the completion port and its workers, accepts
    connections and posts the first receive on each of them. }
  TListenThread = class(TThread)
  private
    { Private declarations }
    FCompletPort : THandle;  // I/O completion port shared with the workers
    FListenSock  : TSocket;  // listening socket created by InitSocket
    function InitSocket: BOOL;
  protected
    procedure Execute; override;
  end;

{ Worker entry point passed to CreateThread; CompletPortID carries the
  completion port handle. }
function WorkerThread( CompletPortID: Pointer ): DWORD; stdcall;
implementation
uses frmMain;
{ TListenThread }
{ Thread body.
  Opens the listening socket, creates the completion port and one
  WorkerThread per processor, then accepts connections until terminated.
  Each accepted socket is associated with the port (the socket handle is the
  completion key) and gets a first overlapped receive posted on it.
  On exit the listening socket is closed, one shutdown packet per worker is
  posted and the port handle is closed.
  NOTE(review): accept() is called on what appears to be a blocking socket,
  so Terminate is probably only noticed after the next connection - confirm. }
procedure TListenThread.Execute;
var
  si          : SYSTEM_INFO;
  i           : Integer;
  workerCount : Integer;
  hThread     : THandle;
  ThreadID    : DWORD;
  AConnect    : TSocket;
  addr        : TSockAddrIn;
  len         : Integer;
  BytesRecv,
  Flags       : DWORD;
  pPerIoDat   : PPER_OPERATION_DATA;
begin
  // Open the socket first so that a failure here leaves no port or worker
  // threads behind.
  if not InitSocket() then
    Exit;

  FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0 );
  if FCompletPort = 0 then
  begin
    closesocket( FListenSock );
    MessageBox( 0, 'CreateIoCompletionPort failed.', 'Error', MB_OK );
    Exit;
  end;

  workerCount := 0;
  GetSystemInfo( si );
  for i := 0 to si.dwNumberOfProcessors-1 do
  begin
    hThread := CreateThread( nil, 0, @WorkerThread, Pointer(FCompletPort), 0, ThreadID );
    if hThread <> 0 then
    begin
      Inc( workerCount );
      CloseHandle( hThread );  // the thread keeps running; only the handle is dropped
    end;
  end;

  while not Terminated do
  begin
    len := sizeof(addr);
    AConnect := accept( FListenSock, addr, len );
    if AConnect = INVALID_SOCKET then
    begin
      SleepEx( 110, False );
      continue;
    end;

    if CreateIoCompletionPort( AConnect, FCompletPort, AConnect, 0 ) = 0 then
    begin
      closesocket( AConnect );  // without the association no completion would arrive
      continue;
    end;

    New( pPerIoDat );
    FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 );
    FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 );
    pPerIoDat^.BufData.len := BUFFER_SIZE;
    pPerIoDat^.BufData.buf := pPerIoDat^.Buf;
    pPerIoDat^.OprtType := RECV_POSTED;

    Flags := 0;
    // WSA_IO_PENDING means the receive was queued; any other error means no
    // completion packet will ever release this connection's resources.
    if ( WSARecv( AConnect, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ) = SOCKET_ERROR )
       and ( WSAGetLastError() <> WSA_IO_PENDING ) then
    begin
      closesocket( AConnect );
      Dispose( pPerIoDat );
    end;
  end;

  closesocket( FListenSock );
  // One shutdown packet per worker, so every worker sees one.
  for i := 1 to workerCount do
    PostQueuedCompletionStatus( FCompletPort, 0, 0, nil );
  CloseHandle( FCompletPort );
end;
{ Creates FListenSock, binds it to LISTEN_PORT on all local interfaces and
  starts listening. Returns True on success. On failure a message box names
  the failing call, the socket (if created) is closed, and False is returned. }
function TListenThread.InitSocket: BOOL;
var
  addr : TSockAddr;
begin
  Result := False;

  FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if FListenSock = INVALID_SOCKET then
  begin
    MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  FillChar( addr, sizeof(addr), 0 );
  addr.sin_family := AF_INET;
  addr.sin_port := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( FListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then
  begin
    closesocket( FListenSock );       // do not leak the socket on failure
    FListenSock := INVALID_SOCKET;
    MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  if listen( FListenSock, 5 ) = SOCKET_ERROR then
  begin
    closesocket( FListenSock );
    FListenSock := INVALID_SOCKET;
    MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  Result := True;
end;
{ Completion-port worker.
  CompletPortID - the completion port handle, passed through CreateThread.
  Returns 0 when the worker ends.
  Loop: dequeue one completion; the completion key is the socket and the
  OVERLAPPED pointer is the PPER_OPERATION_DATA of the request.
    - timeout: nothing to do, wait again;
    - successful dequeue with a nil OVERLAPPED: shutdown packet posted by the
      listener, leave the loop;
    - failed dequeue with a nil OVERLAPPED for any other reason: the port is
      unusable, leave the loop;
    - failed I/O or 0 bytes: the connection is gone, close it and free its
      context;
    - otherwise: show the received bytes and post the next receive.
  NOTE(review): fmMain.ListBox1 is touched from a worker thread; VCL controls
  are normally only safe from the main thread - confirm. }
function WorkerThread( CompletPortID: Pointer ): DWORD;
var
  CompletPort : THandle;
  CompletKey, BytesTransd, BytesRecv, Flags : DWORD;
  pPerIoDat   : PPER_OPERATION_DATA;
  ok          : BOOL;
  received    : string;
begin
  Result := 0;
  CompletPort := DWORD(CompletPortID);

  while True do
  begin
    BytesTransd := 0;
    CompletKey := 0;
    pPerIoDat := nil;
    ok := GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), 550 );

    if pPerIoDat = nil then
    begin
      // No I/O packet was dequeued: either the wait timed out, or this is
      // the shutdown packet, or the port itself failed.
      if ( not ok ) and ( GetLastError() = WAIT_TIMEOUT ) then
        continue;
      Break;
    end;

    if ( not ok ) or ( BytesTransd = 0 ) then
    begin
      closesocket( CompletKey );
      Dispose( pPerIoDat );
      continue;
    end;

    if pPerIoDat^.OprtType = RECV_POSTED then
    begin
      // Copy exactly the bytes received; a full buffer has no terminator.
      SetString( received, pPerIoDat^.BufData.buf, BytesTransd );
      fmMain.ListBox1.Items.Add( received );
    end;

    Flags := 0;
    FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 );
    FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 );
    pPerIoDat^.BufData.len := BUFFER_SIZE;
    pPerIoDat^.BufData.buf := pPerIoDat^.Buf;
    pPerIoDat^.OprtType := RECV_POSTED;

    if ( WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ) = SOCKET_ERROR )
       and ( WSAGetLastError() <> WSA_IO_PENDING ) then
    begin
      // The receive was not queued, so no later packet would free these.
      closesocket( CompletKey );
      Dispose( pPerIoDat );
    end;
  end;
end;
end.
----------------------------------------------------------------------------------
上面的完成端口的例子可能太C++了,換一個更Delphi的:
新建一個線程類TRecvSendThread ------------ thrRecvSend.pas
unit thrRecvSend;
interface
uses Windows, Classes, Winsock2;
type
  { Completion-port worker implemented as a TThread. }
  TRecvSendThread = class(TThread)
  public
    CompletPort : THandle;  // completion port to service; set before the thread is resumed
  protected
    procedure Execute; override;
  end;
implementation
uses thrListen, frmMain;
{ TRecvSendThread }
{ Thread body: services CompletPort until the thread is terminated.
  Each pass dequeues one completion; the completion key is the socket and the
  OVERLAPPED pointer is the PPER_OPERATION_DATA of the request.
    - timeout: loop again so Terminated is re-checked;
    - successful dequeue with a nil OVERLAPPED: shutdown packet posted by the
      listener, leave the loop;
    - failed dequeue with a nil OVERLAPPED for any other reason: the port is
      unusable, leave the loop;
    - failed I/O or 0 bytes: the connection is gone, close it and free its
      context;
    - otherwise: show the received bytes and post the next receive.
  NOTE(review): fmMain.ListBox1 is touched from a worker thread; VCL controls
  are normally only safe from the main thread - confirm. }
procedure TRecvSendThread.Execute;
var
  CompletKey, BytesTransd, BytesRecv, Flags : DWORD;
  pPerIoDat : PPER_OPERATION_DATA;
  ok        : BOOL;
  received  : string;
begin
  while not Terminated do
  begin
    BytesTransd := 0;
    CompletKey := 0;
    pPerIoDat := nil;
    ok := GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), TIME_OUT );

    if pPerIoDat = nil then
    begin
      // No I/O packet was dequeued: either the wait timed out, or this is
      // the shutdown packet, or the port itself failed.
      if ( not ok ) and ( GetLastError() = WAIT_TIMEOUT ) then
        continue;
      Break;
    end;

    if ( not ok ) or ( BytesTransd = 0 ) then
    begin
      closesocket( CompletKey );
      Dispose( pPerIoDat );
      continue;
    end;

    if pPerIoDat^.OprtType = RECV_POSTED then
    begin
      // Copy exactly the bytes received; a full buffer has no terminator.
      SetString( received, pPerIoDat^.BufData.buf, BytesTransd );
      fmMain.ListBox1.Items.Add( received );
    end;

    Flags := 0;
    FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 );
    FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 );
    pPerIoDat^.BufData.len := BUFFER_SIZE;
    pPerIoDat^.BufData.buf := pPerIoDat^.Buf;
    pPerIoDat^.OprtType := RECV_POSTED;

    if ( WSARecv( CompletKey, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ) = SOCKET_ERROR )
       and ( WSAGetLastError() <> WSA_IO_PENDING ) then
    begin
      // The receive was not queued, so no later packet would free these.
      closesocket( CompletKey );
      Dispose( pPerIoDat );
    end;
  end;
end;
end.
//-------------------------------------------------------------
原來的TListenThread也稍微修改一下。。。。。。。。。
unit thrListen;
interface
uses Windows, Classes, Winsock2, thrRecvSend;
const
  RECV_POSTED = 0;     // OprtType value: a receive was posted
  SEND_POSTED = 1;     // OprtType value: a send was posted
  TIME_OUT    = 110;   // timeout used by the worker's dequeue and the listener's SleepEx
  BUFFER_SIZE = 4096;  // size in bytes of each per-operation buffer

type
  { Per-operation context. Overlap must stay the first field: the worker code
    casts a PPER_OPERATION_DATA variable to POVERLAPPED and back. }
  YPER_OPERATION_DATA = record
    Overlap  : OVERLAPPED;
    BufData  : WSABUF;                            // descriptor pointing at Buf
    Buf      : Array [0..BUFFER_SIZE-1] of Char;  // receive storage
    OprtType : Integer;                           // RECV_POSTED or SEND_POSTED
  end;
  PPER_OPERATION_DATA = ^YPER_OPERATION_DATA;

  { Per-connection record; not referenced by the code in this unit. }
  YPER_HANDLE_DATA = record
    Sock     : TSocket;
    Ip       : Array [0..15] of Char;
    Port     : DWORD;
    OprtType : Integer;
  end;
  PPER_HANDLE_DATA = ^YPER_HANDLE_DATA;
type
  { Listener thread: creates the completion port and its TRecvSendThread
    workers, accepts connections and posts the first receive on each. }
  TListenThread = class(TThread)
  private
    { Private declarations }
    FCompletPort : THandle;  // I/O completion port shared with the workers
    FListenSock  : TSocket;  // listening socket created by InitSocket
    function InitSocket: BOOL;
  protected
    procedure Execute; override;
  end;
implementation
uses frmMain;
{ TListenThread }
{ Thread body.
  Opens the listening socket, creates the completion port and one
  TRecvSendThread per processor, then accepts connections until terminated.
  Each accepted socket is associated with the port (the socket handle is the
  completion key) and gets a first overlapped receive posted on it.
  On exit the listening socket is closed and the workers are terminated,
  woken with one packet each, joined and freed before the port is closed.
  NOTE(review): accept() is called on what appears to be a blocking socket,
  so Terminate is probably only noticed after the next connection - confirm. }
procedure TListenThread.Execute;
var
  si        : SYSTEM_INFO;
  i, len    : Integer;
  workers   : array of TRecvSendThread;
  AConnect  : TSocket;
  addr      : TSockAddrIn;
  BytesRecv,
  Flags     : DWORD;
  pPerIoDat : PPER_OPERATION_DATA;
begin
  if not InitSocket() then
    Exit;

  FCompletPort := CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0 );
  if FCompletPort = 0 then
  begin
    closesocket( FListenSock );  // do not leak the listener on failure
    MessageBox( 0, 'CreateIoCompletionPort failed.', 'Error', MB_OK );
    Exit;
  end;

  // The workers are owned here (not FreeOnTerminate) so they can be stopped
  // and joined safely when the listener ends.
  GetSystemInfo( si );
  SetLength( workers, si.dwNumberOfProcessors );
  for i := 0 to High(workers) do
  begin
    workers[i] := TRecvSendThread.Create( True );
    workers[i].CompletPort := FCompletPort;
    workers[i].Resume;
  end;

  try
    while not Terminated do
    begin
      len := sizeof(addr);
      AConnect := accept( FListenSock, addr, len );
      if AConnect = INVALID_SOCKET then
      begin
        SleepEx( TIME_OUT, False );
        continue;
      end;

      if CreateIoCompletionPort( AConnect, FCompletPort, AConnect, 0 ) = 0 then
      begin
        closesocket( AConnect );  // without the association no completion would arrive
        continue;
      end;

      New( pPerIoDat );
      FillChar( pPerIoDat^.Overlap, sizeof(OVERLAPPED), 0 );
      FillChar( pPerIoDat^.Buf[0], BUFFER_SIZE, 0 );
      pPerIoDat^.BufData.len := BUFFER_SIZE;
      pPerIoDat^.BufData.buf := pPerIoDat^.Buf;
      pPerIoDat^.OprtType := RECV_POSTED;

      Flags := 0;
      // WSA_IO_PENDING means the receive was queued; any other error means
      // no completion packet will ever release this connection's resources.
      if ( WSARecv( AConnect, @(pPerIoDat^.BufData), 1, BytesRecv, Flags, @(pPerIoDat^.Overlap), nil ) = SOCKET_ERROR )
         and ( WSAGetLastError() <> WSA_IO_PENDING ) then
      begin
        closesocket( AConnect );
        Dispose( pPerIoDat );
      end;
    end;
  finally
    closesocket( FListenSock );

    // Flag every worker first, then wake each one with a packet so none of
    // them sits out its dequeue timeout, then join and free them.
    for i := 0 to High(workers) do
      workers[i].Terminate;
    for i := 0 to High(workers) do
      PostQueuedCompletionStatus( FCompletPort, 0, 0, nil );
    for i := 0 to High(workers) do
    begin
      workers[i].WaitFor;
      workers[i].Free;
    end;

    CloseHandle( FCompletPort );
  end;
end;
{ Creates FListenSock, binds it to LISTEN_PORT on all local interfaces and
  starts listening. Returns True on success. On failure a message box names
  the failing call, the socket (if created) is closed, and False is returned. }
function TListenThread.InitSocket: BOOL;
var
  addr : TSockAddr;
begin
  Result := False;

  FListenSock := socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
  if FListenSock = INVALID_SOCKET then
  begin
    MessageBox( 0, 'Call socket() failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  FillChar( addr, sizeof(addr), 0 );
  addr.sin_family := AF_INET;
  addr.sin_port := htons(LISTEN_PORT);
  addr.sin_addr.S_addr := htonl(INADDR_ANY);

  if bind( FListenSock, @addr, sizeof(SOCKADDR) ) = SOCKET_ERROR then
  begin
    closesocket( FListenSock );       // do not leak the socket on failure
    FListenSock := INVALID_SOCKET;
    MessageBox( 0, 'Call bind failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  if listen( FListenSock, 5 ) = SOCKET_ERROR then
  begin
    closesocket( FListenSock );
    FListenSock := INVALID_SOCKET;
    MessageBox( 0, 'Call listen failed.', 'Error', MB_ICONERROR );
    Exit;
  end;

  Result := True;
end;
end.
呵呵,感覺明了多了~~~~~~~~~~````
呵呵,終于寫完了,好累......以上所有的源代碼可以從這里看到:
http://community.csdn.net/Expert/topic/3844/3844679.xml?temp=5.836123E-02
posted on 2007-08-17 12:10
聶文龍 閱讀(1269)
評論(0) 編輯 收藏 引用 所屬分類:
net work