http://skoo.me/go/2014/04/21/go-net-core
Go語言的出現(xiàn),讓我見到了一門語言把網(wǎng)絡(luò)編程這件事情給做“正確”了,當(dāng)然,除了Go語言以外,還有很多語言也把這件事情做”正確”了。我一直堅持著這樣的理念——要做"正確"的事情,而不是"高性能"的事情;很多時候,我們在做系統(tǒng)設(shè)計、技術(shù)選型的時候,都被“高性能”這三個字給綁架了,當(dāng)然不是說性能不重要,你懂的。
目前很多高性能的基礎(chǔ)網(wǎng)絡(luò)服務(wù)器都是采用的C語言開發(fā)的,比如:Nginx、Redis、memcached等,它們都是基于”事件驅(qū)動 + 事件回掉函數(shù)”的方式實現(xiàn),也就是采用epoll等作為網(wǎng)絡(luò)收發(fā)數(shù)據(jù)包的核心驅(qū)動。不少人(包括我自己)都認為“事件驅(qū)動 + 事件回掉函數(shù)”的編程方法是“反人類”的;因為大多數(shù)人都更習(xí)慣線性的處理一件事情,做完第一件事情再做第二件事情,并不習(xí)慣在N件事情之間頻繁的切換干活。為了解決程序員在開發(fā)服務(wù)器時需要自己的大腦不斷的“上下文切換”的問題,Go語言引入了一種用戶態(tài)線程goroutine來取代編寫異步的事件回掉函數(shù),從而重新回歸到多線程并發(fā)模型的線性、同步的編程方式上。
用Go語言寫一個最簡單的echo服務(wù)器:
package main
import (
"log"
"net"
)
func main() {
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Println(err)
return
}
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
continue
}
go echoFunc(conn)
}
}
func echoFunc(c net.Conn) {
buf := make([]byte, 1024)
for {
n, err := c.Read(buf)
if err != nil {
log.Println(err)
return
}
c.Write(buf[:n])
}
}
main函數(shù)的過程就是首先創(chuàng)建一個監(jiān)聽套接字,然后用一個for循環(huán)不斷的從監(jiān)聽套接字上Accept新的連接,最后調(diào)用echoFunc函數(shù)在建立的連接上干活。關(guān)鍵代碼是:
go echoFunc(conn)
每收到一個新的連接,就創(chuàng)建一個“線程”去服務(wù)這個連接,因此所有的業(yè)務(wù)邏輯都可以同步、順序的編寫到echoFunc函數(shù)中,再也不用去關(guān)心網(wǎng)絡(luò)IO是否會阻塞的問題。不管業(yè)務(wù)多復(fù)雜,Go語言的并發(fā)服務(wù)器的編程模型都是長這個樣子。可以肯定的是,在linux上Go語言寫的網(wǎng)絡(luò)服務(wù)器也是采用的epoll作為最底層的數(shù)據(jù)收發(fā)驅(qū)動,Go語言網(wǎng)絡(luò)的底層實現(xiàn)中同樣存在“上下文切換”的工作,只是這個切換工作由runtime的調(diào)度器來做了,減少了程序員的負擔(dān)。
弄明白網(wǎng)絡(luò)庫的底層實現(xiàn),貌似只要弄清楚echo服務(wù)器中的Listen、Accept、Read、Write四個函數(shù)的底層實現(xiàn)關(guān)系就可以了。本文將采用自底向上的方式來介紹,也就是從最底層到上層的方式,這也是我閱讀源碼的方式。底層實現(xiàn)涉及到的核心源碼文件主要有:
net/fd_unix.go
net/fd_poll_runtime.go
runtime/netpoll.goc
runtime/netpoll_epoll.c
runtime/proc.c (調(diào)度器)
netpoll_epoll.c文件是Linux平臺使用epoll作為網(wǎng)絡(luò)IO多路復(fù)用的實現(xiàn)代碼,這份代碼可以了解到epoll相關(guān)的操作(比如:添加fd到epoll、從epoll刪除fd等),只有4個函數(shù),分別是runtime·netpollinit、runtime·netpollopen、runtime·netpollclose和runtime·netpoll。init函數(shù)就是創(chuàng)建epoll對象,open函數(shù)就是添加一個fd到epoll中,close函數(shù)就是從epoll刪除一個fd,netpoll函數(shù)就是從epoll wait得到所有發(fā)生事件的fd,并將每個fd對應(yīng)的goroutine(用戶態(tài)線程)通過鏈表返回。用epoll寫過程序的人應(yīng)該都能理解這份代碼,沒什么特別之處。
void
runtime·netpollinit(void)
{
epfd = runtime·epollcreate1(EPOLL_CLOEXEC);
if(epfd >= 0)
return;
epfd = runtime·epollcreate(1024);
if(epfd >= 0) {
runtime·closeonexec(epfd);
return;
}
runtime·printf("netpollinit: failed to create descriptor (%d)\n", -epfd);
runtime·throw("netpollinit: failed to create descriptor");
}
runtime·netpollinit函數(shù)首先使用runtime·epollcreate1創(chuàng)建epoll實例,如果沒有創(chuàng)建成功,就換用runtime·epollcreate再創(chuàng)建一次。這兩個create函數(shù)分別等價于glibc的epoll_create1和epoll_create函數(shù)。只是因為Go語言并沒有直接使用glibc,而是自己封裝的系統(tǒng)調(diào)用,但功能是等價于glibc的。可以通過man手冊查看這兩個create的詳細信息。
int32
runtime·netpollopen(uintptr fd, PollDesc *pd)
{
EpollEvent ev;
int32 res;
ev.events = EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET;
ev.data = (uint64)pd;
res = runtime·epollctl(epfd, EPOLL_CTL_ADD, (int32)fd, &ev);
return -res;
}
添加fd到epoll中的runtime·netpollopen函數(shù)可以看到每個fd一開始都關(guān)注了讀寫事件,并且采用的是邊緣觸發(fā),除此之外還關(guān)注了一個不常見的新事件EPOLLRDHUP,這個事件是在較新的內(nèi)核版本添加的,目的是解決對端socket關(guān)閉,epoll本身并不能直接感知到這個關(guān)閉動作的問題。注意任何一個fd在添加到epoll中的時候就關(guān)注了EPOLLOUT事件的話,就立馬產(chǎn)生一次寫事件,這次事件可能是多余浪費的。
epoll操作的相關(guān)函數(shù)都會在事件驅(qū)動的抽象層中去調(diào)用,為什么需要這個抽象層呢?原因很簡單,因為Go語言需要跑在不同的平臺上,有Linux、Unix、Mac OS X和Windows等,所以需要靠事件驅(qū)動的抽象層來為網(wǎng)絡(luò)庫提供一致的接口,從而屏蔽事件驅(qū)動的具體平臺依賴實現(xiàn)。runtime/netpoll.goc源文件就是整個事件驅(qū)動抽象層的實現(xiàn),抽象層的核心數(shù)據(jù)結(jié)構(gòu)是:
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
Lock; // protectes the following fields
uintptr fd;
bool closing;
uintptr seq; // protects from stale timers and ready notifications
G* rg; // G waiting for read or READY (binary semaphore)
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
G* wg; // the same for writes
Timer wt;
int64 wd;
};
每個添加到epoll中的fd都對應(yīng)了一個PollDesc結(jié)構(gòu)實例,PollDesc維護了讀寫此fd的goroutine這一非常重要的信息。可以大膽的推測一下,網(wǎng)絡(luò)IO讀寫操作的實現(xiàn)應(yīng)該是:當(dāng)在一個fd上讀寫遇到EAGAIN錯誤的時候,就將當(dāng)前goroutine存儲到這個fd對應(yīng)的PollDesc中,同時將goroutine給park住,直到這個fd上再此發(fā)生了讀寫事件后,再將此goroutine給ready激活重新運行。事實上的實現(xiàn)大概也是這個樣子的。
事件驅(qū)動抽象層主要干的事情就是將具體的事件驅(qū)動實現(xiàn)(比如: epoll)通過統(tǒng)一的接口封裝成Go接口供net庫使用,主要的接口也是:創(chuàng)建事件驅(qū)動實例、添加fd、刪除fd、等待事件以及設(shè)置DeadLine。runtime_pollServerInit負責(zé)創(chuàng)建事件驅(qū)動實例,runtime_pollOpen將分配一個PollDesc實例和fd綁定起來,然后將fd添加到epoll中,runtime_pollClose就是將fd從epoll中刪除,同時將刪除的fd綁定的PollDesc實例刪除,runtime_pollWait接口是至關(guān)重要的,這個接口一般是在非阻塞讀寫發(fā)生EAGAIN錯誤的時候調(diào)用,作用就是park當(dāng)前讀寫的goroutine。
runtime中的epoll事件驅(qū)動抽象層其實在進入net庫后,又被封裝了一次,這一次封裝從代碼上看主要是為了方便在純Go語言環(huán)境進行操作,net庫中的這次封裝實現(xiàn)在net/fd_poll_runtime.go文件中,主要是通過pollDesc對象來實現(xiàn)的:
type pollDesc struct {
runtimeCtx uintptr
}
注意:此處的pollDesc對象不是上文提到的runtime中的PollDesc,相反此處pollDesc對象的runtimeCtx成員才是指向的runtime的PollDesc實例。pollDesc對象主要就是將runtime的事件驅(qū)動抽象層給再封裝了一次,供網(wǎng)絡(luò)fd對象使用。
var serverInit sync.Once
func (pd *pollDesc) Init(fd *netFD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
pollDesc對象最需要關(guān)注的就是其Init方法,這個方法通過一個sync.Once變量來調(diào)用了runtime_pollServerInit函數(shù),也就是創(chuàng)建epoll實例的函數(shù)。意思就是runtime_pollServerInit函數(shù)在整個進程生命周期內(nèi)只會被調(diào)用一次,也就是只會創(chuàng)建一次epoll實例。epoll實例被創(chuàng)建后,會調(diào)用runtime_pollOpen函數(shù)將fd添加到epoll中。
網(wǎng)絡(luò)編程中的所有socket fd都是通過netFD對象實現(xiàn)的,netFD是對網(wǎng)絡(luò)IO操作的抽象,linux的實現(xiàn)在文件net/fd_unix.go中。netFD對象實現(xiàn)有自己的init方法,還有完成基本IO操作的Read和Write方法,當(dāng)然除了這三個方法以外,還有很多非常有用的方法供用戶使用。
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd int
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
// wait server
pd pollDesc
}
通過netFD對象的定義可以看到每個fd都關(guān)聯(lián)了一個pollDesc實例,通過上文我們知道pollDesc對象最終是對epoll的封裝。
func (fd *netFD) init() error {
if err := fd.pd.Init(fd); err != nil {
return err
}
return nil
}
netFD對象的init函數(shù)僅僅是調(diào)用了pollDesc實例的Init函數(shù),作用就是將fd添加到epoll中,如果這個fd是第一個網(wǎng)絡(luò)socket fd的話,這一次init還會擔(dān)任創(chuàng)建epoll實例的任務(wù)。要知道在Go進程里,只會有一個epoll實例來管理所有的網(wǎng)絡(luò)socket fd,這個epoll實例也就是在第一個網(wǎng)絡(luò)socket fd被創(chuàng)建的時候所創(chuàng)建。
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
err = chkReadErr(n, err, fd)
break
}
上面代碼段是從netFD的Read方法中摘取,重點關(guān)注這個for循環(huán)中的syscall.Read調(diào)用的錯誤處理。當(dāng)有錯誤發(fā)生的時候,會檢查這個錯誤是否是syscall.EAGAIN,如果是,則調(diào)用WaitRead將當(dāng)前讀這個fd的goroutine給park住,直到這個fd上的讀事件再次發(fā)生為止。當(dāng)這個socket上有新數(shù)據(jù)到來的時候,WaitRead調(diào)用返回,繼續(xù)for循環(huán)的執(zhí)行。這樣的實現(xiàn),就讓調(diào)用netFD的Read的地方變成了同步“阻塞”方式編程,不再是異步非阻塞的編程方式了。netFD的Write方法和Read的實現(xiàn)原理是一樣的,都是在碰到EAGAIN錯誤的時候?qū)?dāng)前goroutine給park住直到socket再次可寫為止。
本文只是將網(wǎng)絡(luò)庫的底層實現(xiàn)給大體上引導(dǎo)了一遍,知道底層代碼大概實現(xiàn)在什么地方,方便結(jié)合源碼深入理解。Go語言中的高并發(fā)、同步阻塞方式編程的關(guān)鍵其實是”goroutine和調(diào)度器”,針對網(wǎng)絡(luò)IO的時候,我們需要知道EAGAIN這個非常關(guān)鍵的調(diào)度點,掌握了這個調(diào)度點,即使沒有調(diào)度器,自己也可以在epoll的基礎(chǔ)上配合協(xié)程等用戶態(tài)線程實現(xiàn)網(wǎng)絡(luò)IO操作的調(diào)度,達到同步阻塞編程的目的。
最后,為什么需要同步阻塞的方式編程?只有看多、寫多了異步非阻塞代碼的時候才能夠深切體會到這個問題。真正的高大上絕對不是——“別人不會,我會;別人寫不出來,我寫得出來。”
http://ju.outofmemory.cn/entry/168649
本文分析了Golang的socket文件描述符和goroutine阻塞調(diào)度的原理。代碼中大部分是Go代碼,小部分是匯編代碼。完整理解本文需要Go語言知識,并且用Golang寫過網(wǎng)絡(luò)程序。更重要的是,需要提前理解goroutine的調(diào)度原理。
1. TCP的連接對象:
連接對象:
在net.go中有一個名為Conn的接口,提供了對于連接的讀寫和其他操作:
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
這個接口就是對下面的結(jié)構(gòu)體conn的抽象。conn結(jié)構(gòu)體包含了對連接的讀寫和其他操作:
type conn struct {
fd *netFD
}
從連接讀取數(shù)據(jù):
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}
向連接寫入數(shù)據(jù):
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Write(b)
}
關(guān)閉連接:
// Close closes the connection.
func (c *conn) Close() error {
if !c.ok() {
return syscall.EINVAL
}
return c.fd.Close()
}
設(shè)置讀寫超時:
// SetDeadline implements the Conn SetDeadline method.
func (c *conn) SetDeadline(t time.Time) error {
if !c.ok() {
return syscall.EINVAL
}
return c.fd.setDeadline(t)
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func (c *conn) SetReadDeadline(t time.Time) error {
if !c.ok() {
return syscall.EINVAL
}
return c.fd.setReadDeadline(t)
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func (c *conn) SetWriteDeadline(t time.Time) error {
if !c.ok() {
return syscall.EINVAL
}
return c.fd.setWriteDeadline(t)
}
可以看到,對連接的所有操作,都體現(xiàn)在對*netFD的操作上。我們繼續(xù)跟蹤c.fd.Read()函數(shù).
2.文件描述符
net/fd_unix.go:
網(wǎng)絡(luò)連接的文件描述符:
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd int
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
// wait server
pd pollDesc
}
文件描述符讀取數(shù)據(jù):
func (fd *netFD) Read(p []byte) (n int, err error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
// 調(diào)用system call,循環(huán)從fd.sysfd讀取數(shù)據(jù)
for {
// 系統(tǒng)調(diào)用Read讀取數(shù)據(jù)
n, err = syscall.Read(int(fd.sysfd), p)
// 如果發(fā)生錯誤,則需要處理
// 并且只處理EAGAIN類型的錯誤,其他錯誤一律返回給調(diào)用者
if err != nil {
n = 0
// 對于非阻塞的網(wǎng)絡(luò)連接的文件描述符,如果錯誤是EAGAIN
// 說明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù)
// 則調(diào)用fd.pd.WaitRead,
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
err = chkReadErr(n, err, fd)
break
}
if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.raddr, err}
}
return
}
網(wǎng)絡(luò)輪詢器
網(wǎng)絡(luò)輪詢器是Golang中針對每個socket文件描述符建立的輪詢機制。 此處的輪詢并不是一般意義上的輪詢,而是Golang的runtime在調(diào)度goroutine或者GC完成之后或者指定時間之內(nèi),調(diào)用epoll_wait獲取所有產(chǎn)生IO事件的socket文件描述符。當(dāng)然在runtime輪詢之前,需要將socket文件描述符和當(dāng)前goroutine的相關(guān)信息加入epoll維護的數(shù)據(jù)結(jié)構(gòu)中,并掛起當(dāng)前goroutine,當(dāng)IO就緒后,通過epoll返回的文件描述符和其中附帶的goroutine的信息,重新恢復(fù)當(dāng)前goroutine的執(zhí)行。
// Integrated network poller (platform-independent part).
// 網(wǎng)絡(luò)輪詢器(平臺獨立部分)
// A particular implementation (epoll/kqueue) must define the following functions:
// 實際的實現(xiàn)(epoll/kqueue)必須定義以下函數(shù):
// func netpollinit() // to initialize the poller,初始化輪詢器
// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 為fd和pd啟動邊緣觸發(fā)通知
// and associate fd with pd.
// 一個實現(xiàn)必須調(diào)用下面的函數(shù),用來指示pd已經(jīng)準(zhǔn)備好
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp **g, pd *pollDesc, mode int32)
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pollDesc包含了2個二進制的信號,分別負責(zé)讀寫goroutine的暫停.
// 信號可能處于下面的狀態(tài):
// pdReady - IO就緒通知被掛起;
// 一個goroutine將次狀態(tài)置為nil來消費一個通知。
// pdReady - io readiness notification is pending;
// a goroutine consumes the notification by changing the state to nil.
// pdWait - 一個goroutine準(zhǔn)備暫停在信號上,但是還沒有完成暫停。
// 這個goroutine通過把這個狀態(tài)改變?yōu)镚指針去提交這個暫停動作。
// 或者,替代性的,并行的其他通知將狀態(tài)改變?yōu)镽EADY.
// 或者,替代性的,并行的超時/關(guān)閉會將次狀態(tài)變?yōu)閚il
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
// the goroutine commits to park by changing the state to G pointer,
// or, alternatively, concurrent io notification changes the state to READY,
// or, alternatively, concurrent timeout/close changes the state to nil.
// G指針 - 阻塞在信號上的goroutine
// IO通知或者超時/關(guān)閉會分別將此狀態(tài)置為READY或者nil.
// G pointer - the goroutine is blocked on the semaphore;
// io notification or timeout/close changes the state to READY or nil respectively
// and unparks the goroutine.
// nil - nothing of the above.
const (
pdReady uintptr = 1
pdWait uintptr = 2
)
網(wǎng)絡(luò)輪詢器的數(shù)據(jù)結(jié)構(gòu)如下:
// Network poller descriptor.
// 網(wǎng)絡(luò)輪詢器描述符
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
//
// lock鎖對象保護了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。
// 而這些操作又完全包含了對seq, rt, tw變量。
// fd在PollDesc整個生命過程中都是一個常量。
// 處理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就緒通知)不需要用到鎖。
// 所以closing, rg, rd, wg和wd的所有操作都是一個無鎖的操作。
lock mutex // protectes the following fields
fd uintptr
closing bool
seq uintptr // protects from stale timers and ready notifications
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
user unsafe.Pointer // user settable cookie
}
將當(dāng)前goroutine設(shè)置為阻塞在fd上:
pd.WaitRead():
func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}
func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
res是runtime_pollWait函數(shù)返回的結(jié)果,由conevertErr函數(shù)包裝后返回:
func convertErr(res int) error {
switch res {
case 0:
return nil
case 1:
return errClosing
case 2:
return errTimeout
}
println("unreachable: ", res)
panic("unreachable")
}
函數(shù)返回0,表示IO已經(jīng)準(zhǔn)備好,返回nil。
返回1,說明連接已關(guān)閉,應(yīng)該放回errClosing。
返回2,說明對IO進行的操作發(fā)生超時,應(yīng)該返回errTimeout。
runtime_pollWait會調(diào)用runtime/thunk.s中的函數(shù):
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
JMP runtime·netpollWait(SB)
這是一個包裝函數(shù),沒有參數(shù),直接跳轉(zhuǎn)到runtime/netpoll.go中的函數(shù)netpollWait:
func netpollWait(pd *pollDesc, mode int) int {
// 檢查pd的狀態(tài)是否異常
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循環(huán)中檢查pd的狀態(tài)是不是已經(jīng)被設(shè)置為pdReady
// 即檢查IO是不是已經(jīng)就緒
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
netpollcheckerr函數(shù)檢查pd是否出現(xiàn)異常:
// 檢查pd的異常
func netpollcheckerr(pd *pollDesc, mode int32) int {
// 是否已經(jīng)關(guān)閉
if pd.closing {
return 1 // errClosing
}
// 當(dāng)讀寫狀態(tài)下,deadline小于0,表示pd已經(jīng)過了超時時間
if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
return 2 // errTimeout
}
// 正常情況返回0
return 0
}
netpollblock():
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// 這個函數(shù)被netpollWait循環(huán)調(diào)用
// 返回true說明IO已經(jīng)準(zhǔn)備好,返回false說明IO操作已經(jīng)超時或者已經(jīng)關(guān)閉
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 獲取pd的rg
gpp := &pd.rg
// 如果模式是w,則獲取pd的wg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 在循環(huán)中設(shè)置pd的gpp為pdWait
// 因為casuintptr是自旋鎖,所以需要在循環(huán)中調(diào)用
for {
// 如果在循環(huán)中發(fā)現(xiàn)IO已經(jīng)準(zhǔn)備好(pg的rg或者wg為pdReady狀態(tài))
// 則設(shè)置rg/wg為0,返回true
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
// 每次netpollblock執(zhí)行完畢之后,gpp重置為0
// 非0表示重復(fù)wait
if old != 0 {
gothrow("netpollblock: double wait")
}
// CAS操作改變gpp為pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
//
// 當(dāng)設(shè)置gpp為pdWait狀態(tài)后,重新檢查gpp的狀態(tài)
// 這是必要的,因為runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl會做相反的操作
// 如果狀態(tài)正常則掛起當(dāng)前的goroutine
//
// 當(dāng)netpollcheckerr檢查io出現(xiàn)超時或者錯誤,waitio為true可用于等待ioReady
// 否則當(dāng)waitio為false, 且io不出現(xiàn)錯誤或者超時才會掛起當(dāng)前goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 解鎖函數(shù),設(shè)置gpp為pdWait,如果設(shè)置不成功
// 說明已經(jīng)是發(fā)生其他事件,可以讓g繼續(xù)運行,而不是掛起當(dāng)前g
f := netpollblockcommit
// 嘗試掛起當(dāng)前g
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// be careful to not lose concurrent READY notification
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}
runtime/proc.go: gopark():
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// 將當(dāng)前goroutine置為waiting狀態(tài),然后調(diào)用unlockf
func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
// 獲取當(dāng)前M
mp := acquirem()
// 獲取當(dāng)前G
gp := mp.curg
// 獲取G的狀態(tài)
status := readgstatus(gp)
// 如果不是_Grunning或者_Gscanrunning,則報錯
if status != _Grunning && status != _Gscanrunning {
gothrow("gopark: bad g status")
}
// 設(shè)置lock和unlockf
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
releasem(mp)
// can't do anything that might move the G between Ms here.
// 在m->g0這個棧上調(diào)用park_m,而不是當(dāng)前g的棧
mcall(park_m)
}
mcall函數(shù)是一段匯編,在m->g0的棧上調(diào)用park_m,而不是在當(dāng)前goroutine的棧上。mcall的功能分兩部分,第一部分保存當(dāng)前G的PC/SP到G的gobuf的pc/sp字段,第二部分調(diào)用park_m函數(shù):
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall(SB), NOSPLIT, $0-8
// 將需要執(zhí)行的函數(shù)保存在DI
MOVQ fn+0(FP), DI
// 將M的TLS存放在CX
get_tls(CX)
// 將G對象存放在AX
MOVQ g(CX), AX // save state in g->sched
// 將調(diào)用者的PC存放在BX
MOVQ 0(SP), BX // caller's PC
// 將調(diào)用者的PC保存到g->sched.pc
MOVQ BX, (g_sched+gobuf_pc)(AX)
// 第一個參數(shù)的地址,即棧頂?shù)牡刂罚4娴紹X
LEAQ fn+0(FP), BX // caller's SP
// 保存SP的地址到g->sched.sp
MOVQ BX, (g_sched+gobuf_sp)(AX)
// 將g對象保存到g->sched->g
MOVQ AX, (g_sched+gobuf_g)(AX)
// switch to m->g0 & its stack, call fn
// 將g對象指針保存到BX
MOVQ g(CX), BX
// 將g->m保存到BX
MOVQ g_m(BX), BX
// 將m->g0保存到SI
MOVQ m_g0(BX), SI
CMPQ SI, AX // if g == m->g0 call badmcall
JNE 3(PC)
MOVQ $runtime·badmcall(SB), AX
JMP AX
// 將m->g0保存到g
MOVQ SI, g(CX) // g = m->g0
// 將g->sched.sp恢復(fù)到SP寄存器
// 即使用g0的棧
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
// AX進棧
PUSHQ AX
MOVQ DI, DX
// 將fn的地址復(fù)制到DI
MOVQ 0(DI), DI
// 調(diào)用函數(shù)
CALL DI
// AX出棧
POPQ AX
MOVQ $runtime·badmcall2(SB), AX
JMP AX
RET
park_m函數(shù)的功能分為三部分,第一部分讓當(dāng)前G和當(dāng)前M脫離關(guān)系,第二部分是調(diào)用解鎖函數(shù),這里是調(diào)用netpoll.go源文件中的netpollblockcommit函數(shù):
// runtime·park continuation on g0.
void
runtime·park_m(G *gp)
{
bool ok;
// 設(shè)置當(dāng)前g為Gwaiting狀態(tài)
runtime·casgstatus(gp, Grunning, Gwaiting);
// 讓當(dāng)前g和m脫離關(guān)系
dropg();
if(g->m->waitunlockf) {
ok = g->m->waitunlockf(gp, g->m->waitlock);
g->m->waitunlockf = nil;
g->m->waitlock = nil;
// 返回0為false,非0為true
// 0說明g->m->waitlock發(fā)生了變化,即不是在gopark是設(shè)置的(pdWait)
// 說明了脫離了WAIT狀態(tài),應(yīng)該設(shè)置為Grunnable,并執(zhí)行g(shù)
if(!ok) {
runtime·casgstatus(gp, Gwaiting, Grunnable);
execute(gp); // Schedule it back, never returns.
}
}
// 這里是調(diào)度當(dāng)前m繼續(xù)執(zhí)行其他g
// 而不是上面執(zhí)行execute
schedule();
}
netpollblockcommit函數(shù),設(shè)置gpp為pdWait,設(shè)置成功返回1,否則返回0。1為true,0為false:
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}
到這里當(dāng)前goroutine對socket文件描述符的等待IO繼續(xù)的行為已經(jīng)完成。過程中首先盡早嘗試判斷IO是否已經(jīng)就緒,如果未就緒則掛起當(dāng)前goroutine,掛起之后再次判斷IO是否就緒,如果還未就緒則調(diào)度當(dāng)前M運行其他G。如果是在調(diào)度goroutine之前IO已經(jīng)就緒,則不會使當(dāng)前goroutine進入調(diào)度隊列,會直接運行剛才掛起的G。否則當(dāng)前goroutine會進入調(diào)度隊列。
接下來是等待runtime將其喚醒。runtime在執(zhí)行findrunnablequeue、starttheworld,sysmon函數(shù)時,都會調(diào)用netpoll_epoll.go中的netpoll函數(shù),尋找到IO就緒的socket文件描述符,并找到這些socket文件描述符對應(yīng)的輪詢器中附帶的信息,根據(jù)這些信息將之前等待這些socket文件描述符就緒的goroutine狀態(tài)修改為Grunnable。在以上函數(shù)中,執(zhí)行完netpoll之后,會找到一個就緒的goroutine列表,接下來將就緒的goroutine加入到調(diào)度隊列中,等待調(diào)度運行。
在netpoll_epoll.go中的netpoll函數(shù)中,epoll_wait函數(shù)返回N個發(fā)生事件的文件描述符對應(yīng)的epollevent,接著對于每個event使用其data屬性,將event.data轉(zhuǎn)換為*pollDesc類型,再調(diào)用netpoll.go中的netpollready函數(shù),將*pollDesc類型中的G數(shù)據(jù)類型去除,并附加到netpoll函數(shù)的調(diào)用者傳遞的G鏈表中:
// 將ev.data轉(zhuǎn)換為*pollDesc類型
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// 調(diào)用netpollready將取出pd中保存的G,并添加到鏈表中
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
所以runtime在執(zhí)行findrunnablequeue、starttheworld,sysmon函數(shù)中會執(zhí)行netpoll函數(shù),并返回N個goroutine。這些goroutine期待的網(wǎng)絡(luò)事件已經(jīng)發(fā)生,runtime會將這些goroutine放入到當(dāng)前P的可運行隊列中,接下來調(diào)度它們并運行。