• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            Fork me on GitHub
            隨筆 - 215  文章 - 13  trackbacks - 0
            <2017年6月>
            28293031123
            45678910
            11121314151617
            18192021222324
            2526272829301
            2345678


            專注即時通訊及網游服務端編程
            ------------------------------------
            Openresty 官方模塊
            Openresty 標準模塊(Opm)
            Openresty 三方模塊
            ------------------------------------
            本博收藏大部分文章為轉載,并在文章開頭給出了原文出處,如有再轉,敬請保留相關信息,這是大家對原創作者勞動成果的自覺尊重!!如為您帶來不便,請于本博下留言,謝謝配合。

            常用鏈接

            留言簿(1)

            隨筆分類

            隨筆檔案

            相冊

            Awesome

            Blog

            Book

            GitHub

            Link

            搜索

            •  

            積分與排名

            • 積分 - 215445
            • 排名 - 118

            最新評論

            閱讀排行榜

            KCP-GO源碼解析

            概念

            ARQ:自動重傳請求(Automatic Repeat-reQuest,ARQ)是OSI模型中數據鏈路層的錯誤糾正協議之一.
            RTO:Retransmission TimeOut
            FEC:Forward Error Correction

            kcp簡介

            kcp是一個基于udp實現快速、可靠、向前糾錯的的協議,能以比TCP浪費10%-20%的帶寬的代價,換取平均延遲降低30%-40%,且最大延遲降低三倍的傳輸效果。純算法實現,并不負責底層協議(如UDP)的收發。查看官方文檔kcp

            kcp-go是用go實現了kcp協議的一個庫,其實kcp類似tcp,協議的實現也很多參考tcp協議的實現,滑動窗口,快速重傳,選擇性重傳,慢啟動等。
            kcp和tcp一樣,也分客戶端和監聽端。

                +-+-+-+-+-+            +-+-+-+-+-+
                
            |  Client |            |  Server |
                
            +-+-+-+-+-+            +-+-+-+-+-+
                    
            |------ kcp data ------>|     
                    
            |<----- kcp data -------|    

            kcp協議
            layer model
            +----------------------+
            |      Session         |
            +----------------------+
            |      KCP(ARQ)        |
            +----------------------+
            |      FEC(OPTIONAL)   |
            +----------------------+
            |      CRYPTO(OPTIONAL)|
            +----------------------+
            |      UDP(Packet)     |
            +----------------------+

            KCP header

            KCP Header Format

                  4           1   1     2 (Byte)
            +---+---+---+---+---+---+---+---+
            |     conv      |cmd|frg|  wnd  |
            +---+---+---+---+---+---+---+---+
            |     ts        |     sn        |
            +---+---+---+---+---+---+---+---+
            |     una       |     len       |
            +---+---+---+---+---+---+---+---+
            |                               |
            +             DATA              +
            |                               |
            +---+---+---+---+---+---+---+---+

            代碼結構
            src/vendor/github.com/xtaci/kcp-go/
            ├── LICENSE
            ├── README.md
            ├── crypt.go    加解密實現
            ├── crypt_test.go
            ├── donate.png
            ├── fec.go      向前糾錯實現
            ├── frame.png
            ├── kcp
            -go.png
            ├── kcp.go      kcp協議實現
            ├── kcp_test.go
            ├── sess.go     會話管理實現
            ├── sess_test.go
            ├── snmp.go     數據統計實現
            ├── updater.go  任務調度實現
            ├── xor.go      xor封裝
            └── xor_test.go

            著重研究兩個文件kcp.gosess.go

            kcp淺析

            kcp是基于udp實現的,所有udp的實現這里不做介紹,kcp做的事情就是怎么封裝udp的數據和怎么解析udp的數據,再加各種處理機制,為了重傳,擁塞控制,糾錯等。下面介紹kcp客戶端和服務端整體實現的流程,只是大概介紹一下函數流,不做詳細解析,詳細解析看后面數據流的解析。

            kcp client整體函數流

            和tcp一樣,kcp要連接服務端需要先撥號,但是和tcp有個很大的不同是,即使服務端沒有啟動,客戶端一樣可以撥號成功,因為實際上這里的撥號沒有發送任何信息,而tcp在這里需要三次握手。

            DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
                V
            net.DialUDP(
            "udp", nil, udpaddr)
                V
            NewConn()
                V
            newUDPSession() {初始化UDPSession}
                V
            NewKCP() {初始化kcp}
                V
            updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
                V
            go sess.readLoop()
                V
            go s.receiver(chPacket)
                V
            s.kcpInput(data)
                V
            s.fecDecoder.decodeBytes(data)
                V
            s.kcp.Input(data, 
            true, s.ackNoDelay)
                V
            kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩沖}
                V
            notifyReadEvent()

            客戶端大體的流程如上面所示,先Dial,建立udp連接,將這個連接封裝成一個會話,然后啟動一個go程,接收udp的消息。

            kcp server整體函數流

            ListenWithOptions() 

                V
            net.ListenUDP()
                V
            ServerConn()
                V
            newFECDecoder()
                V
            go l.monitor() {從chPacket接收udp數據,寫入kcp}
                V
            go l.receiver(chPacket) {從upd接收數據,并入隊列}
                V
            newUDPSession()
                V
            updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
                V
            s.kcpInput(data)`
                V
            s.fecDecoder.decodeBytes(data)
                V
            s.kcp.Input(data, 
            true, s.ackNoDelay)
                V
            kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩沖}
                V
            notifyReadEvent()


            服務端的大體流程如上圖所示,先Listen,啟動udp監聽,接著用一個go程監控udp的數據包,負責將不同session的數據寫入不同的udp連接,然后解析封裝將數據交給上層。

            kcp 數據流詳細解析

            不管是kcp的客戶端還是服務端,他們都有io行為,就是讀與寫,我們只分析一個就好了,因為它們讀寫的實現是一樣的,這里分析客戶端的讀與寫。
            kcp client 發送消息

            s.Write(b []byte
                V
            s.kcp.WaitSnd() {}
                V
            s.kcp.Send(b) {將數據根據mss分段,并存在kcp.snd_queue}
                 V
            s.kcp.flush(
            false) [flush data to output] {
                
            if writeDelay==true {
                    flush
                }
            else{
                    每隔`interval`時間flush一次
                }
            }
                 V
            kcp.output(buffer, size) 
                 V
            s.output(buf)
                 V
            s.conn.WriteTo(ext, s.remote)
                 V
            s.conn..Conn.WriteTo(buf)

            讀寫都是在sess.go文件中實現的,Write方法:

            // Write implements net.Conn
            func (s *UDPSession) Write(b []byte) (n int, err error) {
                
            for {
                    

                    
            // api flow control
                    if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
                        n 
            = len(b)
                        
            for {
                            
            if len(b) <= int(s.kcp.mss) {
                                s.kcp.Send(b)
                                
            break
                            } 
            else {
                                s.kcp.Send(b[:s.kcp.mss])
                                b 
            = b[s.kcp.mss:]
                            }
                        }

                        
            if !s.writeDelay {
                            s.kcp.flush(
            false)
                        }
                        s.mu.Unlock()
                        atomic.AddUint64(
            &DefaultSnmp.BytesSent, uint64(n))
                        
            return n, nil
                    }

                    
                    
            // wait for write event or timeout
                    select {
                    
            case <-s.chWriteEvent:
                    
            case <-c:
                    
            case <-s.die:
                    }

                    
            if timeout != nil {
                        timeout.Stop()
                    }
                }
            }

            假設發送一個hello消息,Write方法會先判斷發送窗口是否已滿,滿的話該函數阻塞,不滿則kcp.Send(“hello”),而Send函數實現根據mss的值對數據分段,當然這里的發送的hello,長度太短,只分了一個段,并把它們插入發送的隊列里。

            func (kcp *KCP) Send(buffer []byteint {
                
                
            for i := 0; i < count; i++ {
                    var size 
            int
                    
            if len(buffer) > int(kcp.mss) {
                        size 
            = int(kcp.mss)
                    } 
            else {
                        size 
            = len(buffer)
                    }
                    seg :
            = kcp.newSegment(size)
                    copy(seg.data, buffer[:size])
                    
            if kcp.stream == 0 { // message mode
                        seg.frg = uint8(count - i - 1)
                    } 
            else { // stream mode
                        seg.frg = 0
                    }
                    kcp.snd_queue 
            = append(kcp.snd_queue, seg)
                    buffer 
            = buffer[size:]
                }
                
            return 0
            }

            接著判斷參數writeDelay,如果參數設置為false,則立馬發送消息,否則需要任務調度后才會觸發發送,發送消息是由flush函數實現的。

            // flush pending data
            func (kcp *KCP) flush(ackOnly bool) {
                var seg Segment
                seg.conv 
            = kcp.conv
                seg.cmd 
            = IKCP_CMD_ACK
                seg.wnd 
            = kcp.wnd_unused()
                seg.una 
            = kcp.rcv_nxt

                buffer :
            = kcp.buffer
                
            // flush acknowledges
                ptr := buffer
                
            for i, ack := range kcp.acklist {
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    
            // filter jitters caused by bufferbloat
                    if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
                        seg.sn, seg.ts 
            = ack.sn, ack.ts
                        ptr 
            = seg.encode(ptr)

                    }
                }
                kcp.acklist 
            = kcp.acklist[0:0]

                
            if ackOnly { // flash remain ack segments
                    size := len(buffer) - len(ptr)
                    
            if size > 0 {
                        kcp.output(buffer, size)
                    }
                    
            return
                }

                
            // probe window size (if remote window size equals zero)
                if kcp.rmt_wnd == 0 {
                    current :
            = currentMs()
                    
            if kcp.probe_wait == 0 {
                        kcp.probe_wait 
            = IKCP_PROBE_INIT
                        kcp.ts_probe 
            = current + kcp.probe_wait
                    } 
            else {
                        
            if _itimediff(current, kcp.ts_probe) >= 0 {
                            
            if kcp.probe_wait < IKCP_PROBE_INIT {
                                kcp.probe_wait 
            = IKCP_PROBE_INIT
                            }
                            kcp.probe_wait 
            += kcp.probe_wait / 2
                            
            if kcp.probe_wait > IKCP_PROBE_LIMIT {
                                kcp.probe_wait 
            = IKCP_PROBE_LIMIT
                            }
                            kcp.ts_probe 
            = current + kcp.probe_wait
                            kcp.probe 
            |= IKCP_ASK_SEND
                        }
                    }
                } 
            else {
                    kcp.ts_probe 
            = 0
                    kcp.probe_wait 
            = 0
                }

                
            // flush window probing commands
                if (kcp.probe & IKCP_ASK_SEND) != 0 {
                    seg.cmd 
            = IKCP_CMD_WASK
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    ptr 
            = seg.encode(ptr)
                }

                
            // flush window probing commands
                if (kcp.probe & IKCP_ASK_TELL) != 0 {
                    seg.cmd 
            = IKCP_CMD_WINS
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    ptr 
            = seg.encode(ptr)
                }

                kcp.probe 
            = 0

                
            // calculate window size
                cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
                
            if kcp.nocwnd == 0 {
                    cwnd 
            = _imin_(kcp.cwnd, cwnd)
                }

                
            // sliding window, controlled by snd_nxt && sna_una+cwnd
                newSegsCount := 0
                
            for k := range kcp.snd_queue {
                    
            if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
                        
            break
                    }
                    newseg :
            = kcp.snd_queue[k]
                    newseg.conv 
            = kcp.conv
                    newseg.cmd 
            = IKCP_CMD_PUSH
                    newseg.sn 
            = kcp.snd_nxt
                    kcp.snd_buf 
            = append(kcp.snd_buf, newseg)
                    kcp.snd_nxt
            ++
                    newSegsCount
            ++
                    kcp.snd_queue[k].data 
            = nil
                }
                
            if newSegsCount > 0 {
                    kcp.snd_queue 
            = kcp.remove_front(kcp.snd_queue, newSegsCount)
                }

                
            // calculate resent
                resent := uint32(kcp.fastresend)
                
            if kcp.fastresend <= 0 {
                    resent 
            = 0xffffffff
                }

                
            // check for retransmissions
                current := currentMs()
                var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
                
            for k := range kcp.snd_buf {
                    segment :
            = &kcp.snd_buf[k]
                    needsend :
            = false
                    
            if segment.xmit == 0 { // initial transmit
                        needsend = true
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                    } 
            else if _itimediff(current, segment.resendts) >= 0 { // RTO
                        needsend = true
                        
            if kcp.nodelay == 0 {
                            segment.rto 
            += kcp.rx_rto
                        } 
            else {
                            segment.rto 
            += kcp.rx_rto / 2
                        }
                        segment.resendts 
            = current + segment.rto
                        lost
            ++
                        lostSegs
            ++
                    } 
            else if segment.fastack >= resent { // fast retransmit
                        needsend = true
                        segment.fastack 
            = 0
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                        change
            ++
                        fastRetransSegs
            ++
                    } 
            else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
                        needsend = true
                        segment.fastack 
            = 0
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                        change
            ++
                        earlyRetransSegs
            ++
                    }

                    
            if needsend {
                        segment.xmit
            ++
                        segment.ts 
            = current
                        segment.wnd 
            = seg.wnd
                        segment.una 
            = seg.una

                        size :
            = len(buffer) - len(ptr)
                        need :
            = IKCP_OVERHEAD + len(segment.data)

                        
            if size+need > int(kcp.mtu) {
                            kcp.output(buffer, size)
                            current 
            = currentMs() // time update for a blocking call
                            ptr = buffer
                        }

                        ptr 
            = segment.encode(ptr)
                        copy(ptr, segment.data)
                        ptr 
            = ptr[len(segment.data):]

                        
            if segment.xmit >= kcp.dead_link {
                            kcp.state 
            = 0xFFFFFFFF
                        }
                    }
                }

                
            // flash remain segments
                size := len(buffer) - len(ptr)
                
            if size > 0 {
                    kcp.output(buffer, size)
                }

                
            // counter updates
                sum := lostSegs
                
            if lostSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.LostSegs, lostSegs)
                }
                
            if fastRetransSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.FastRetransSegs, fastRetransSegs)
                    sum 
            += fastRetransSegs
                }
                
            if earlyRetransSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
                    sum 
            += earlyRetransSegs
                }
                
            if sum > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.RetransSegs, sum)
                }

                
            // update ssthresh
                
            // rate halving, https://tools.ietf.org/html/rfc6937
                if change > 0 {
                    inflight :
            = kcp.snd_nxt - kcp.snd_una
                    kcp.ssthresh 
            = inflight / 2
                    
            if kcp.ssthresh < IKCP_THRESH_MIN {
                        kcp.ssthresh 
            = IKCP_THRESH_MIN
                    }
                    kcp.cwnd 
            = kcp.ssthresh + resent
                    kcp.incr 
            = kcp.cwnd * kcp.mss
                }

                
            // congestion control, https://tools.ietf.org/html/rfc5681
                if lost > 0 {
                    kcp.ssthresh 
            = cwnd / 2
                    
            if kcp.ssthresh < IKCP_THRESH_MIN {
                        kcp.ssthresh 
            = IKCP_THRESH_MIN
                    }
                    kcp.cwnd 
            = 1
                    kcp.incr 
            = kcp.mss
                }

                
            if kcp.cwnd < 1 {
                    kcp.cwnd 
            = 1
                    kcp.incr 
            = kcp.mss
                }
            }

            flush函數非常的重要,kcp的重要參數都是在調節這個函數的行為,這個函數只有一個參數ackOnly,意思就是只發送ack,如果ackOnly為true的話,該函數只遍歷ack列表,然后發送,就完事了。 如果不是,也會發送真實數據。 在發送數據前先進行windSize探測,如果開啟了擁塞控制nc=0,則每次發送前檢測服務端的winsize,如果服務端的winsize變小了,自身的winsize也要更著變小,來避免擁塞。如果沒有開啟擁塞控制,就按設置的winsize進行數據發送。
            接著循環每個段數據,并判斷每個段數據的是否該重發,還有什么時候重發:
            1. 如果這個段數據首次發送,則直接發送數據。 2. 如果這個段數據的當前時間大于它自身重發的時間,也就是RTO,則重傳消息。 3. 如果這個段數據的ack丟失累計超過resent次數,則重傳,也就是快速重傳機制。這個resent參數由resend參數決定。 4. 如果這個段數據的ack有丟失且沒有新的數據段,則觸發ER,ER相關信息ER

            最后通過kcp.output發送消息hello,output是個回調函數,函數的實體是sess.go的:

            func (s *UDPSession) output(buf []byte) {
                var ecc [][]
            byte

                
            // extend buf's header space
                ext := buf
                
            if s.headerSize > 0 {
                    ext 
            = s.ext[:s.headerSize+len(buf)]
                    copy(ext[s.headerSize:], buf)
                }

                
            // FEC stage
                if s.fecEncoder != nil {
                    ecc 
            = s.fecEncoder.Encode(ext)
                }

                
            // encryption stage
                if s.block != nil {
                    io.ReadFull(rand.Reader, ext[:nonceSize])
                    checksum :
            = crc32.ChecksumIEEE(ext[cryptHeaderSize:])
                    binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
                    s.block.Encrypt(ext, ext)

                    
            if ecc != nil {
                        
            for k := range ecc {
                            io.ReadFull(rand.Reader, ecc[k][:nonceSize])
                            checksum :
            = crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
                            binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
                            s.block.Encrypt(ecc[k], ecc[k])
                        }
                    }
                }

                
            // WriteTo kernel
                nbytes := 0
                npkts :
            = 0
                
            // if mrand.Intn(100) < 50 {
                for i := 0; i < s.dup+1; i++ {
                    
            if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
                        nbytes 
            += n
                        npkts
            ++
                    }
                }
                
            // }

                
            if ecc != nil {
                    
            for k := range ecc {
                        
            if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
                            nbytes 
            += n
                            npkts
            ++
                        }
                    }
                }
                atomic.AddUint64(
            &DefaultSnmp.OutPkts, uint64(npkts))
                atomic.AddUint64(
            &DefaultSnmp.OutBytes, uint64(nbytes))
            }

            output函數才是真正的將數據寫入內核中,在寫入之前先進行了fec編碼,fec編碼器的實現是用了一個開源庫github.com/klauspost/reedsolomon,編碼以后的hello就不是和原來的hello一樣了,至少多了幾個字節。 fec編碼器有兩個重要的參數reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,這兩個參數決定了fec的冗余度,冗余度越大抗丟包性就越強。

            kcp的任務調度器

            其實這里任務調度器是一個很簡單的實現,用一個全局變量updater來管理session,代碼文件為updater.go。其中最主要的函數

            func (h *updateHeap) updateTask() {
                var timer 
            <-chan time.Time
                
            for {
                    select {
                    
            case <-timer:
                    
            case <-h.chWakeUp:
                    }

                    h.mu.Lock()
                    hlen :
            = h.Len()
                    now :
            = time.Now()
                    
            if hlen > 0 && now.After(h.entries[0].ts) {
                        
            for i := 0; i < hlen; i++ {
                            entry :
            = heap.Pop(h).(entry)
                            
            if now.After(entry.ts) {
                                entry.ts 
            = now.Add(entry.s.update())
                                heap.Push(h, entry)
                            } 
            else {
                                heap.Push(h, entry)
                                
            break
                            }
                        }
                    }
                    
            if hlen > 0 {
                        timer 
            = time.After(h.entries[0].ts.Sub(now))
                    }
                    h.mu.Unlock()
                }
            }

            任務調度器實現了一個堆結構,每當有新的連接,session都會插入到這個堆里,接著for循環每隔interval時間,遍歷這個堆,得到entry然后執行entry.s.update()。而entry.s.update()會執行s.kcp.flush(false)來發送數據。

            總結

            這里簡單介紹了kcp的整體流程,詳細介紹了發送數據的流程,但未介紹kcp接收數據的流程,其實在客戶端發送數據后,服務端是需要返回ack的,而客戶端也需要根據返回的ack來判斷數據段是否需要重傳還是在隊列里清除該數據段。處理返回來的ack是在函數kcp.Input()函數實現的。具體詳細流程下次再介紹。

            posted on 2017-12-09 15:20 思月行云 閱讀(1154) 評論(0)  編輯 收藏 引用 所屬分類: Golang
            亚洲一级Av无码毛片久久精品| 亚洲AV日韩AV天堂久久| 人人狠狠综合88综合久久| 亚洲日韩中文无码久久| 久久国产精品一区二区| 久久只这里是精品66| 精品久久777| 亚洲精品无码久久一线| 久久久久久国产精品美女| 伊人久久精品无码av一区| 精品久久久久久国产| 99久久精品免费看国产一区二区三区| 欧美久久精品一级c片片| 无码国内精品久久人妻麻豆按摩| 久久狠狠高潮亚洲精品| 99久久这里只精品国产免费| 久久最近最新中文字幕大全| 人妻无码αv中文字幕久久琪琪布| 久久精品一区二区三区中文字幕| AV无码久久久久不卡蜜桃| 久久99精品久久久大学生| 久久伊人色| 久久精品人妻一区二区三区| 996久久国产精品线观看| 久久综合国产乱子伦精品免费| 久久综合九色综合网站| 性做久久久久久免费观看| 精品国产乱码久久久久久浪潮| 久久综合久久综合九色| 精品一区二区久久久久久久网站| 日韩人妻无码精品久久久不卡| 久久天天躁狠狠躁夜夜不卡| 亚洲国产精品成人久久蜜臀| 性做久久久久久久久久久| 模特私拍国产精品久久| 久久人做人爽一区二区三区| 久久只有这里有精品4| 久久久久高潮综合影院| 欧洲成人午夜精品无码区久久 | 久久er国产精品免费观看2| 久久久女人与动物群交毛片|