轉載自:http://blog.csdn.net/leehark/article/details/7671462
PseudoTcp對成塊數據流的處理
上一篇談論了TCP和PTCP對交互數據流的處理方法。這一篇談論另一個數據流--成塊數據流。成塊數據流主要采用滑動窗口協議和慢啟動算法來控制成塊數據的流量。
滑動窗口
滑動窗口允許發送方在停止并等待確認前可以連續發送多個分組。因此發送方不必每發一個就停下來等待,這樣可以加速數據的傳輸。這個Nagle算法沖突么?不會,因為成塊數據流的分組都是滿載傳輸的,根據Nagle算法,當等待發送數據的大小和窗口大小都大于MSS時,會立即發送。
如果發送方一直傳輸數據會出現經常丟包的現象,特別是快的發送方發給慢的接收方。當接收方還沒有處理數據,發送方就接連發來了數據會填滿接收方的緩沖區,從而后續的數據將被丟棄,為了減少網絡上丟包的次數,用一種機制來限制發送方傳輸數據。
因此出現了滑動窗口,如下圖:

滑動窗口分為4個部分:
上圖1~3為發送并確認的數據段
上圖4~6為已經發送,但是沒有被確認的數據段
上圖7~9為可用的窗口,即滑動窗口,發送方還可以發送的數據段空間
上圖10以上為不能夠發送。

當接收方確認數據后,滑動窗口兩邊不斷的向右移動。
窗口合攏:當發送方發送數據并等待確認時,滑動窗口的左邊向右移動。
窗口張開:當接收方收到數據并確認且釋放緩沖區數據時,右邊向右移動。
窗口收縮:當接收方的緩沖區大小變小時,右邊向左移動,但不建議使用這種方式。
滑動窗口時通過窗口大小來更新。當接收方收到數據后,重新計算接收緩沖區的大小,并通告發送方。如果通告窗口大小為0,則發送方不能再發送數據,等到窗口大小為非0,這樣可以有效的避免因接收方緩沖區滿導致的分組的丟失。
那么PTCP是怎么實現的呢?
PTCP通過m_rbuf_len來標示接收緩沖區大小。如果緩沖區大小小于65536時,m_rwnd_scale為0,m_rcv_wnd標示窗口大小,而大于65535時,通過如下算法來調整m_rbuf_len和m_rwnd_scale。調整后根據緩沖區中可用空間來更新窗口大小m_rcv_wnd 。為什么選擇65535為界限呢?因為在PTCP的頭部中window字段的長度為16個bit,只能支持窗口打小范圍0~65535(包含65535)。
- void
- PseudoTcp::resizeReceiveBuffer(uint32 new_size) {
- uint8 scale_factor = 0;
-
- while (new_size > 0xFFFF) {
- ++scale_factor;
- new_size >>= 1;
- }
- new_size <<= scale_factor;
- bool result = m_rbuf.SetCapacity(new_size);
- m_rbuf_len = new_size;
- m_rwnd_scale = scale_factor;
- m_ssthresh = new_size;
- size_t available_space = 0;
- m_rbuf.GetWriteRemaining(&available_space);
- m_rcv_wnd = available_space;
- }
當PTCP三次握手時,通過PTCP選項TCP_OPT_WND_SCALE來通告對方m_rwnd_scale的大小。
- void
- PseudoTcp::queueConnectMessage() {
- talk_base::ByteBuffer buf(talk_base::ByteBuffer::ORDER_NETWORK);
- buf.WriteUInt8(CTL_CONNECT);
- if (m_support_wnd_scale) {
- buf.WriteUInt8(TCP_OPT_WND_SCALE);
- buf.WriteUInt8(1);
- buf.WriteUInt8(m_rwnd_scale);
- }
- m_snd_wnd = buf.Length();
- queue(buf.Data(), buf.Length(), true);
- }
PTCP接收窗口擴大因子對應的控制包之后,通過parseOptions方法來解析此包如下:
- void
- PseudoTcp::parseOptions(const char* data, uint32 len) {
- std::set<uint8> options_specified;
- talk_base::ByteBuffer buf(data, len);
- while (buf.Length()) {
- uint8 kind = TCP_OPT_EOL;
- buf.ReadUInt8(&kind);
- if (kind == TCP_OPT_EOL) {
- break;
- } else if (kind == TCP_OPT_NOOP) {
- continue;
- }
- UNUSED(len);
- uint8 opt_len = 0;
- buf.ReadUInt8(&opt_len);
- if (opt_len <= buf.Length()) {
- applyOption(kind, buf.Data(), opt_len);
- buf.Consume(opt_len);
- } else {
- return;
- }
- options_specified.insert(kind);
- }
- if (options_specified.find(TCP_OPT_WND_SCALE) == options_specified.end()) {
- if (m_rwnd_scale > 0) {
- resizeReceiveBuffer(DEFAULT_RCV_BUF_SIZE);
- m_swnd_scale = 0;
- }
- }
- }
接收方調整窗口大小,如下:
窗口合攏:當接收方收到數據時,會從窗口大小里減去把接收緩沖區消耗的數據大小。
- bool PseudoTcp::process(Segment& seg) {
- ......
- uint32 nOffset = seg.seq - m_rcv_nxt;
- talk_base::StreamResult result = m_rbuf.WriteOffset(seg.data, seg.len,
- nOffset, NULL);
- ASSERT(result == talk_base::SR_SUCCESS);
- UNUSED(result);
- if (seg.seq == m_rcv_nxt) {
- m_rbuf.ConsumeWriteBuffer(seg.len);
- m_rcv_nxt += seg.len;
- m_rcv_wnd -= seg.len;
- bNewData = true;
- RList::iterator it = m_rlist.begin();
- while ((it != m_rlist.end()) && (it->seq <= m_rcv_nxt)) {
- if (it->seq + it->len > m_rcv_nxt) {
- sflags = sfImmediateAck;
- uint32 nAdjust = (it->seq + it->len) - m_rcv_nxt;
- m_rbuf.ConsumeWriteBuffer(nAdjust);
- m_rcv_nxt += nAdjust;
- m_rcv_wnd -= nAdjust;
- }
- it = m_rlist.erase(it);
- }
- } else {
- RSegment rseg;
- rseg.seq = seg.seq;
- rseg.len = seg.len;
- RList::iterator it = m_rlist.begin();
- while ((it != m_rlist.end()) && (it->seq < rseg.seq)) {
- ++it;
- }
- m_rlist.insert(it, rseg);
- }
- ......
- }
窗口張開:當應用層調用Recv來獲取PTCP接收的數據時,PTCP會把此部分數據清除,騰空緩沖區并擴大窗口大小。
- int PseudoTcp::Recv(char* buffer, size_t len) {
- ......
- talk_base::StreamResult result = m_rbuf.Read(buffer, len, &read, NULL);
- ......
- size_t available_space = 0;
- m_rbuf.GetWriteRemaining(&available_space);
- if (uint32(available_space) - m_rcv_wnd >=
- talk_base::_min<uint32>(m_rbuf_len / 2, m_mss)) {
- bool bWasClosed = (m_rcv_wnd == 0);
- m_rcv_wnd = available_space;
- if (bWasClosed) {
- attemptSend(sfImmediateAck);
- }
- }
- return read;
- }
通告窗口大小給對方:
- IPseudoTcpNotify::WriteResult PseudoTcp::packet(uint32 seq, uint8 flags,
- uint32 offset, uint32 len) {
- ASSERT(HEADER_SIZE + len <= MAX_PACKET);
- uint32 now = Now();
- uint8 buffer[MAX_PACKET];
- long_to_bytes(m_conv, buffer);
- long_to_bytes(seq, buffer + 4);
- long_to_bytes(m_rcv_nxt, buffer + 8);
- buffer[12] = 0;
- buffer[13] = flags;
- short_to_bytes(static_cast<uint16>(m_rcv_wnd >> m_rwnd_scale), buffer + 14);
- ......
- }
當發送方收到接收方發送的窗口大小后,可發送大小計算為窗口大小減去已經發送但未被確認的數據大小。
- void PseudoTcp::attemptSend(SendFlags sflags) {
- ......
- uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);
- uint32 nInFlight = m_snd_nxt - m_snd_una;
- uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
- ......
- }
慢啟動
當接收方和發送方之間存在多個路由器和速率較慢的鏈路時,一些中間的路由器必須緩存分組。一開始發送方向接收方發送多個分組,可能會把緩存填滿,這會嚴重降低TCP的吞吐量。
TCP通過慢啟動算法解決上述問題:首先設置擁塞窗口cwnd為1,當發送方每收到一個ACK擁塞窗口加1個報文段。發送方取擁塞窗口和通告窗口的最小值為發送上限。擁塞窗口是發送方使用的流量控制,而通告窗口時接收方使用的流量控制。
發送方首先發送一個報文段,當收到ACK時,cwnd變為2,可以發送2個報文段,當收到2個ACK時cwnd變為4,發送方可以發送4個報文段,依次類推,慢啟動算法是指數增長的。
PTCP實現慢啟動算法如下:
Cwnd初始值為2個MSS,當收到ACK時cwnd增加一個MSS。
- Bool PseudoTcp::process(Segment& seg) {
- ......
-
- if ((seg.ack > m_snd_una) && (seg.ack <= m_snd_nxt)) {
- if (m_dup_acks >= 3) {
- ......
- }else{
- m_dup_acks = 0;
-
- if (m_cwnd < m_ssthresh) {
- m_cwnd += m_mss;
- } else {
- m_cwnd += talk_base::_max<uint32>(1, m_mss * m_mss / m_cwnd);
- }
- }
- }
- ......
- }
當發送方發送數據時,取窗口大小為通告窗口(m_snd_wnd)和擁塞窗口(cwnd)的最小值,然后減去已經發送的未被確認的大小為當前可發送數據大小(nUseable )。
- void PseudoTcp::attemptSend(SendFlags sflags) {
- ......
- while (true) {
- uint32 cwnd = m_cwnd;
- if ((m_dup_acks == 1) || (m_dup_acks == 2)) {
- cwnd += m_dup_acks * m_mss;
- }
- uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd);
- uint32 nInFlight = m_snd_nxt - m_snd_una;
- uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
- size_t snd_buffered = 0;
- m_sbuf.GetBuffered(&snd_buffered);
- uint32 nAvailable =
- talk_base::_min(static_cast<uint32>(snd_buffered) - nInFlight, m_mss);
- ......
- }