簡述: Overlapped I/O也稱
Asynchronous
I/O,異步I/O模型。異步I/O和同步I/O不同,同步I/O時,程序被掛起,一直到I/O處理完,程序才能獲得控制。異步I/O,調用一個函數告訴
OS,進行I/O操作,不等I/O結束就立即返回,繼續程序執行,操作系統完成I/O之后,通知消息給你。Overlapped
I/O只是一種模型,它可以由內核對象(hand),事件內核對象(hEvent), 異步過程調用(apcs) 和完成端口(I/O
completion)實現。
Overlapped I/O的設計的目的:
取代多線程功能,(多線程存在同步機制,錯誤處理,在成千上萬個線程
I/O中,線程上下文切換是十分消耗CPU資源的)。
Overlapped I/O模型是
OS為你傳遞數據,完成上下文切換,在處理完之后通知你。由程序中的處理,變為OS的處理。內部也是用線程處理的。
Overlapped數據結構:
typedef struct _OVERLAPPED { // o
DWORD Internal; //通常被保留,當GetOverlappedResult()傳回False并且GatLastError()并非傳回ERROR_IO_PENDINO時,該狀態置為系統定的狀態。
DWORD InternalHigh; //通常被保留,當GetOverlappedResult()傳回False時,為被傳輸數據的長度。
DWORD Offset; //指定文件的位置,從該位置傳送數據,文件位置是相對文件開始處的字節偏移量。調用 ReadFile或WriteFile函數之前調用進程設置這個成員,讀寫命名管道及通信設備時調用進程忽略這個成員;
DWORD OffsetHigh; //指定開始傳送數據的字節偏移量的高位字,讀寫命名管道及通信設備時調用進程忽略這個成員;
HANDLE hEvent; //標識事件,數據傳送完成時把它設為信號狀態,調用ReadFil,eWriteFile,ConnectNamedPipe TransactNamedPipe函數前,調用進程設置這個成員. 相關函數CreateEvent ResetEvent GetOverlappedResult WaitForSingleObject CWinThread GetLastError
} OVERLAPPED, *LPOVERLAPPED;
二個重要功能:1.
標識每個正在
overlapped 的操作。
2. 程序和系統之間提供了共享區域。參數可以在區域內雙向傳遞。
OVERLAPPED和數據緩沖區釋放問題:
在請求時,不能釋放,只有在
I/O請求完成之后,才可以釋放。如果發出多個overlapped請求,每個overlapped讀寫操作,都必須包含文件位置(socket),另外,如果有多個磁盤,I/O執行次序無法保證。(每個overlapped都是獨立的請求操作)。
內核對象(hand)實現:
例子:用
overlapped模型讀一個磁盤文件內容。
1.把設備句柄看作同步對象,
ReadFile將設備句柄設為無信號。ReadFile 異步I/O字節位置必須在OVERLAPPED結構中指定。
2.完成
I/O,設置信息狀態。為有信號。
3.WaitForSingleObject或WaitForMultipleObject判斷或者異步設備調用
GetOverLappedResult函數。
int main()
{
BOOL rc;
HANDLE hFile;
DWORD numread;
OVERLAPPED overlap;
char buf[READ_SIZE];
char szPath[MAX_PATH];
CheckOsVersion();
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
hFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (hFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
memset(&overlap, 0, sizeof(overlap));
overlap.Offset = 1500;
rc = ReadFile(
hFile,
buf,
READ_SIZE,
&numread,
&overlap
);
printf("Issued read request\n");
if (rc)
{
printf("Request was returned immediately\n");
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{
printf("Request queued, waiting
\n");
WaitForSingleObject(hFile, INFINITE);
printf("Request completed.\n");
rc = GetOverlappedResult(
hFile,
&overlap,
&numread,
FALSE
);
printf("Result was %d\n", rc);
}
else
{
printf("Error reading file\n");
}
}
CloseHandle(hFile);
return EXIT_SUCCESS;
}
事件內核對象(hEvent):
內核對象(hand)實現的問題:
不能區分那一個
overlapped操作,對同一個文件handle,系統有多個異步操作時(一邊讀文件頭,一邊寫文件尾, 有一個完成,就會有信號,不能區分是那種操作。),為每個進行中的overlapped調用GetOverlappedResult是不好的作法。
事件內核對象(hEvent)實現方案: Overlapped成員
hEven標識事件內核對象。CreateEvent,為每個請求創建一個事件,初始化每個請求的hEvent成員(對同一文件多個讀寫請求,每個操作綁定一個event對象)。調用WaitForMultipleObject來等等其中一個(或全部)完成。
另外
Event對象必須是手動重置。使用自動重置(在等待event之前設置,WaitForSingleObject()和 WaitForMultipleObjects()函數永不返回)。
自動重置事件
WaitForSingleObject()和 WaitForMultipleObjects()會等待事件到信號狀態,隨后又自動將其重置為非信號狀態,這樣保證了等待此事件的線程中只有一個會被喚醒。
手動重置事件
需要用戶調用ResetEvent()才會重置事件。可能有若干個線程在等待同一事件,
這樣當事件變為信號狀態時,所有等待線程都可以運行了。
SetEvent()函數用來把事件對象設置成信號狀態,ResetEvent()把事件對象重置成非信號狀態,兩者均需事件對象句柄作參數。
相關例子如下:
int main()
{
BOOL rc;
HANDLE hFile;
DWORD numread;
OVERLAPPED overlap;
char buf[READ_SIZE];
char szPath[MAX_PATH];
CheckOsVersion();
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
hFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (hFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
memset(&overlap, 0, sizeof(overlap));
overlap.Offset = 1500;
rc = ReadFile(
hFile,
buf,
READ_SIZE,
&numread,
&overlap
);
printf("Issued read request\n");
if (rc)
{
printf("Request was returned immediately\n");
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{
printf("Request queued, waiting
\n");
WaitForSingleObject(hFile, INFINITE);
printf("Request completed.\n");
rc = GetOverlappedResult(
hFile,
&overlap,
&numread,
FALSE
);
printf("Result was %d\n", rc);
}
else
{
printf("Error reading file\n");
}
}
CloseHandle(hFile);
return EXIT_SUCCESS;
}
異步過程調用(apcs):
事件內核對象(hEvent)的問題:
事件內核對象在使用
WaitForMultipleObjects時,只能等待64個對象。需要另建兩個數據組,并gOverlapped[nIndex].hEvent = ghEvents[nIndex]綁定起來。
異步過程調用(apcs)實現方案:
異步過程調用,
callback回調函數,在一個Overlapped I/O完成之后,系統調用該回調函數。OS在有信號狀態下(設備句柄),才會調用回調函數(可能有很多APCS等待處理了),傳給它完成I/O請求的錯誤碼,傳輸字節數和Overlapped結構的地址。
五個函數可以設置信號狀態:
1.SleepEx
2.
WaitForSingleObjectEx
3.
WaitForMultipleObjectEx
4.SingalObjectAndWait
5.
MsgWaitForMultipleObjectsEx
Main函數調用
WaitForSingleObjectEx, APCS被處理,調用回調函數FileIOCompletionRoutine
VOID WINAPI FileIOCompletionRoutine(
DWORD dwErrorCode, // completion code
DWORD dwNumberOfBytesTransfered, // number of bytes transferred
LPOVERLAPPED lpOverlapped // pointer to structure with I/O information
)
{
int nIndex = (int)(lpOverlapped->hEvent);
printf("Read #%d returned %d. %d bytes were read.\n",
nIndex,
dwErrorCode,
dwNumberOfBytesTransfered);
if (++nCompletionCount == MAX_REQUESTS)
SetEvent(ghEvent); // Cause the wait to terminate
}
int main()
{
int i;
char szPath[MAX_PATH];
CheckOsVersion();
MTVERIFY(
ghEvent = CreateEvent(
NULL, // No security
TRUE, // Manual reset - extremely important!
FALSE, // Initially set Event to non-signaled state
NULL // No name
)
);
GetWindowsDirectory(szPath, sizeof(szPath));
strcat(szPath, "\\WINHLP32.EXE");
ghFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL
);
if (ghFile == INVALID_HANDLE_VALUE)
{
printf("Could not open %s\n", szPath);
return -1;
}
for (i=0; i<MAX_REQUESTS; i++)
{
QueueRequest(i, i*16384, READ_SIZE);
}
printf("QUEUED!!\n");
for (;;)
{
DWORD rc;
rc = WaitForSingleObjectEx(ghEvent, INFINITE, TRUE );
if (rc == WAIT_OBJECT_0)
break;
MTVERIFY(rc == WAIT_IO_COMPLETION);
}
CloseHandle(ghFile);
return EXIT_SUCCESS;
}
int QueueRequest(int nIndex, DWORD dwLocation, DWORD dwAmount)
{
int i;
BOOL rc;
DWORD err;
gOverlapped[nIndex].hEvent = (HANDLE)nIndex;
gOverlapped[nIndex].Offset = dwLocation;
for (i=0; i<MAX_TRY_COUNT; i++)
{
rc = ReadFileEx(
ghFile,
gBuffers[nIndex],
dwAmount,
&gOverlapped[nIndex],
FileIOCompletionRoutine
);
if (rc)
{
printf("Read #%d queued for overlapped I/O.\n", nIndex);
return TRUE;
}
err = GetLastError();
if ( err == ERROR_INVALID_USER_BUFFER ||
err == ERROR_NOT_ENOUGH_QUOTA ||
err == ERROR_NOT_ENOUGH_MEMORY )
{
Sleep(50); // Wait around and try later
continue;
}
break;
}
printf("ReadFileEx failed.\n");
return -1;
}
完成端口(I/O completion):
異步過程調用
(apcs)問題:
只有發
overlapped請求的線程才可以提供callback函數(需要一個特定的線程為一個特定的I/O請求服務)。
完成端口
(I/O completion)的優點:
不會限制
handle個數,可處理成千上萬個連接。I/O completion port允許一個線程將一個請求暫時保存下來,由另一個線程為它做實際服務。
并發模型與線程池:
在典型的并發模型中,服務器為每一個客戶端創建一個線程,如果很多客戶同時請求,則這些線程都是運行的,那么
CPU就要一個個切換,CPU花費了更多的時間在線程切換,線程確沒得到很多CPU時間。到底應該創建多少個線程比較合適呢,微軟件幫助文檔上講應該是2*CPU個。但理想條件下最好線程不要切換,而又能象線程池一樣,重復利用。I/O完成端口就是使用了線程池。
理解與使用:
第一步:
在我們使用完成端口之前,要調用
CreateIoCompletionPort函數先創建完成端口對象。定義如下:
HANDLE CreateIoCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
DWORD CompletionKey,
DWORD NumberOfConcurrentThreads
);
FileHandle:
文件或設備的
handle, 如果值為INVALID_HANDLE_VALUE則產生一個沒有和任何文件handle有關系的port.( 可以用來和完成端口聯系的各種句柄,文件,套接字
)
ExistingCompletionPort:
NULL時生成一個新
port, 否則handle會加到此port上。
CompletionKey:
用戶自定義數值,被交給服務的線程。GetQueuedCompletionStatus函數時我們可以完全得到我們在此聯系函數中的完成鍵(申請的內存塊)。在
GetQueuedCompletionStatus中可以完封不動的得到這個內存塊,并且使用它。
NumberOfConcurrentThreads:
參數
NumberOfConcurrentThreads用來指定在一個完成端口上可以并發的線程數量。理想的情況是,一個處理器上只運行一
個線程,這樣可以避免線程上下文切換的開銷。如果這個參數的值為0,那就是告訴系統線程數與處理器數相同。我們可以用下面的代碼來創建I/O完成端口。
隱藏在之創建完成端口的秘密:
1.創建一個完成端口
CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, dwNumberOfConcurrentThreads);
2.設備列表,完成端口把它同一個或多個設備相關聯。
CreateIoCompletionPort(hDevice, hCompPort, dwCompKey, 0) ;
第二步:
根據處理器個數,創建
cpu*2個工作線程:CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,0, &ThreadID))與此同時,服務器調用
WSASocket,bind, listen, WSAAccept,之后,調用CreateIoCompletionPort((HANDLE) Accept, CompletionPort... )把一個套接字句柄和一個完成端口綁定到一起。完成端口又同一個或多個設備相關聯著,所以以套接字為基礎,投遞發送和請求,對
I/O處理。接著,可以依賴完成端口,接收有關I/O操作完成情況的通知。再看程序里:WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,&(PerIoData->Overlapped), NULL)開始調用,這里象前面講過的一樣,既然是異步
I/O,所以WSASend和WSARecv的調用會立即返回。
系統處理:
當一個設備的異步
I/O請求完成之后,系統會檢查該設備是否關聯了一個完成端口,如果是,系統就向該完成端口的I/O完成隊列中加入完成的I/O請求列。
然后我們需要從這個完成隊列中,取出調用后的結果
(需要通過一個Overlapped結構來接收調用的結果)。怎么知道這個隊列中已經有處理后的結果呢,調用GetQueuedCompletionStatus函數。
工作線程與完成端口: 和異步過程調用不同
(在一個
Overlapped I/O完成之后,系統調用該回調函數。OS在有信號狀態下(設備句柄),才會調用回調函數(可能有很多APCS等待處理了))
GetQueuedCompletionStatus在工作線程內調用
GetQueuedCompletionStatus函數。
GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
LPDWORD lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds
);
CompletionPort:指出了線程要監視哪一個完成端口。很多服務應用程序只是使用一個I/O完成端口,所有的I/O請求完成以后的通知都將發給該端口。
lpNumberOfBytesTransferred:傳輸的數據字節數
lpCompletionKey:
完成端口的單句柄數據指針,這個指針將可以得到我們在CreateIoCompletionPort中申請那片內存。
lpOverlapped:
重疊I/O請求結構,這個結構同樣是指向我們在重疊請求時所申請的內存塊,同時和lpCompletionKey,一樣我們也可以利用這個內存塊來存儲我們要保存的任意數據。
dwMilliseconds:
等待的最長時間(毫秒),如果超時,lpOverlapped被設為NULL,函數返回False.
GetQueuedCompletionStatus功能及隱藏的秘密:
GetQueuedCompletionStatus使調用線程掛起,直到指定的端口的I/O完成隊列中出現了一項或直到超時。(I/0完成隊列中出現了記錄)調用GetQueuedCompletionStatus時,調用線程的ID(cpu*2個線程,每個ServerWorkerThread的線程ID)就被放入該等待線程隊列中。
等待線程隊列很簡單,只是保存了這些線程的ID。完成端口會按照后進先出的原則將一個線程隊列的ID放入到釋放線程列表中。
這樣,I/O完成端口內核對象就知道哪些線程正在等待處理完成的I/O請求。當端口的I/O完成隊列出現一項時,完成端口就喚醒(睡眠狀態中變為可調度狀態)等待線程隊列中的一個線程。線程將得到完成I/O項中的信息:傳輸的字節數,完成鍵(單句柄數據結構)和Overlapped結構地址,線程是通過GetQueuedCompletionStatus返回這些信息,等待CPU的調度。
GetQueuedCompletionStatus返回可能有多種原因,如果傳遞無效完成端口句柄,函數返回False,GetLastError返回一個錯誤(ERROR_INVALID_HANDLE),如果超時,返回False, GetLastError返回WAIT_TIMEOUT, i/o完成隊列刪除一項,該表項是一個成功完成的I/O請求,則返回True。
調用GetQueuedCompletionStatus的線程是后進先出的方式喚醒的,比如有4個線程等待,如果有一個I/O,最后一個調用GetQueuedCompletionStatus的線程被喚醒來處理。處理完之后,再調用 GetQueuedCompletionStatus進入等待線程隊列中。
深入分析完成端口線程池調度原理:
假設我們運行在2CPU的機器上。創建完成端口時指定2個并發,創建了4個工作線程加入線程池中等待完成I/O請求,且完成端口隊列(先入先出)中有3個完成I/O的請求的情況:
工作線程運行, 創建了4個工作線程,調用GetQueuedCompletionStatus時,該調用線程就進入了睡眠狀態,假設這個時候,I/O完成隊列出現了三項,調用線程的ID就被放入該等待線程隊列中,
(如圖):
等待線程隊列(先進后出)
進隊列
出隊列
線程A
線程B
線程C
線程D I/O完成端口內核對象(第3個參數等級線程隊列),因此知道哪些線程正在等待處理完成的I/O請求。當端口的I/O完成隊列出現一項時,完成端口就喚醒(睡眠狀態中變為可調度狀態)等待線程隊列中的一個線程(前面講過等待線程隊列是后進先出)。所以線程D將得到完成I/O項中的信息:傳輸的字節數,完成鍵(單句柄數據結構)和Overlapped結構地址,線程是通過GetQueuedCompletionStatus返回這些信息。
在前面我們指定了并發線程的數目是2,所以I/O完成端口喚醒2個線程,線程D和線程C,另兩個繼續休眠(線程B,線程A),直到線程D處理完了,發現表項里還有要處理的,就喚醒同一線程繼續處理。
等待線程隊列(后進先出
)
進隊列
出隊列
線程A
線程B
釋放線程隊列
線程C
線程D
線程并發量:
并發量限制了與該完成端口相關聯的可運行線程的數目, 它類似閥門的作用。 當與該完成端口相關聯的可運行線程的總數目達到了該并發量,系統就會阻塞任何與該完成端口相關聯的后續線程的執行, 直到與該完成端口相關聯的可運行線程數目下降到小于該并發量為止。所以解釋了線程池中的運行線程可能會比設置的并發線程多的原因。
它的作用:
最有效的假想是發生在有完成包在隊列中等待,而沒有等待被滿足,因為此時完成端口達到了其并發量的極限。此時,一個正在運行中的線程調用 GetQueuedCompletionStatus時,它就會立刻從隊列中取走該完成包。這樣就不存在著環境的切換,因為該處于運行中的線程就會連續不斷地從隊列中取走完成包,而其他的線程就不能運行了。
注意:如果池中的所有線程都在忙,客戶請求就可能拒絕,所以要適當調整這個參數,獲得最佳性能。
線程并發:D線程掛起,加入暫停線程,醒來后又加入釋放線程隊列。
線程C
線程B
線程A
出隊列
進隊列
等待的線程隊列(后進先出)
釋放線程隊列
暫停線程
線程D
線程的安全退出:
PostQueudCompletionStatus函數,我們可以用它發送一個自定義的包含了OVERLAPPED成員變量的結構地址,里面包含一個狀態變量,當狀態變量為退出標志時,線程就執行清除動作然后退出。
完成端口使用需要注意的地方:
1.在執行wsasend和wsarecv操作前,請先將overlapped結構體使用memset進行清零。