• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            Fork me on GitHub
            隨筆 - 215  文章 - 13  trackbacks - 0
            <2017年6月>
            28293031123
            45678910
            11121314151617
            18192021222324
            2526272829301
            2345678


            專注即時(shí)通訊及網(wǎng)游服務(wù)端編程
            ------------------------------------
            Openresty 官方模塊
            Openresty 標(biāo)準(zhǔn)模塊(Opm)
            Openresty 三方模塊
            ------------------------------------
            本博收藏大部分文章為轉(zhuǎn)載,并在文章開(kāi)頭給出了原文出處,如有再轉(zhuǎn),敬請(qǐng)保留相關(guān)信息,這是大家對(duì)原創(chuàng)作者勞動(dòng)成果的自覺(jué)尊重!!如為您帶來(lái)不便,請(qǐng)于本博下留言,謝謝配合。

            常用鏈接

            留言簿(1)

            隨筆分類

            隨筆檔案

            相冊(cè)

            Awesome

            Blog

            Book

            GitHub

            Link

            搜索

            •  

            積分與排名

            • 積分 - 219199
            • 排名 - 117

            最新評(píng)論

            閱讀排行榜

            KCP-GO源碼解析

            概念

            ARQ:自動(dòng)重傳請(qǐng)求(Automatic Repeat-reQuest,ARQ)是OSI模型中數(shù)據(jù)鏈路層的錯(cuò)誤糾正協(xié)議之一.
            RTO:Retransmission TimeOut
            FEC:Forward Error Correction

            kcp簡(jiǎn)介

            kcp是一個(gè)基于udp實(shí)現(xiàn)快速、可靠、向前糾錯(cuò)的的協(xié)議,能以比TCP浪費(fèi)10%-20%的帶寬的代價(jià),換取平均延遲降低30%-40%,且最大延遲降低三倍的傳輸效果。純算法實(shí)現(xiàn),并不負(fù)責(zé)底層協(xié)議(如UDP)的收發(fā)。查看官方文檔kcp

            kcp-go是用go實(shí)現(xiàn)了kcp協(xié)議的一個(gè)庫(kù),其實(shí)kcp類似tcp,協(xié)議的實(shí)現(xiàn)也很多參考tcp協(xié)議的實(shí)現(xiàn),滑動(dòng)窗口,快速重傳,選擇性重傳,慢啟動(dòng)等。
            kcp和tcp一樣,也分客戶端和監(jiān)聽(tīng)端。

                +-+-+-+-+-+            +-+-+-+-+-+
                
            |  Client |            |  Server |
                
            +-+-+-+-+-+            +-+-+-+-+-+
                    
            |------ kcp data ------>|     
                    
            |<----- kcp data -------|    

            kcp協(xié)議
            layer model
            +----------------------+
            |      Session         |
            +----------------------+
            |      KCP(ARQ)        |
            +----------------------+
            |      FEC(OPTIONAL)   |
            +----------------------+
            |      CRYPTO(OPTIONAL)|
            +----------------------+
            |      UDP(Packet)     |
            +----------------------+

            KCP header

            KCP Header Format

                  4           1   1     2 (Byte)
            +---+---+---+---+---+---+---+---+
            |     conv      |cmd|frg|  wnd  |
            +---+---+---+---+---+---+---+---+
            |     ts        |     sn        |
            +---+---+---+---+---+---+---+---+
            |     una       |     len       |
            +---+---+---+---+---+---+---+---+
            |                               |
            +             DATA              +
            |                               |
            +---+---+---+---+---+---+---+---+

            代碼結(jié)構(gòu)
            src/vendor/github.com/xtaci/kcp-go/
            ├── LICENSE
            ├── README.md
            ├── crypt.go    加解密實(shí)現(xiàn)
            ├── crypt_test.go
            ├── donate.png
            ├── fec.go      向前糾錯(cuò)實(shí)現(xiàn)
            ├── frame.png
            ├── kcp
            -go.png
            ├── kcp.go      kcp協(xié)議實(shí)現(xiàn)
            ├── kcp_test.go
            ├── sess.go     會(huì)話管理實(shí)現(xiàn)
            ├── sess_test.go
            ├── snmp.go     數(shù)據(jù)統(tǒng)計(jì)實(shí)現(xiàn)
            ├── updater.go  任務(wù)調(diào)度實(shí)現(xiàn)
            ├── xor.go      xor封裝
            └── xor_test.go

            著重研究?jī)蓚€(gè)文件kcp.gosess.go

            kcp淺析

            kcp是基于udp實(shí)現(xiàn)的,所有udp的實(shí)現(xiàn)這里不做介紹,kcp做的事情就是怎么封裝udp的數(shù)據(jù)和怎么解析udp的數(shù)據(jù),再加各種處理機(jī)制,為了重傳,擁塞控制,糾錯(cuò)等。下面介紹kcp客戶端和服務(wù)端整體實(shí)現(xiàn)的流程,只是大概介紹一下函數(shù)流,不做詳細(xì)解析,詳細(xì)解析看后面數(shù)據(jù)流的解析。

            kcp client整體函數(shù)流

            和tcp一樣,kcp要連接服務(wù)端需要先撥號(hào),但是和tcp有個(gè)很大的不同是,即使服務(wù)端沒(méi)有啟動(dòng),客戶端一樣可以撥號(hào)成功,因?yàn)閷?shí)際上這里的撥號(hào)沒(méi)有發(fā)送任何信息,而tcp在這里需要三次握手。

            DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
                V
            net.DialUDP(
            "udp", nil, udpaddr)
                V
            NewConn()
                V
            newUDPSession() {初始化UDPSession}
                V
            NewKCP() {初始化kcp}
                V
            updater.addSession(sess) {管理session會(huì)話,任務(wù)管理,根據(jù)用戶設(shè)置的internal參數(shù)間隔來(lái)輪流喚醒任務(wù)}
                V
            go sess.readLoop()
                V
            go s.receiver(chPacket)
                V
            s.kcpInput(data)
                V
            s.fecDecoder.decodeBytes(data)
                V
            s.kcp.Input(data, 
            true, s.ackNoDelay)
                V
            kcp.parse_data(seg) {將分段好的數(shù)據(jù)插入kcp.rcv_buf緩沖}
                V
            notifyReadEvent()

            客戶端大體的流程如上面所示,先Dial,建立udp連接,將這個(gè)連接封裝成一個(gè)會(huì)話,然后啟動(dòng)一個(gè)go程,接收udp的消息。

            kcp server整體函數(shù)流

            ListenWithOptions() 

                V
            net.ListenUDP()
                V
            ServerConn()
                V
            newFECDecoder()
                V
            go l.monitor() {從chPacket接收udp數(shù)據(jù),寫(xiě)入kcp}
                V
            go l.receiver(chPacket) {從upd接收數(shù)據(jù),并入隊(duì)列}
                V
            newUDPSession()
                V
            updater.addSession(sess) {管理session會(huì)話,任務(wù)管理,根據(jù)用戶設(shè)置的internal參數(shù)間隔來(lái)輪流喚醒任務(wù)}
                V
            s.kcpInput(data)`
                V
            s.fecDecoder.decodeBytes(data)
                V
            s.kcp.Input(data, 
            true, s.ackNoDelay)
                V
            kcp.parse_data(seg) {將分段好的數(shù)據(jù)插入kcp.rcv_buf緩沖}
                V
            notifyReadEvent()


            服務(wù)端的大體流程如上圖所示,先Listen,啟動(dòng)udp監(jiān)聽(tīng),接著用一個(gè)go程監(jiān)控udp的數(shù)據(jù)包,負(fù)責(zé)將不同session的數(shù)據(jù)寫(xiě)入不同的udp連接,然后解析封裝將數(shù)據(jù)交給上層。

            kcp 數(shù)據(jù)流詳細(xì)解析

            不管是kcp的客戶端還是服務(wù)端,他們都有io行為,就是讀與寫(xiě),我們只分析一個(gè)就好了,因?yàn)樗鼈冏x寫(xiě)的實(shí)現(xiàn)是一樣的,這里分析客戶端的讀與寫(xiě)。
            kcp client 發(fā)送消息

            s.Write(b []byte
                V
            s.kcp.WaitSnd() {}
                V
            s.kcp.Send(b) {將數(shù)據(jù)根據(jù)mss分段,并存在kcp.snd_queue}
                 V
            s.kcp.flush(
            false) [flush data to output] {
                
            if writeDelay==true {
                    flush
                }
            else{
                    每隔`interval`時(shí)間flush一次
                }
            }
                 V
            kcp.output(buffer, size) 
                 V
            s.output(buf)
                 V
            s.conn.WriteTo(ext, s.remote)
                 V
            s.conn..Conn.WriteTo(buf)

            讀寫(xiě)都是在sess.go文件中實(shí)現(xiàn)的,Write方法:

            // Write implements net.Conn
            func (s *UDPSession) Write(b []byte) (n int, err error) {
                
            for {
                    

                    
            // api flow control
                    if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
                        n 
            = len(b)
                        
            for {
                            
            if len(b) <= int(s.kcp.mss) {
                                s.kcp.Send(b)
                                
            break
                            } 
            else {
                                s.kcp.Send(b[:s.kcp.mss])
                                b 
            = b[s.kcp.mss:]
                            }
                        }

                        
            if !s.writeDelay {
                            s.kcp.flush(
            false)
                        }
                        s.mu.Unlock()
                        atomic.AddUint64(
            &DefaultSnmp.BytesSent, uint64(n))
                        
            return n, nil
                    }

                    
                    
            // wait for write event or timeout
                    select {
                    
            case <-s.chWriteEvent:
                    
            case <-c:
                    
            case <-s.die:
                    }

                    
            if timeout != nil {
                        timeout.Stop()
                    }
                }
            }

            假設(shè)發(fā)送一個(gè)hello消息,Write方法會(huì)先判斷發(fā)送窗口是否已滿,滿的話該函數(shù)阻塞,不滿則kcp.Send(“hello”),而Send函數(shù)實(shí)現(xiàn)根據(jù)mss的值對(duì)數(shù)據(jù)分段,當(dāng)然這里的發(fā)送的hello,長(zhǎng)度太短,只分了一個(gè)段,并把它們插入發(fā)送的隊(duì)列里。

            func (kcp *KCP) Send(buffer []byteint {
                
                
            for i := 0; i < count; i++ {
                    var size 
            int
                    
            if len(buffer) > int(kcp.mss) {
                        size 
            = int(kcp.mss)
                    } 
            else {
                        size 
            = len(buffer)
                    }
                    seg :
            = kcp.newSegment(size)
                    copy(seg.data, buffer[:size])
                    
            if kcp.stream == 0 { // message mode
                        seg.frg = uint8(count - i - 1)
                    } 
            else { // stream mode
                        seg.frg = 0
                    }
                    kcp.snd_queue 
            = append(kcp.snd_queue, seg)
                    buffer 
            = buffer[size:]
                }
                
            return 0
            }

            接著判斷參數(shù)writeDelay,如果參數(shù)設(shè)置為false,則立馬發(fā)送消息,否則需要任務(wù)調(diào)度后才會(huì)觸發(fā)發(fā)送,發(fā)送消息是由flush函數(shù)實(shí)現(xiàn)的。

            // flush pending data
            func (kcp *KCP) flush(ackOnly bool) {
                var seg Segment
                seg.conv 
            = kcp.conv
                seg.cmd 
            = IKCP_CMD_ACK
                seg.wnd 
            = kcp.wnd_unused()
                seg.una 
            = kcp.rcv_nxt

                buffer :
            = kcp.buffer
                
            // flush acknowledges
                ptr := buffer
                
            for i, ack := range kcp.acklist {
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    
            // filter jitters caused by bufferbloat
                    if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
                        seg.sn, seg.ts 
            = ack.sn, ack.ts
                        ptr 
            = seg.encode(ptr)

                    }
                }
                kcp.acklist 
            = kcp.acklist[0:0]

                
            if ackOnly { // flash remain ack segments
                    size := len(buffer) - len(ptr)
                    
            if size > 0 {
                        kcp.output(buffer, size)
                    }
                    
            return
                }

                
            // probe window size (if remote window size equals zero)
                if kcp.rmt_wnd == 0 {
                    current :
            = currentMs()
                    
            if kcp.probe_wait == 0 {
                        kcp.probe_wait 
            = IKCP_PROBE_INIT
                        kcp.ts_probe 
            = current + kcp.probe_wait
                    } 
            else {
                        
            if _itimediff(current, kcp.ts_probe) >= 0 {
                            
            if kcp.probe_wait < IKCP_PROBE_INIT {
                                kcp.probe_wait 
            = IKCP_PROBE_INIT
                            }
                            kcp.probe_wait 
            += kcp.probe_wait / 2
                            
            if kcp.probe_wait > IKCP_PROBE_LIMIT {
                                kcp.probe_wait 
            = IKCP_PROBE_LIMIT
                            }
                            kcp.ts_probe 
            = current + kcp.probe_wait
                            kcp.probe 
            |= IKCP_ASK_SEND
                        }
                    }
                } 
            else {
                    kcp.ts_probe 
            = 0
                    kcp.probe_wait 
            = 0
                }

                
            // flush window probing commands
                if (kcp.probe & IKCP_ASK_SEND) != 0 {
                    seg.cmd 
            = IKCP_CMD_WASK
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    ptr 
            = seg.encode(ptr)
                }

                
            // flush window probing commands
                if (kcp.probe & IKCP_ASK_TELL) != 0 {
                    seg.cmd 
            = IKCP_CMD_WINS
                    size :
            = len(buffer) - len(ptr)
                    
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                        kcp.output(buffer, size)
                        ptr 
            = buffer
                    }
                    ptr 
            = seg.encode(ptr)
                }

                kcp.probe 
            = 0

                
            // calculate window size
                cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
                
            if kcp.nocwnd == 0 {
                    cwnd 
            = _imin_(kcp.cwnd, cwnd)
                }

                
            // sliding window, controlled by snd_nxt && sna_una+cwnd
                newSegsCount := 0
                
            for k := range kcp.snd_queue {
                    
            if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
                        
            break
                    }
                    newseg :
            = kcp.snd_queue[k]
                    newseg.conv 
            = kcp.conv
                    newseg.cmd 
            = IKCP_CMD_PUSH
                    newseg.sn 
            = kcp.snd_nxt
                    kcp.snd_buf 
            = append(kcp.snd_buf, newseg)
                    kcp.snd_nxt
            ++
                    newSegsCount
            ++
                    kcp.snd_queue[k].data 
            = nil
                }
                
            if newSegsCount > 0 {
                    kcp.snd_queue 
            = kcp.remove_front(kcp.snd_queue, newSegsCount)
                }

                
            // calculate resent
                resent := uint32(kcp.fastresend)
                
            if kcp.fastresend <= 0 {
                    resent 
            = 0xffffffff
                }

                
            // check for retransmissions
                current := currentMs()
                var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
                
            for k := range kcp.snd_buf {
                    segment :
            = &kcp.snd_buf[k]
                    needsend :
            = false
                    
            if segment.xmit == 0 { // initial transmit
                        needsend = true
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                    } 
            else if _itimediff(current, segment.resendts) >= 0 { // RTO
                        needsend = true
                        
            if kcp.nodelay == 0 {
                            segment.rto 
            += kcp.rx_rto
                        } 
            else {
                            segment.rto 
            += kcp.rx_rto / 2
                        }
                        segment.resendts 
            = current + segment.rto
                        lost
            ++
                        lostSegs
            ++
                    } 
            else if segment.fastack >= resent { // fast retransmit
                        needsend = true
                        segment.fastack 
            = 0
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                        change
            ++
                        fastRetransSegs
            ++
                    } 
            else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
                        needsend = true
                        segment.fastack 
            = 0
                        segment.rto 
            = kcp.rx_rto
                        segment.resendts 
            = current + segment.rto
                        change
            ++
                        earlyRetransSegs
            ++
                    }

                    
            if needsend {
                        segment.xmit
            ++
                        segment.ts 
            = current
                        segment.wnd 
            = seg.wnd
                        segment.una 
            = seg.una

                        size :
            = len(buffer) - len(ptr)
                        need :
            = IKCP_OVERHEAD + len(segment.data)

                        
            if size+need > int(kcp.mtu) {
                            kcp.output(buffer, size)
                            current 
            = currentMs() // time update for a blocking call
                            ptr = buffer
                        }

                        ptr 
            = segment.encode(ptr)
                        copy(ptr, segment.data)
                        ptr 
            = ptr[len(segment.data):]

                        
            if segment.xmit >= kcp.dead_link {
                            kcp.state 
            = 0xFFFFFFFF
                        }
                    }
                }

                
            // flash remain segments
                size := len(buffer) - len(ptr)
                
            if size > 0 {
                    kcp.output(buffer, size)
                }

                
            // counter updates
                sum := lostSegs
                
            if lostSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.LostSegs, lostSegs)
                }
                
            if fastRetransSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.FastRetransSegs, fastRetransSegs)
                    sum 
            += fastRetransSegs
                }
                
            if earlyRetransSegs > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
                    sum 
            += earlyRetransSegs
                }
                
            if sum > 0 {
                    atomic.AddUint64(
            &DefaultSnmp.RetransSegs, sum)
                }

                
            // update ssthresh
                
            // rate halving, https://tools.ietf.org/html/rfc6937
                if change > 0 {
                    inflight :
            = kcp.snd_nxt - kcp.snd_una
                    kcp.ssthresh 
            = inflight / 2
                    
            if kcp.ssthresh < IKCP_THRESH_MIN {
                        kcp.ssthresh 
            = IKCP_THRESH_MIN
                    }
                    kcp.cwnd 
            = kcp.ssthresh + resent
                    kcp.incr 
            = kcp.cwnd * kcp.mss
                }

                
            // congestion control, https://tools.ietf.org/html/rfc5681
                if lost > 0 {
                    kcp.ssthresh 
            = cwnd / 2
                    
            if kcp.ssthresh < IKCP_THRESH_MIN {
                        kcp.ssthresh 
            = IKCP_THRESH_MIN
                    }
                    kcp.cwnd 
            = 1
                    kcp.incr 
            = kcp.mss
                }

                
            if kcp.cwnd < 1 {
                    kcp.cwnd 
            = 1
                    kcp.incr 
            = kcp.mss
                }
            }

            flush函數(shù)非常的重要,kcp的重要參數(shù)都是在調(diào)節(jié)這個(gè)函數(shù)的行為,這個(gè)函數(shù)只有一個(gè)參數(shù)ackOnly,意思就是只發(fā)送ack,如果ackOnly為true的話,該函數(shù)只遍歷ack列表,然后發(fā)送,就完事了。 如果不是,也會(huì)發(fā)送真實(shí)數(shù)據(jù)。 在發(fā)送數(shù)據(jù)前先進(jìn)行windSize探測(cè),如果開(kāi)啟了擁塞控制nc=0,則每次發(fā)送前檢測(cè)服務(wù)端的winsize,如果服務(wù)端的winsize變小了,自身的winsize也要更著變小,來(lái)避免擁塞。如果沒(méi)有開(kāi)啟擁塞控制,就按設(shè)置的winsize進(jìn)行數(shù)據(jù)發(fā)送。
            接著循環(huán)每個(gè)段數(shù)據(jù),并判斷每個(gè)段數(shù)據(jù)的是否該重發(fā),還有什么時(shí)候重發(fā):
            1. 如果這個(gè)段數(shù)據(jù)首次發(fā)送,則直接發(fā)送數(shù)據(jù)。 2. 如果這個(gè)段數(shù)據(jù)的當(dāng)前時(shí)間大于它自身重發(fā)的時(shí)間,也就是RTO,則重傳消息。 3. 如果這個(gè)段數(shù)據(jù)的ack丟失累計(jì)超過(guò)resent次數(shù),則重傳,也就是快速重傳機(jī)制。這個(gè)resent參數(shù)由resend參數(shù)決定。 4. 如果這個(gè)段數(shù)據(jù)的ack有丟失且沒(méi)有新的數(shù)據(jù)段,則觸發(fā)ER,ER相關(guān)信息ER

            最后通過(guò)kcp.output發(fā)送消息hello,output是個(gè)回調(diào)函數(shù),函數(shù)的實(shí)體是sess.go的:

            func (s *UDPSession) output(buf []byte) {
                var ecc [][]
            byte

                
            // extend buf's header space
                ext := buf
                
            if s.headerSize > 0 {
                    ext 
            = s.ext[:s.headerSize+len(buf)]
                    copy(ext[s.headerSize:], buf)
                }

                
            // FEC stage
                if s.fecEncoder != nil {
                    ecc 
            = s.fecEncoder.Encode(ext)
                }

                
            // encryption stage
                if s.block != nil {
                    io.ReadFull(rand.Reader, ext[:nonceSize])
                    checksum :
            = crc32.ChecksumIEEE(ext[cryptHeaderSize:])
                    binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
                    s.block.Encrypt(ext, ext)

                    
            if ecc != nil {
                        
            for k := range ecc {
                            io.ReadFull(rand.Reader, ecc[k][:nonceSize])
                            checksum :
            = crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
                            binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
                            s.block.Encrypt(ecc[k], ecc[k])
                        }
                    }
                }

                
            // WriteTo kernel
                nbytes := 0
                npkts :
            = 0
                
            // if mrand.Intn(100) < 50 {
                for i := 0; i < s.dup+1; i++ {
                    
            if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
                        nbytes 
            += n
                        npkts
            ++
                    }
                }
                
            // }

                
            if ecc != nil {
                    
            for k := range ecc {
                        
            if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
                            nbytes 
            += n
                            npkts
            ++
                        }
                    }
                }
                atomic.AddUint64(
            &DefaultSnmp.OutPkts, uint64(npkts))
                atomic.AddUint64(
            &DefaultSnmp.OutBytes, uint64(nbytes))
            }

            output函數(shù)才是真正的將數(shù)據(jù)寫(xiě)入內(nèi)核中,在寫(xiě)入之前先進(jìn)行了fec編碼,fec編碼器的實(shí)現(xiàn)是用了一個(gè)開(kāi)源庫(kù)github.com/klauspost/reedsolomon,編碼以后的hello就不是和原來(lái)的hello一樣了,至少多了幾個(gè)字節(jié)。 fec編碼器有兩個(gè)重要的參數(shù)reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,這兩個(gè)參數(shù)決定了fec的冗余度,冗余度越大抗丟包性就越強(qiáng)。

            kcp的任務(wù)調(diào)度器

            其實(shí)這里任務(wù)調(diào)度器是一個(gè)很簡(jiǎn)單的實(shí)現(xiàn),用一個(gè)全局變量updater來(lái)管理session,代碼文件為updater.go。其中最主要的函數(shù)

            func (h *updateHeap) updateTask() {
                var timer 
            <-chan time.Time
                
            for {
                    select {
                    
            case <-timer:
                    
            case <-h.chWakeUp:
                    }

                    h.mu.Lock()
                    hlen :
            = h.Len()
                    now :
            = time.Now()
                    
            if hlen > 0 && now.After(h.entries[0].ts) {
                        
            for i := 0; i < hlen; i++ {
                            entry :
            = heap.Pop(h).(entry)
                            
            if now.After(entry.ts) {
                                entry.ts 
            = now.Add(entry.s.update())
                                heap.Push(h, entry)
                            } 
            else {
                                heap.Push(h, entry)
                                
            break
                            }
                        }
                    }
                    
            if hlen > 0 {
                        timer 
            = time.After(h.entries[0].ts.Sub(now))
                    }
                    h.mu.Unlock()
                }
            }

            任務(wù)調(diào)度器實(shí)現(xiàn)了一個(gè)堆結(jié)構(gòu),每當(dāng)有新的連接,session都會(huì)插入到這個(gè)堆里,接著for循環(huán)每隔interval時(shí)間,遍歷這個(gè)堆,得到entry然后執(zhí)行entry.s.update()。而entry.s.update()會(huì)執(zhí)行s.kcp.flush(false)來(lái)發(fā)送數(shù)據(jù)。

            總結(jié)

            這里簡(jiǎn)單介紹了kcp的整體流程,詳細(xì)介紹了發(fā)送數(shù)據(jù)的流程,但未介紹kcp接收數(shù)據(jù)的流程,其實(shí)在客戶端發(fā)送數(shù)據(jù)后,服務(wù)端是需要返回ack的,而客戶端也需要根據(jù)返回的ack來(lái)判斷數(shù)據(jù)段是否需要重傳還是在隊(duì)列里清除該數(shù)據(jù)段。處理返回來(lái)的ack是在函數(shù)kcp.Input()函數(shù)實(shí)現(xiàn)的。具體詳細(xì)流程下次再介紹。

            posted on 2017-12-09 15:20 思月行云 閱讀(1176) 評(píng)論(0)  編輯 收藏 引用 所屬分類: Golang
            久久国产精品偷99| 99久久这里只精品国产免费| 99精品国产在热久久无毒不卡| 狠色狠色狠狠色综合久久| 久久久久国色AV免费观看| yy6080久久| 99久久精品免费看国产| 少妇熟女久久综合网色欲| 国产AV影片久久久久久| 久久精品国产99国产精品亚洲 | 中文字幕亚洲综合久久2| 亚洲国产香蕉人人爽成AV片久久 | 亚洲成色999久久网站| 少妇人妻综合久久中文字幕 | 99久久香蕉国产线看观香| 久久亚洲精品中文字幕三区| 久久精品卫校国产小美女| 国产成人精品久久一区二区三区av | 久久精品国产亚洲一区二区三区| 国产精品久久新婚兰兰| 88久久精品无码一区二区毛片 | 久久不见久久见免费影院www日本| 日韩精品无码久久久久久| 久久青青草视频| 久久人人爽人人爽AV片| 久久久久黑人强伦姧人妻| 久久er热视频在这里精品| 日韩人妻无码精品久久免费一 | 精品久久久久久无码国产| 东京热TOKYO综合久久精品| 久久精品中文字幕无码绿巨人 | 久久天天躁狠狠躁夜夜avapp| 色婷婷狠狠久久综合五月| 久久播电影网| 三级三级久久三级久久| 99蜜桃臀久久久欧美精品网站| 性做久久久久久久久老女人| 久久精品中文字幕第23页| 青青草国产97免久久费观看| 日本精品久久久久久久久免费| 亚洲精品美女久久久久99小说 |