概念
ARQ:自動重傳請求(Automatic Repeat-reQuest,ARQ)是OSI模型中數據鏈路層的錯誤糾正協議之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correction
kcp簡介
kcp是一個基于udp實現快速、可靠、向前糾錯的的協議,能以比TCP浪費10%-20%的帶寬的代價,換取平均延遲降低30%-40%,且最大延遲降低三倍的傳輸效果。純算法實現,并不負責底層協議(如UDP)的收發。查看官方文檔kcp
kcp-go是用go實現了kcp協議的一個庫,其實kcp類似tcp,協議的實現也很多參考tcp協議的實現,滑動窗口,快速重傳,選擇性重傳,慢啟動等。
kcp和tcp一樣,也分客戶端和監聽端。
+-+-+-+-+-+ +-+-+-+-+-+
| Client | | Server |
+-+-+-+-+-+ +-+-+-+-+-+
|------ kcp data ------>|
|<----- kcp data -------|
kcp協議
layer model
+----------------------+
| Session |
+----------------------+
| KCP(ARQ) |
+----------------------+
| FEC(OPTIONAL) |
+----------------------+
| CRYPTO(OPTIONAL)|
+----------------------+
| UDP(Packet) |
+----------------------+
KCP header
KCP Header Format
4 1 1 2 (Byte)
+---+---+---+---+---+---+---+---+
| conv |cmd|frg| wnd |
+---+---+---+---+---+---+---+---+
| ts | sn |
+---+---+---+---+---+---+---+---+
| una | len |
+---+---+---+---+---+---+---+---+
| |
+ DATA +
| |
+---+---+---+---+---+---+---+---+
代碼結構
src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go 加解密實現
├── crypt_test.go
├── donate.png
├── fec.go 向前糾錯實現
├── frame.png
├── kcp-go.png
├── kcp.go kcp協議實現
├── kcp_test.go
├── sess.go 會話管理實現
├── sess_test.go
├── snmp.go 數據統計實現
├── updater.go 任務調度實現
├── xor.go xor封裝
└── xor_test.go
著重研究兩個文件kcp.go
和sess.go
kcp淺析
kcp是基于udp實現的,所有udp的實現這里不做介紹,kcp做的事情就是怎么封裝udp的數據和怎么解析udp的數據,再加各種處理機制,為了重傳,擁塞控制,糾錯等。下面介紹kcp客戶端和服務端整體實現的流程,只是大概介紹一下函數流,不做詳細解析,詳細解析看后面數據流的解析。
kcp client整體函數流
和tcp一樣,kcp要連接服務端需要先撥號,但是和tcp有個很大的不同是,即使服務端沒有啟動,客戶端一樣可以撥號成功,因為實際上這里的撥號沒有發送任何信息,而tcp在這里需要三次握手。
DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
V
net.DialUDP("udp", nil, udpaddr)
V
NewConn()
V
newUDPSession() {初始化UDPSession}
V
NewKCP() {初始化kcp}
V
updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
V
go sess.readLoop()
V
go s.receiver(chPacket)
V
s.kcpInput(data)
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩沖}
V
notifyReadEvent()
客戶端大體的流程如上面所示,先Dial
,建立udp連接,將這個連接封裝成一個會話,然后啟動一個go程,接收udp的消息。
kcp server整體函數流
ListenWithOptions()
V
net.ListenUDP()
V
ServerConn()
V
newFECDecoder()
V
go l.monitor() {從chPacket接收udp數據,寫入kcp}
V
go l.receiver(chPacket) {從upd接收數據,并入隊列}
V
newUDPSession()
V
updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
V
s.kcpInput(data)`
V
s.fecDecoder.decodeBytes(data)
V
s.kcp.Input(data, true, s.ackNoDelay)
V
kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩沖}
V
notifyReadEvent()
服務端的大體流程如上圖所示,先Listen
,啟動udp監聽,接著用一個go程監控udp的數據包,負責將不同session的數據寫入不同的udp連接,然后解析封裝將數據交給上層。
kcp 數據流詳細解析
不管是kcp的客戶端還是服務端,他們都有io行為,就是讀與寫,我們只分析一個就好了,因為它們讀寫的實現是一樣的,這里分析客戶端的讀與寫。
kcp client 發送消息
s.Write(b []byte)
V
s.kcp.WaitSnd() {}
V
s.kcp.Send(b) {將數據根據mss分段,并存在kcp.snd_queue}
V
s.kcp.flush(false) [flush data to output] {
if writeDelay==true {
flush
}else{
每隔`interval`時間flush一次
}
}
V
kcp.output(buffer, size)
V
s.output(buf)
V
s.conn.WriteTo(ext, s.remote)
V
s.conn..Conn.WriteTo(buf)
讀寫都是在sess.go
文件中實現的,Write方法:
// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
for {

// api flow control
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
for {
if len(b) <= int(s.kcp.mss) {
s.kcp.Send(b)
break
} else {
s.kcp.Send(b[:s.kcp.mss])
b = b[s.kcp.mss:]
}
}
if !s.writeDelay {
s.kcp.flush(false)
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
}

// wait for write event or timeout
select {
case <-s.chWriteEvent:
case <-c:
case <-s.die:
}
if timeout != nil {
timeout.Stop()
}
}
} 假設發送一個hello消息,Write方法會先判斷發送窗口是否已滿,滿的話該函數阻塞,不滿則kcp.Send(“hello”),而Send函數實現根據mss的值對數據分段,當然這里的發送的hello,長度太短,只分了一個段,并把它們插入發送的隊列里。
func (kcp *KCP) Send(buffer []byte) int {

for i := 0; i < count; i++ {
var size int
if len(buffer) > int(kcp.mss) {
size = int(kcp.mss)
} else {
size = len(buffer)
}
seg := kcp.newSegment(size)
copy(seg.data, buffer[:size])
if kcp.stream == 0 { // message mode
seg.frg = uint8(count - i - 1)
} else { // stream mode
seg.frg = 0
}
kcp.snd_queue = append(kcp.snd_queue, seg)
buffer = buffer[size:]
}
return 0
} 接著判斷參數writeDelay
,如果參數設置為false,則立馬發送消息,否則需要任務調度后才會觸發發送,發送消息是由flush函數實現的。
// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
var seg Segment
seg.conv = kcp.conv
seg.cmd = IKCP_CMD_ACK
seg.wnd = kcp.wnd_unused()
seg.una = kcp.rcv_nxt
buffer := kcp.buffer
// flush acknowledges
ptr := buffer
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
// filter jitters caused by bufferbloat
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
}
}
kcp.acklist = kcp.acklist[0:0]
if ackOnly { // flash remain ack segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
return
}
// probe window size (if remote window size equals zero)
if kcp.rmt_wnd == 0 {
current := currentMs()
if kcp.probe_wait == 0 {
kcp.probe_wait = IKCP_PROBE_INIT
kcp.ts_probe = current + kcp.probe_wait
} else {
if _itimediff(current, kcp.ts_probe) >= 0 {
if kcp.probe_wait < IKCP_PROBE_INIT {
kcp.probe_wait = IKCP_PROBE_INIT
}
kcp.probe_wait += kcp.probe_wait / 2
if kcp.probe_wait > IKCP_PROBE_LIMIT {
kcp.probe_wait = IKCP_PROBE_LIMIT
}
kcp.ts_probe = current + kcp.probe_wait
kcp.probe |= IKCP_ASK_SEND
}
}
} else {
kcp.ts_probe = 0
kcp.probe_wait = 0
}
// flush window probing commands
if (kcp.probe & IKCP_ASK_SEND) != 0 {
seg.cmd = IKCP_CMD_WASK
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}
// flush window probing commands
if (kcp.probe & IKCP_ASK_TELL) != 0 {
seg.cmd = IKCP_CMD_WINS
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}
kcp.probe = 0
// calculate window size
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
}
// sliding window, controlled by snd_nxt && sna_una+cwnd
newSegsCount := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
}
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.sn = kcp.snd_nxt
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
newSegsCount++
kcp.snd_queue[k].data = nil
}
if newSegsCount > 0 {
kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
}
// calculate resent
resent := uint32(kcp.fastresend)
if kcp.fastresend <= 0 {
resent = 0xffffffff
}
// check for retransmissions
current := currentMs()
var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 { // initial transmit
needsend = true
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
if kcp.nodelay == 0 {
segment.rto += kcp.rx_rto
} else {
segment.rto += kcp.rx_rto / 2
}
segment.resendts = current + segment.rto
lost++
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
fastRetransSegs++
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
needsend = true
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}
if needsend {
segment.xmit++
segment.ts = current
segment.wnd = seg.wnd
segment.una = seg.una
size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)
if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
current = currentMs() // time update for a blocking call
ptr = buffer
}
ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]
if segment.xmit >= kcp.dead_link {
kcp.state = 0xFFFFFFFF
}
}
}
// flash remain segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
// counter updates
sum := lostSegs
if lostSegs > 0 {
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
}
if fastRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
sum += fastRetransSegs
}
if earlyRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
sum += earlyRetransSegs
}
if sum > 0 {
atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
}
// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change > 0 {
inflight := kcp.snd_nxt - kcp.snd_una
kcp.ssthresh = inflight / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = kcp.ssthresh + resent
kcp.incr = kcp.cwnd * kcp.mss
}
// congestion control, https://tools.ietf.org/html/rfc5681
if lost > 0 {
kcp.ssthresh = cwnd / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = 1
kcp.incr = kcp.mss
}
if kcp.cwnd < 1 {
kcp.cwnd = 1
kcp.incr = kcp.mss
}
}
flush函數非常的重要,kcp的重要參數都是在調節這個函數的行為,這個函數只有一個參數ackOnly
,意思就是只發送ack,如果ackOnly
為true的話,該函數只遍歷ack列表,然后發送,就完事了。 如果不是,也會發送真實數據。 在發送數據前先進行windSize探測,如果開啟了擁塞控制nc=0
,則每次發送前檢測服務端的winsize,如果服務端的winsize變小了,自身的winsize也要更著變小,來避免擁塞。如果沒有開啟擁塞控制,就按設置的winsize進行數據發送。
接著循環每個段數據,并判斷每個段數據的是否該重發,還有什么時候重發:
1. 如果這個段數據首次發送,則直接發送數據。 2. 如果這個段數據的當前時間大于它自身重發的時間,也就是RTO,則重傳消息。 3. 如果這個段數據的ack丟失累計超過resent次數,則重傳,也就是快速重傳機制。這個resent參數由resend
參數決定。 4. 如果這個段數據的ack有丟失且沒有新的數據段,則觸發ER,ER相關信息ER
最后通過kcp.output發送消息hello,output是個回調函數,函數的實體是sess.go
的:
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
// extend buf's header space
ext := buf
if s.headerSize > 0 {
ext = s.ext[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
}
// FEC stage
if s.fecEncoder != nil {
ecc = s.fecEncoder.Encode(ext)
}
// encryption stage
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)
if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}
// WriteTo kernel
nbytes := 0
npkts := 0
// if mrand.Intn(100) < 50 {
for i := 0; i < s.dup+1; i++ {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
npkts++
}
}
// }
if ecc != nil {
for k := range ecc {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
npkts++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}
output函數才是真正的將數據寫入內核中,在寫入之前先進行了fec編碼,fec編碼器的實現是用了一個開源庫github.com/klauspost/reedsolomon,編碼以后的hello就不是和原來的hello一樣了,至少多了幾個字節。 fec編碼器有兩個重要的參數reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShards
和parityShards
,這兩個參數決定了fec的冗余度,冗余度越大抗丟包性就越強。
kcp的任務調度器
其實這里任務調度器是一個很簡單的實現,用一個全局變量updater
來管理session,代碼文件為updater.go
。其中最主要的函數
func (h *updateHeap) updateTask() {
var timer <-chan time.Time
for {
select {
case <-timer:
case <-h.chWakeUp:
}
h.mu.Lock()
hlen := h.Len()
now := time.Now()
if hlen > 0 && now.After(h.entries[0].ts) {
for i := 0; i < hlen; i++ {
entry := heap.Pop(h).(entry)
if now.After(entry.ts) {
entry.ts = now.Add(entry.s.update())
heap.Push(h, entry)
} else {
heap.Push(h, entry)
break
}
}
}
if hlen > 0 {
timer = time.After(h.entries[0].ts.Sub(now))
}
h.mu.Unlock()
}
}
任務調度器實現了一個堆結構,每當有新的連接,session都會插入到這個堆里,接著for循環每隔interval時間,遍歷這個堆,得到entry
然后執行entry.s.update()
。而entry.s.update()
會執行s.kcp.flush(false)
來發送數據。
總結
這里簡單介紹了kcp的整體流程,詳細介紹了發送數據的流程,但未介紹kcp接收數據的流程,其實在客戶端發送數據后,服務端是需要返回ack的,而客戶端也需要根據返回的ack來判斷數據段是否需要重傳還是在隊列里清除該數據段。處理返回來的ack是在函數kcp.Input()函數實現的。具體詳細流程下次再介紹。