?
?????????????????????????????????????
用UDP實(shí)現(xiàn)可靠文件傳輸
大家都清楚,如果用TCP傳輸文件的話,是很簡單的,根本都不用操心會(huì)丟包,除非是網(wǎng)絡(luò)壞了,就得重來。用UDP的話,因?yàn)閁DP是不可靠的,所以用它傳輸文件,要保證不丟包,就得我們自己寫額外的代碼來保障了。本文就說說如果保證可靠傳輸。
???要實(shí)現(xiàn)無差錯(cuò)的傳輸數(shù)據(jù),我們可以采用重發(fā)請求(ARQ)協(xié)議,它又可分為連續(xù)ARQ協(xié)議、選擇重發(fā)ARQ協(xié)議、滑動(dòng)窗口協(xié)議。本文重點(diǎn)介紹滑動(dòng)窗口協(xié)議,其它的兩種有興趣的可參考相關(guān)的網(wǎng)絡(luò)通信之類的書。
????采用滑動(dòng)窗口協(xié)議,限制已發(fā)送出去但未被確認(rèn)的數(shù)據(jù)幀的數(shù)目。循環(huán)重復(fù)使用已收到的那些數(shù)據(jù)幀的序號。具體實(shí)現(xiàn)是在發(fā)送端和接收端分別設(shè)定發(fā)送窗口和接收窗口。
?(1)發(fā)送窗口
??發(fā)送窗口用來對發(fā)送端進(jìn)行流量控制。發(fā)送窗口的大小Wt代表在還沒有收到對方確認(rèn)的條件下,發(fā)送端最多可以發(fā)送的數(shù)據(jù)幀的個(gè)數(shù)。具體意思請參考下圖:
?(2)接收窗口
??接收窗口用來控制接收數(shù)據(jù)幀。只有當(dāng)接收到的數(shù)據(jù)幀的發(fā)送序號落在接收窗口內(nèi),才允許將該數(shù)據(jù)幀收下,否則一律丟棄。接收窗口的大小用Wr來表示,在連續(xù)ARQ協(xié)議中,Wr?=?1。接收窗口的意義可參考下圖:
??在接收窗口和發(fā)送窗口間存在著這樣的關(guān)系:接收窗口發(fā)生旋轉(zhuǎn)后,發(fā)送窗口才可能向前旋轉(zhuǎn),接收窗口保持不動(dòng)時(shí),發(fā)送窗口是不會(huì)旋轉(zhuǎn)的。這種收發(fā)窗口按如此規(guī)律順時(shí)鐘方向不斷旋轉(zhuǎn)的協(xié)議就犯法為滑動(dòng)窗口協(xié)議。
???好了,在上面對滑動(dòng)窗口協(xié)議有大致了解后,我們還是進(jìn)入正題吧:)
???發(fā)送端的發(fā)送線程:
???int??ret;
???int??nPacketCount?=?0;
???DWORD?dwRet;
???SendBuf?sendbuf;
???DWORD?dwRead;
???DWORD?dwReadSize;
???SendBuf*?pushbuf;
???//計(jì)算一共要讀的文件次數(shù),若文件已讀完,但客戶端沒有接收完,
???//則要發(fā)送的內(nèi)容不再從文件里讀取,而從m_bufqueue里提取
???nPacketCount?=?m_dwFileSize?/?sizeof(sendbuf.buf);
??//若不能整除,則應(yīng)加1
??if(m_dwFileSize?%?sizeof(sendbuf.buf)?!=?0)
?????++nPacketCount;
?SetEvent(m_hEvent);
?
??CHtime?htime;
??//若已發(fā)送大小小于文件大小并且發(fā)送窗口前沿等于后沿,則繼續(xù)發(fā)送
??//否則退出循環(huán)
?if(m_dwSend?
<
?m_dwFileSize
)??//?文件沒有傳輸完時(shí)才繼續(xù)傳輸
?{
??while(1)
??{
???dwRet?
=?WaitForSingleObject(m_hEvent,?
1000);
???if(dwRet?
==?
WAIT_FAILED)
???{
????return?false;
???}
???else?if(dwRet?
==?
WAIT_TIMEOUT)
???{
????//重發(fā)
????::EnterCriticalSection(&m_csQueue);??//?進(jìn)入m_bufqueue的排斥區(qū)
????ret?
=?m_hsocket.hsendto((char*)m_bufqueue.front(),?
sizeof(sendbuf));
????::LeaveCriticalSection(&m_csQueue);??//?退出m_bufqueue的排斥區(qū)
????if(ret?
==?
SOCKET_ERROR)
????{
?????cout?<<?"重發(fā)失敗,繼續(xù)重發(fā)"?<<?endl;
?????continue;
????}
???ResetEvent(m_hEvent);
?
????continue;
???}
??//若發(fā)送窗口大小?<?預(yù)定大小?&&?已讀文件次數(shù)(nReadIndex)?<?需要讀文件的次數(shù)(nReadCount),則繼續(xù)讀取發(fā)送
?
???//否則,要發(fā)送的內(nèi)容從m_bufqueue里提取
???if(m_dwSend?<?m_dwFileSize)
???{
????dwReadSize?
=?m_dwFileSize?
-?m_dwSend;
????dwReadSize?
=?dwReadSize?
<?MAXBUF_SIZE?????dwReadSize?:MAXBUF_SIZE;
????memset(sendbuf.buf,?0,?sizeof(sendbuf.buf));
????if(!ReadFile(m_hFile,?sendbuf.buf,?dwReadSize,?&dwRead,?NULL))
????{
?????//AfxMessageBox("讀取文件失敗,請確認(rèn)文件存在或有讀取權(quán)限.");
?????cout?<<?"讀取文件失敗,請確認(rèn)文件存在或有讀取權(quán)限."?<<?endl;
?????return?false;
????}
????m_dwSend?+
=?dwRead;
????
sendbuf.index?
=?m_nSendIndexHead;
????
//?發(fā)送窗口前沿向前移一格?????m_nSendIndexHead?
=?(m_nSendIndexHead?
+?1)?%???Sliding_Window_Size;?
??sendbuf.dwLen?
=?dwRead;
?
??
//保存發(fā)送過的數(shù)據(jù),以便重發(fā)
?
???::EnterCriticalSection(&m_csQueue);???//?進(jìn)入m_bufqueue的排斥區(qū)
???pushbuf?
=?GetBufFromLookaside();
???
memcpy(pushbuf,?&sendbuf,?sizeof(sendbuf));
???m_bufqueue.push(pushbuf);
???//?文件已讀完,在隊(duì)列中加一File_End標(biāo)志,以便判斷是否需要繼續(xù)發(fā)送????if(m_dwSend?
>
=?m_dwFileSize)????
???{
?????pushbuf?=?GetBufFromLookaside();
????pushbuf->index?=?File_End;
?
?????pushbuf->dwLen?=?File_End;
?????memset(pushbuf->buf,?0,?sizeof(pushbuf->buf));
?????m_bufqueue.push(pushbuf);
????}
????::LeaveCriticalSection(
&m_csQueue
);???//?退出m_bufqueue的排斥區(qū)
???}
???::EnterCriticalSection(
&m_csQueue
);????//?進(jìn)入m_bufqueue的排斥區(qū)
?
???//?所有數(shù)據(jù)包已發(fā)送完畢,退出循環(huán)????if(m_bufqueue.front()->index?==?File_End)??
???{
?????::LeaveCriticalSection(
&m_csQueue
);???//?退出m_bufqueue的排斥區(qū)
?????break;
???}
???//?發(fā)送窗口小于指定值,繼續(xù)發(fā)送????else?if(m_bufqueue.size()?
<
=?Send_Window_Size
)?
???{
????ret?
=?m_hsocket.hsendto((char*)m_bufqueue.front(),?
sizeof(sendbuf));
????if(ret?
==?
SOCKET_ERROR)
????{
?????::LeaveCriticalSection(&m_csQueue);??//?退出m_bufqueue的排斥區(qū)
?????cout?<<?"發(fā)送失敗,重發(fā)"?<<?endl;
?????continue;
????}
????//延時(shí),防止丟包
????Sleep(50);??
???}
???else???????????//?發(fā)送窗口大于指定值,等持接收線程接收確認(rèn)消息
???{
????ResetEvent(m_hEvent);
???}
???::LeaveCriticalSection(&m_csQueue);????//?退出m_bufqueue的排斥區(qū)
??}
?}
??發(fā)送端的接收線程:
??int?ret;
??RecvBuf?recvbuf;
??while(m_hFile?!
=?NULL)
??
{
????ret?
=?m_hsocket.hrecvfrom((char*)&recvbuf,?
sizeof(recvbuf));??
????if(ret?
==?
SOCKET_ERROR)
????{
??????//AfxMessageBox("接收確認(rèn)消息出錯(cuò)");
??????::EnterCriticalSection(&m_csQueue);
??????if(m_bufqueue.front()-
>
index?==?File_End)?//?文件傳輸完畢
??????{
?????????::LeaveCriticalSection(
&m_csQueue
);
?????????break;
???????}
???????::LeaveCriticalSection(
&m_csQueue
);
???????cout?
<
<?"接收確認(rèn)消息出錯(cuò):?
"?<<?GetLastError()?<<?endl;
???????return?false;
?????}
??if(recvbuf.flag?
==?
Flag_Ack?&&?recvbuf.index?
==????
m_nSendIndexTail)
??
???{
??????m_nSendIndexTail?
=?(m_nSendIndexTail?
+?1)?%?Sliding_Window_Size;
??????//該結(jié)點(diǎn)已得到確認(rèn),將其加入旁視列表,以備再用
??????::EnterCriticalSection(&m_csQueue);
??????m_bufLookaside.push(m_bufqueue.front());
??????m_bufqueue.pop();
?????::LeaveCriticalSection(&m_csQueue);
?????SetEvent(m_hEvent);
???}
?}
接收端的接收線程:
?int??ret;
?DWORD?dwWritten;
?SendBuf?recvbuf;
?RecvBuf?sendbuf;
?int?nerror?
=?0;
?
//?設(shè)置文件指針位置,指向上次已發(fā)送的大小
?SetFilePointer(m_hFile,?0,?NULL,?FILE_END);
?//若已接收文件大小小于需要接收的大小,則繼續(xù)
?while(m_dwSend?<?m_dwFileSize)
?{
??//接收
??memset(&recvbuf,?0,?sizeof(recvbuf));
??ret?
=?m_hsocket.hrecvfrom((char*)&recvbuf,?
sizeof(recvbuf));
??if(ret?
==?
SOCKET_ERROR)
??{
???return?false;
??}
??//不是希望接收的,丟棄,繼續(xù)接收
??if(recvbuf.index?!
=?(m_nRecvIndex)?
%?Sliding_Window_Size)
??{
???nerror++;
???cout?<<?recvbuf.index?<<?"error?"?<<?m_nRecvIndex?<<?endl;
???continue;
??}
??if(!WriteFile(m_hFile,?recvbuf.buf,?recvbuf.dwLen,?&dwWritten,?NULL))
??{
???//AfxMessageBox("寫入文件失敗");
???cout?<<?"寫入文件失敗"?<<?endl;
???return?false;
??}
??//已接收文件大小
??m_dwSend?+
=?dwWritten;
??
//發(fā)送確認(rèn)消息
??sendbuf.flag?
=?Flag_Ack;
??
sendbuf.index?
=?m_nRecvIndex;
??
ret?
=?m_hsocket.hsendto((char*)&sendbuf,?
sizeof(sendbuf));
??if(ret?
==?
SOCKET_ERROR)
??{
???return?false;
??}
??//接收窗口前移一格
??m_nRecvIndex?
=?(m_nRecvIndex?
+?1)?%?Sliding_Window_Size;
?}
?
?