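Reposted from: http://blog.csdn.net/leehark/article/details/7671462
How PseudoTcp Handles Bulk Data Flow
The previous article discussed how TCP and PTCP handle interactive data flow. This article covers the other kind of data flow: bulk data flow. Bulk data flow relies mainly on the sliding window protocol and the slow start algorithm to control how much data is in transit.
Sliding Window
A sliding window lets the sender transmit several packets in a row before it stops to wait for an acknowledgment. The sender does not have to pause after every packet, which speeds up the transfer. Does this conflict with the Nagle algorithm? No. Packets in a bulk data flow are sent full-sized, and under the Nagle algorithm data is sent immediately when both the amount of data waiting to be sent and the window size are at least one MSS.
If the sender simply kept transmitting, packets would often be lost, especially when a fast sender talks to a slow receiver. When the sender keeps sending before the receiver has processed the earlier data, the receiver's buffer fills up and any further data is dropped. To reduce packet loss on the network, some mechanism is needed to limit how much the sender transmits.
That mechanism is the sliding window, shown in the figure below: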

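The sliding window divides the data into four parts:
Segments 1 to 3 in the figure have been sent and acknowledged.
Segments 4 to 6 have been sent but not yet acknowledged.
Segments 7 to 9 are the usable window, the space in which the sender may still send data. (Segments 4 to 9 together make up the window advertised by the receiver.)
Segments 10 and above cannot be sent until the window moves.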
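The four regions can be expressed as a small classification function. This is only a sketch: the names `SenderView`, `Region` and `classify` are hypothetical, positions are counted in whole segments as in the figure, and the window of 6 segments is inferred from the list above (segments 4 to 9).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names for illustration; not part of PTCP.
enum class Region { Acked, InFlight, Usable, Blocked };

struct SenderView {
  uint32_t una;  // first segment not yet acknowledged (4 in the figure)
  uint32_t nxt;  // next segment to be sent (7 in the figure)
  uint32_t wnd;  // window advertised by the receiver, in segments (assumed 6)
};

// Maps a segment number onto one of the four regions of the sliding window.
Region classify(const SenderView& s, uint32_t seg) {
  if (seg < s.una) return Region::Acked;            // sent and acknowledged
  if (seg < s.nxt) return Region::InFlight;         // sent, not yet acknowledged
  if (seg < s.una + s.wnd) return Region::Usable;   // may still be sent
  return Region::Blocked;                           // outside the window
}
```

With `SenderView{4, 7, 6}`, segment 3 is `Acked`, segment 5 is `InFlight`, segment 8 is `Usable` and segment 10 is `Blocked`.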

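As the receiver acknowledges data, both edges of the sliding window keep moving to the right.
Window closing: when sent data is acknowledged, the left edge of the window moves to the right.
Window opening: when the receiver has received and acknowledged data and then frees it from its buffer, the right edge moves to the right.
Window shrinking: when the receiver's buffer becomes smaller, the right edge moves to the left. This is discouraged.
The sliding window is updated through the advertised window size. After receiving data, the receiver recalculates the free space in its receive buffer and advertises it to the sender. If the advertised window is 0, the sender must stop sending until the window becomes nonzero again. This effectively prevents packet loss caused by a full receive buffer.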
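The movement of the two edges can be sketched from the sender's point of view. The `Window` struct below is a hypothetical illustration, not PTCP code; it counts bytes, ignores sequence-number wraparound, and assumes every ACK carries the receiver's current advertised window.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical sender-side view of the sliding window (byte offsets).
struct Window {
  uint32_t left;   // first unacknowledged byte
  uint32_t right;  // one past the last byte the receiver will accept
  uint32_t next;   // next byte to send

  // An ACK moves the left edge; the advertised size places the right edge.
  void onAck(uint32_t ack, uint32_t advertised) {
    left = ack;
    right = ack + advertised;
  }

  // Bytes that may still be sent without overrunning the receiver.
  uint32_t usable() const { return right > next ? right - next : 0; }

  // Sends at most `want` bytes and returns how many were actually sent.
  uint32_t send(uint32_t want) {
    uint32_t n = std::min(want, usable());
    next += n;
    return n;
  }
};
```

Starting from `Window{0, 4000, 0}`, sending 5000 bytes only gets 4000 out. An ACK of 2000 with an advertised window of 2000 closes the window (left edge moves, right edge stays at 4000). An ACK of 4000 with a window of 0 stops the sender entirely, and a later update advertising 3000 opens the window again.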
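So how does PTCP implement this?
PTCP uses m_rbuf_len to record the size of the receive buffer. If the buffer is smaller than 65536 bytes, m_rwnd_scale is 0 and m_rcv_wnd holds the window size directly. If it is larger than 65535 bytes, the algorithm below adjusts m_rbuf_len and m_rwnd_scale. After the adjustment, the window size m_rcv_wnd is set to the free space in the buffer. Why is 65535 the boundary? Because the window field in the PTCP header is 16 bits long, so it can only represent window sizes from 0 to 65535 inclusive.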
    void
    PseudoTcp::resizeReceiveBuffer(uint32 new_size) {
      uint8 scale_factor = 0;

      // Find the scale factor that lets the scaled window fit in 16 bits.
      while (new_size > 0xFFFF) {
        ++scale_factor;
        new_size >>= 1;
      }

      // Scale back up; any low bits shifted out above are dropped.
      new_size <<= scale_factor;
      bool result = m_rbuf.SetCapacity(new_size);
      m_rbuf_len = new_size;
      m_rwnd_scale = scale_factor;
      m_ssthresh = new_size;

      // The receive window is whatever space is still free in the buffer.
      size_t available_space = 0;
      m_rbuf.GetWriteRemaining(&available_space);
      m_rcv_wnd = available_space;
    }
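To see what the loop produces for concrete sizes, the same arithmetic can be pulled out into a standalone function. `scaleFor` is a hypothetical helper written for this illustration; it only mirrors the shift logic of resizeReceiveBuffer and does not touch any buffer.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical helper: returns {rounded buffer size, scale factor} using the
// same shift-down / shift-up arithmetic as resizeReceiveBuffer.
std::pair<uint32_t, uint8_t> scaleFor(uint32_t size) {
  uint8_t scale = 0;
  while (size > 0xFFFF) {  // does not fit in the 16-bit window field yet
    ++scale;
    size >>= 1;
  }
  // Shifting back up yields a size that is a multiple of 2^scale.
  return {size << scale, scale};
}
```

A 61440-byte buffer keeps scale 0. A request for 100001 bytes gives scale 1 and is rounded down to 100000 bytes, and a 1 MiB buffer (1048576 bytes) gives scale 5 because 1048576 >> 5 = 32768 is the first value that fits in 16 bits.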
During PTCP's three-way handshake, the PTCP option TCP_OPT_WND_SCALE is used to tell the peer the value of m_rwnd_scale.
    void
    PseudoTcp::queueConnectMessage() {
      talk_base::ByteBuffer buf(talk_base::ByteBuffer::ORDER_NETWORK);

      buf.WriteUInt8(CTL_CONNECT);
      if (m_support_wnd_scale) {
        // Option layout: kind, length of the value, value.
        buf.WriteUInt8(TCP_OPT_WND_SCALE);
        buf.WriteUInt8(1);
        buf.WriteUInt8(m_rwnd_scale);
      }
      m_snd_wnd = buf.Length();
      queue(buf.Data(), buf.Length(), true);
    }
When PTCP receives the control packet that carries the window scale factor, it parses the packet with the parseOptions method, as follows:
    void
    PseudoTcp::parseOptions(const char* data, uint32 len) {
      std::set<uint8> options_specified;

      talk_base::ByteBuffer buf(data, len);
      while (buf.Length()) {
        uint8 kind = TCP_OPT_EOL;
        buf.ReadUInt8(&kind);

        if (kind == TCP_OPT_EOL) {
          // End of the option list.
          break;
        } else if (kind == TCP_OPT_NOOP) {
          // Padding, carries no length or value.
          continue;
        }

        // Length of this option's value.
        UNUSED(len);
        uint8 opt_len = 0;
        buf.ReadUInt8(&opt_len);

        // Value of this option.
        if (opt_len <= buf.Length()) {
          applyOption(kind, buf.Data(), opt_len);
          buf.Consume(opt_len);
        } else {
          // Declared length runs past the end of the data.
          return;
        }
        options_specified.insert(kind);
      }

      if (options_specified.find(TCP_OPT_WND_SCALE) == options_specified.end()) {
        // The peer did not send a window scale option, so fall back to an
        // unscaled window and the default buffer size.
        if (m_rwnd_scale > 0) {
          resizeReceiveBuffer(DEFAULT_RCV_BUF_SIZE);
          m_swnd_scale = 0;
        }
      }
    }
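The kind/length/value walk can be tried out on a plain byte vector. The sketch below is a hypothetical standalone parser, not the PTCP method: the constant values (0 for end of list, 1 for no-op, 3 for window scale) follow the standard TCP option numbers and are an assumption about PTCP's definitions. As in the listings above, the length byte counts only the value bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Assumed option kinds (standard TCP numbering), named here for illustration.
constexpr uint8_t kOptEol = 0;
constexpr uint8_t kOptNoop = 1;
constexpr uint8_t kOptWndScale = 3;

// Parses a kind/length/value option list into `out`.
// Returns false if the list is truncated or a length is out of range.
bool parseOptionList(const std::vector<uint8_t>& data,
                     std::map<uint8_t, std::vector<uint8_t>>& out) {
  std::size_t i = 0;
  while (i < data.size()) {
    uint8_t kind = data[i++];
    if (kind == kOptEol) break;         // end of option list
    if (kind == kOptNoop) continue;     // padding, no length or value
    if (i >= data.size()) return false; // length byte is missing
    uint8_t len = data[i++];
    if (len > data.size() - i) return false;  // value runs past the end
    out[kind].assign(data.begin() + i, data.begin() + i + len);
    i += len;
  }
  return true;
}
```

Parsing the bytes `{1, 3, 1, 7, 0}` yields a single option of kind 3 whose value is 7, that is, a window scale factor of 7. If kind 3 is absent from the result, the caller would fall back to an unscaled window, as parseOptions does.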
The receiver adjusts its window size as follows:
Window closing: when the receiver gets data, it subtracts the number of bytes that the data consumed in the receive buffer from the window size.
    bool PseudoTcp::process(Segment& seg) {
      ......
      uint32 nOffset = seg.seq - m_rcv_nxt;
      talk_base::StreamResult result = m_rbuf.WriteOffset(seg.data, seg.len,
                                                          nOffset, NULL);
      ASSERT(result == talk_base::SR_SUCCESS);
      UNUSED(result);

      if (seg.seq == m_rcv_nxt) {
        // In-order segment: commit it and close the window by its length.
        m_rbuf.ConsumeWriteBuffer(seg.len);
        m_rcv_nxt += seg.len;
        m_rcv_wnd -= seg.len;
        bNewData = true;

        // Commit any queued out-of-order segments that are now contiguous.
        RList::iterator it = m_rlist.begin();
        while ((it != m_rlist.end()) && (it->seq <= m_rcv_nxt)) {
          if (it->seq + it->len > m_rcv_nxt) {
            sflags = sfImmediateAck;
            uint32 nAdjust = (it->seq + it->len) - m_rcv_nxt;
            m_rbuf.ConsumeWriteBuffer(nAdjust);
            m_rcv_nxt += nAdjust;
            m_rcv_wnd -= nAdjust;
          }
          it = m_rlist.erase(it);
        }
      } else {
        // Out-of-order segment: remember its range, sorted by sequence number.
        RSegment rseg;
        rseg.seq = seg.seq;
        rseg.len = seg.len;
        RList::iterator it = m_rlist.begin();
        while ((it != m_rlist.end()) && (it->seq < rseg.seq)) {
          ++it;
        }
        m_rlist.insert(it, rseg);
      }
      ......
    }
Window opening: when the application calls Recv to fetch the data PTCP has received, PTCP removes that data from the buffer, which frees space and enlarges the window.
    int PseudoTcp::Recv(char* buffer, size_t len) {
      ......
      talk_base::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);
      ......
      size_t available_space = 0;
      m_rbuf.GetWriteRemaining(&available_space);

      // Only enlarge the window once enough space has been freed.
      if (uint32(available_space) - m_rcv_wnd >=
          talk_base::_min<uint32>(m_rbuf_len / 2, m_mss)) {
        bool bWasClosed = (m_rcv_wnd == 0);
        m_rcv_wnd = available_space;

        // A window that was closed must be reopened with an immediate ACK.
        if (bWasClosed) {
          attemptSend(sfImmediateAck);
        }
      }
      return read;
    }
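The threshold in Recv means the window is not enlarged for every small read; it grows only once the freed space reaches min(buffer size / 2, MSS). The sketch below models just that bookkeeping. `Receiver`, `onData` and `onRead` are hypothetical names, the real buffer is replaced by a byte counter, and callers are assumed never to read more than is buffered.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical model of the receive-window bookkeeping (no real buffer).
struct Receiver {
  uint32_t buf_len;   // total receive buffer size
  uint32_t mss;       // maximum segment size
  uint32_t buffered;  // bytes waiting for the application
  uint32_t rcv_wnd;   // window currently advertised to the sender

  // In-order data arrives: the window closes by the same amount.
  void onData(uint32_t n) {
    buffered += n;
    rcv_wnd -= n;
  }

  // The application reads n bytes. Returns true when a window update
  // should be sent immediately because the window had been closed.
  bool onRead(uint32_t n) {
    buffered -= n;
    uint32_t available = buf_len - buffered;
    if (available - rcv_wnd >= std::min(buf_len / 2, mss)) {
      bool was_closed = (rcv_wnd == 0);
      rcv_wnd = available;  // window opens
      return was_closed;
    }
    return false;           // too little freed; keep the old window
  }
};
```

With a 4000-byte buffer and an MSS of 1000, filling the buffer closes the window to 0. Reading 500 bytes leaves the window at 0, because 500 is below the threshold of 1000. Reading another 500 bytes opens the window to 1000 and asks for an immediate update.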
Advertising the window size to the peer:
    IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32 seq, uint8 flags,
                                                    uint32 offset, uint32 len) {
      ASSERT(HEADER_SIZE + len <= MAX_PACKET);
      uint32 now = Now();
      uint8 buffer[MAX_PACKET];
      long_to_bytes(m_conv, buffer);         // bytes 0-3
      long_to_bytes(seq, buffer + 4);        // bytes 4-7
      long_to_bytes(m_rcv_nxt, buffer + 8);  // bytes 8-11: acknowledgment
      buffer[12] = 0;
      buffer[13] = flags;
      // Bytes 14-15: the receive window, scaled down to fit in 16 bits.
      short_to_bytes(static_cast<uint16>(m_rcv_wnd >> m_rwnd_scale), buffer + 14);
      ......
    }
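The effect of the right shift on the advertised value can be checked with two small functions. `encodeWindow` mirrors the expression used in packet above. `decodeWindow` is an assumption about the receiving side, which is not shown in this article: it presumably shifts the 16-bit field back up by the scale factor the peer announced.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors the expression in packet(): scale the window down to 16 bits.
uint16_t encodeWindow(uint32_t rcv_wnd, uint8_t scale) {
  return static_cast<uint16_t>(rcv_wnd >> scale);
}

// Assumed inverse on the peer's side: scale the 16-bit field back up.
uint32_t decodeWindow(uint16_t field, uint8_t scale) {
  return static_cast<uint32_t>(field) << scale;
}
```

A 100000-byte window with scale 1 travels as 50000 and decodes back to 100000. A 100001-byte window also travels as 50000, so the last byte of precision is lost. Without scaling, 100000 would be truncated to 34464 by the 16-bit cast, which is why buffers above 65535 bytes need a nonzero scale factor.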
Once the sender has received the window size advertised by the receiver, the amount it may send is the window size minus the data that has been sent but not yet acknowledged.
    void PseudoTcp::attemptSend(SendFlags sflags) {
      ......
      uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);
      uint32 nInFlight = m_snd_nxt - m_snd_una;
      uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
      ......
    }
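The three lines above can be isolated into a pure function to try out numbers. `usableWindow` is a hypothetical name; the arithmetic is the same as in attemptSend, including the unsigned subtraction that keeps working when the sequence numbers wrap around.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the usable-window arithmetic in attemptSend.
uint32_t usableWindow(uint32_t snd_wnd, uint32_t cwnd,
                      uint32_t snd_una, uint32_t snd_nxt) {
  uint32_t window = std::min(snd_wnd, cwnd);  // the stricter limit wins
  uint32_t in_flight = snd_nxt - snd_una;     // modulo 2^32, wrap-safe
  return (in_flight < window) ? (window - in_flight) : 0;
}
```

With an advertised window of 8000, a congestion window of 4000 and 2000 bytes in flight, 2000 more bytes may be sent. If the advertised window drops to 1000 while 2000 bytes are in flight, nothing may be sent until acknowledgments arrive.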
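Slow Start
When several routers and slower links sit between the sender and the receiver, some of the intermediate routers have to queue packets. If the sender starts by sending many packets at once, it can fill those queues, which severely reduces TCP throughput.
TCP addresses this with the slow start algorithm. The congestion window cwnd starts at 1 segment, and each time the sender receives an ACK, cwnd grows by one segment. The sender uses the smaller of the congestion window and the advertised window as its sending limit. The congestion window is flow control imposed by the sender, while the advertised window is flow control imposed by the receiver.
The sender first sends one segment. When its ACK arrives, cwnd becomes 2 and two segments can be sent. When those two ACKs arrive, cwnd becomes 4 and four segments can be sent, and so on. Slow start therefore grows exponentially.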
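The doubling per round trip can be reproduced with a tiny simulation. This is an idealized sketch with a hypothetical function name: it assumes no loss, no delayed ACKs, one ACK per segment, and an advertised window large enough never to be the limit.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Returns cwnd (in segments) at the start of each round trip under the
// idealized assumptions stated above.
std::vector<uint32_t> slowStartRounds(uint32_t initial, uint32_t rounds) {
  std::vector<uint32_t> history;
  uint32_t cwnd = initial;
  for (uint32_t r = 0; r < rounds; ++r) {
    history.push_back(cwnd);
    uint32_t acks = cwnd;  // every segment sent this round is acknowledged
    cwnd += acks;          // +1 segment per ACK, so cwnd doubles
  }
  return history;
}
```

Calling `slowStartRounds(1, 5)` gives 1, 2, 4, 8, 16: each ACK adds one segment, so a full window of ACKs doubles the window.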
PTCP implements slow start as follows:
cwnd starts at 2 MSS. While cwnd is below m_ssthresh, each ACK that acknowledges new data increases cwnd by one MSS.
    bool PseudoTcp::process(Segment& seg) {
      ......

      // The ACK covers new data that is still outstanding.
      if ((seg.ack > m_snd_una) && (seg.ack <= m_snd_nxt)) {
        if (m_dup_acks >= 3) {
          ......
        } else {
          m_dup_acks = 0;

          if (m_cwnd < m_ssthresh) {
            // Slow start: one MSS per ACK.
            m_cwnd += m_mss;
          } else {
            // Past the threshold: roughly one MSS per round trip.
            m_cwnd += talk_base::_max<uint32>(1, m_mss * m_mss / m_cwnd);
          }
        }
      }
      ......
    }
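The two growth branches can be compared side by side with a pure function. `onNewAck` is a hypothetical helper that copies the arithmetic of the listing above; all values are in bytes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper: returns the new cwnd after one ACK of new data,
// using the same two branches as the listing above.
uint32_t onNewAck(uint32_t cwnd, uint32_t ssthresh, uint32_t mss) {
  if (cwnd < ssthresh) {
    return cwnd + mss;  // slow start: one MSS per ACK
  }
  // Above the threshold: mss*mss/cwnd per ACK, at least 1 byte.
  return cwnd + std::max<uint32_t>(1, mss * mss / cwnd);
}
```

With an MSS of 1000 and a threshold of 8000, a cwnd of 2000 grows to 3000 on one ACK. Once cwnd reaches 8000, one ACK adds only 1000 * 1000 / 8000 = 125 bytes, so it takes about a full window of ACKs to add one MSS.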
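When sending data, the sender takes the smaller of the advertised window (m_snd_wnd) and the congestion window (cwnd) as its window, then subtracts the data that has been sent but not yet acknowledged. The result is the amount of data it may send right now (nUseable).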
    void PseudoTcp::attemptSend(SendFlags sflags) {
      ......
      while (true) {
        uint32 cwnd = m_cwnd;
        // After one or two duplicate ACKs, allow that many extra segments.
        if ((m_dup_acks == 1) || (m_dup_acks == 2)) {
          cwnd += m_dup_acks * m_mss;
        }
        uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);
        uint32 nInFlight = m_snd_nxt - m_snd_una;
        uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;

        // Unsent data waiting in the send buffer, capped at one MSS.
        size_t snd_buffered = 0;
        m_sbuf.GetBuffered(&snd_buffered);
        uint32 nAvailable =
            talk_base::_min(static_cast<uint32>(snd_buffered) - nInFlight, m_mss);
        ......
      }
    }