• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            Fork me on GitHub
            隨筆 - 215  文章 - 13  trackbacks - 0
            <2017年12月>
            262728293012
            3456789
            10111213141516
            17181920212223
            24252627282930
            31123456


            專注即時通訊及網(wǎng)游服務(wù)端編程
            ------------------------------------
            Openresty 官方模塊
            Openresty 標(biāo)準(zhǔn)模塊(Opm)
            Openresty 三方模塊
            ------------------------------------
            本博收藏大部分文章為轉(zhuǎn)載,并在文章開頭給出了原文出處,如有再轉(zhuǎn),敬請保留相關(guān)信息,這是大家對原創(chuàng)作者勞動成果的自覺尊重!!如為您帶來不便,請于本博下留言,謝謝配合。

            常用鏈接

            留言簿(1)

            隨筆分類

            隨筆檔案

            相冊

            Awesome

            Blog

            Book

            GitHub

            Link

            搜索

            •  

            積分與排名

            • 積分 - 219202
            • 排名 - 117

            最新評論

            閱讀排行榜

            http://skoo.me/go/2014/04/21/go-net-core

            Go語言的出現(xiàn),讓我見到了一門語言把網(wǎng)絡(luò)編程這件事情給做“正確”了,當(dāng)然,除了Go語言以外,還有很多語言也把這件事情做”正確”了。我一直堅持著這樣的理念——要做"正確"的事情,而不是"高性能"的事情;很多時候,我們在做系統(tǒng)設(shè)計、技術(shù)選型的時候,都被“高性能”這三個字給綁架了,當(dāng)然不是說性能不重要,你懂的。


            目前很多高性能的基礎(chǔ)網(wǎng)絡(luò)服務(wù)器都是采用的C語言開發(fā)的,比如:Nginx、Redis、memcached等,它們都是基于”事件驅(qū)動 + 事件回掉函數(shù)”的方式實現(xiàn),也就是采用epoll等作為網(wǎng)絡(luò)收發(fā)數(shù)據(jù)包的核心驅(qū)動。不少人(包括我自己)都認為“事件驅(qū)動 + 事件回掉函數(shù)”的編程方法是“反人類”的;因為大多數(shù)人都更習(xí)慣線性的處理一件事情,做完第一件事情再做第二件事情,并不習(xí)慣在N件事情之間頻繁的切換干活。為了解決程序員在開發(fā)服務(wù)器時需要自己的大腦不斷的“上下文切換”的問題,Go語言引入了一種用戶態(tài)線程goroutine來取代編寫異步的事件回掉函數(shù),從而重新回歸到多線程并發(fā)模型的線性、同步的編程方式上。
            用Go語言寫一個最簡單的echo服務(wù)器:
            package main
            import (
            "log"
            "net"
            )
            func main() {
            ln, err := net.Listen("tcp", ":8080")
            if err != nil {
                    log.Println(err)
                    return
            }
            for {
                    conn, err := ln.Accept()
                    if err != nil {
                        log.Println(err)
                        continue
                    }
                    go echoFunc(conn)
            }
            }
            func echoFunc(c net.Conn) {
            buf := make([]byte, 1024)
            for {
                    n, err := c.Read(buf)
                    if err != nil {
                        log.Println(err)
                        return
                    }
                    c.Write(buf[:n])
            }
            }
            main函數(shù)的過程就是首先創(chuàng)建一個監(jiān)聽套接字,然后用一個for循環(huán)不斷的從監(jiān)聽套接字上Accept新的連接,最后調(diào)用echoFunc函數(shù)在建立的連接上干活。關(guān)鍵代碼是:
            go echoFunc(conn)
            每收到一個新的連接,就創(chuàng)建一個“線程”去服務(wù)這個連接,因此所有的業(yè)務(wù)邏輯都可以同步、順序的編寫到echoFunc函數(shù)中,再也不用去關(guān)心網(wǎng)絡(luò)IO是否會阻塞的問題。不管業(yè)務(wù)多復(fù)雜,Go語言的并發(fā)服務(wù)器的編程模型都是長這個樣子。可以肯定的是,在linux上Go語言寫的網(wǎng)絡(luò)服務(wù)器也是采用的epoll作為最底層的數(shù)據(jù)收發(fā)驅(qū)動,Go語言網(wǎng)絡(luò)的底層實現(xiàn)中同樣存在“上下文切換”的工作,只是這個切換工作由runtime的調(diào)度器來做了,減少了程序員的負擔(dān)。
            弄明白網(wǎng)絡(luò)庫的底層實現(xiàn),貌似只要弄清楚echo服務(wù)器中的Listen、Accept、Read、Write四個函數(shù)的底層實現(xiàn)關(guān)系就可以了。本文將采用自底向上的方式來介紹,也就是從最底層到上層的方式,這也是我閱讀源碼的方式。底層實現(xiàn)涉及到的核心源碼文件主要有:
            net/fd_unix.go 
            net/fd_poll_runtime.go
            runtime/netpoll.goc 
            runtime/netpoll_epoll.c 
            runtime/proc.c (調(diào)度器)
            netpoll_epoll.c文件是Linux平臺使用epoll作為網(wǎng)絡(luò)IO多路復(fù)用的實現(xiàn)代碼,這份代碼可以了解到epoll相關(guān)的操作(比如:添加fd到epoll、從epoll刪除fd等),只有4個函數(shù),分別是runtime·netpollinit、runtime·netpollopen、runtime·netpollclose和runtime·netpoll。init函數(shù)就是創(chuàng)建epoll對象,open函數(shù)就是添加一個fd到epoll中,close函數(shù)就是從epoll刪除一個fd,netpoll函數(shù)就是從epoll wait得到所有發(fā)生事件的fd,并將每個fd對應(yīng)的goroutine(用戶態(tài)線程)通過鏈表返回。用epoll寫過程序的人應(yīng)該都能理解這份代碼,沒什么特別之處。
            void
            runtime·netpollinit(void)
            {
            epfd = runtime·epollcreate1(EPOLL_CLOEXEC);
            if(epfd >= 0)
            return;
            epfd = runtime·epollcreate(1024);
            if(epfd >= 0) {
            runtime·closeonexec(epfd);
            return;
            }
            runtime·printf("netpollinit: failed to create descriptor (%d)\n", -epfd);
            runtime·throw("netpollinit: failed to create descriptor");
            }
            runtime·netpollinit函數(shù)首先使用runtime·epollcreate1創(chuàng)建epoll實例,如果沒有創(chuàng)建成功,就換用runtime·epollcreate再創(chuàng)建一次。這兩個create函數(shù)分別等價于glibc的epoll_create1和epoll_create函數(shù)。只是因為Go語言并沒有直接使用glibc,而是自己封裝的系統(tǒng)調(diào)用,但功能是等價于glibc的。可以通過man手冊查看這兩個create的詳細信息。
            int32
            runtime·netpollopen(uintptr fd, PollDesc *pd)
            {
            EpollEvent ev;
            int32 res;
            ev.events = EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET;
            ev.data = (uint64)pd;
            res = runtime·epollctl(epfd, EPOLL_CTL_ADD, (int32)fd, &ev);
            return -res;
            }
            添加fd到epoll中的runtime·netpollopen函數(shù)可以看到每個fd一開始都關(guān)注了讀寫事件,并且采用的是邊緣觸發(fā),除此之外還關(guān)注了一個不常見的新事件EPOLLRDHUP,這個事件是在較新的內(nèi)核版本添加的,目的是解決對端socket關(guān)閉,epoll本身并不能直接感知到這個關(guān)閉動作的問題。注意任何一個fd在添加到epoll中的時候就關(guān)注了EPOLLOUT事件的話,就立馬產(chǎn)生一次寫事件,這次事件可能是多余浪費的。
            epoll操作的相關(guān)函數(shù)都會在事件驅(qū)動的抽象層中去調(diào)用,為什么需要這個抽象層呢?原因很簡單,因為Go語言需要跑在不同的平臺上,有Linux、Unix、Mac OS X和Windows等,所以需要靠事件驅(qū)動的抽象層來為網(wǎng)絡(luò)庫提供一致的接口,從而屏蔽事件驅(qū)動的具體平臺依賴實現(xiàn)。runtime/netpoll.goc源文件就是整個事件驅(qū)動抽象層的實現(xiàn),抽象層的核心數(shù)據(jù)結(jié)構(gòu)是:
            struct PollDesc
            {
            PollDesc* link; // in pollcache, protected by pollcache.Lock
            Lock; // protectes the following fields
            uintptr fd;
            bool closing;
            uintptr seq; // protects from stale timers and ready notifications
            G* rg; // G waiting for read or READY (binary semaphore)
            Timer rt; // read deadline timer (set if rt.fv != nil)
            int64 rd; // read deadline
            G* wg; // the same for writes
            Timer wt;
            int64 wd;
            };
            每個添加到epoll中的fd都對應(yīng)了一個PollDesc結(jié)構(gòu)實例,PollDesc維護了讀寫此fd的goroutine這一非常重要的信息。可以大膽的推測一下,網(wǎng)絡(luò)IO讀寫操作的實現(xiàn)應(yīng)該是:當(dāng)在一個fd上讀寫遇到EAGAIN錯誤的時候,就將當(dāng)前goroutine存儲到這個fd對應(yīng)的PollDesc中,同時將goroutine給park住,直到這個fd上再此發(fā)生了讀寫事件后,再將此goroutine給ready激活重新運行。事實上的實現(xiàn)大概也是這個樣子的。
            事件驅(qū)動抽象層主要干的事情就是將具體的事件驅(qū)動實現(xiàn)(比如: epoll)通過統(tǒng)一的接口封裝成Go接口供net庫使用,主要的接口也是:創(chuàng)建事件驅(qū)動實例、添加fd、刪除fd、等待事件以及設(shè)置DeadLine。runtime_pollServerInit負責(zé)創(chuàng)建事件驅(qū)動實例,runtime_pollOpen將分配一個PollDesc實例和fd綁定起來,然后將fd添加到epoll中,runtime_pollClose就是將fd從epoll中刪除,同時將刪除的fd綁定的PollDesc實例刪除,runtime_pollWait接口是至關(guān)重要的,這個接口一般是在非阻塞讀寫發(fā)生EAGAIN錯誤的時候調(diào)用,作用就是park當(dāng)前讀寫的goroutine。
            runtime中的epoll事件驅(qū)動抽象層其實在進入net庫后,又被封裝了一次,這一次封裝從代碼上看主要是為了方便在純Go語言環(huán)境進行操作,net庫中的這次封裝實現(xiàn)在net/fd_poll_runtime.go文件中,主要是通過pollDesc對象來實現(xiàn)的:
            type pollDesc struct {
            runtimeCtx uintptr
            }
            注意:此處的pollDesc對象不是上文提到的runtime中的PollDesc,相反此處pollDesc對象的runtimeCtx成員才是指向的runtime的PollDesc實例。pollDesc對象主要就是將runtime的事件驅(qū)動抽象層給再封裝了一次,供網(wǎng)絡(luò)fd對象使用。
            var serverInit sync.Once
            func (pd *pollDesc) Init(fd *netFD) error {
            serverInit.Do(runtime_pollServerInit)
            ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
            if errno != 0 {
            return syscall.Errno(errno)
            }
            pd.runtimeCtx = ctx
            return nil
            }
            pollDesc對象最需要關(guān)注的就是其Init方法,這個方法通過一個sync.Once變量來調(diào)用了runtime_pollServerInit函數(shù),也就是創(chuàng)建epoll實例的函數(shù)。意思就是runtime_pollServerInit函數(shù)在整個進程生命周期內(nèi)只會被調(diào)用一次,也就是只會創(chuàng)建一次epoll實例。epoll實例被創(chuàng)建后,會調(diào)用runtime_pollOpen函數(shù)將fd添加到epoll中。
            網(wǎng)絡(luò)編程中的所有socket fd都是通過netFD對象實現(xiàn)的,netFD是對網(wǎng)絡(luò)IO操作的抽象,linux的實現(xiàn)在文件net/fd_unix.go中。netFD對象實現(xiàn)有自己的init方法,還有完成基本IO操作的Read和Write方法,當(dāng)然除了這三個方法以外,還有很多非常有用的方法供用戶使用。
            // Network file descriptor.
            type netFD struct {
            // locking/lifetime of sysfd + serialize access to Read and Write methods
            fdmu fdMutex
            // immutable until Close
            sysfd       int
            family      int
            sotype      int
            isConnected bool
            net         string
            laddr       Addr
            raddr       Addr
            // wait server
            pd pollDesc
            }
            通過netFD對象的定義可以看到每個fd都關(guān)聯(lián)了一個pollDesc實例,通過上文我們知道pollDesc對象最終是對epoll的封裝。
            func (fd *netFD) init() error {
            if err := fd.pd.Init(fd); err != nil {
            return err
            }
            return nil
            }
            netFD對象的init函數(shù)僅僅是調(diào)用了pollDesc實例的Init函數(shù),作用就是將fd添加到epoll中,如果這個fd是第一個網(wǎng)絡(luò)socket fd的話,這一次init還會擔(dān)任創(chuàng)建epoll實例的任務(wù)。要知道在Go進程里,只會有一個epoll實例來管理所有的網(wǎng)絡(luò)socket fd,這個epoll實例也就是在第一個網(wǎng)絡(luò)socket fd被創(chuàng)建的時候所創(chuàng)建。
            for {
            n, err = syscall.Read(int(fd.sysfd), p)
            if err != nil {
            n = 0
            if err == syscall.EAGAIN {
            if err = fd.pd.WaitRead(); err == nil {
            continue
            }
            }
            }
            err = chkReadErr(n, err, fd)
            break
            }
            上面代碼段是從netFD的Read方法中摘取,重點關(guān)注這個for循環(huán)中的syscall.Read調(diào)用的錯誤處理。當(dāng)有錯誤發(fā)生的時候,會檢查這個錯誤是否是syscall.EAGAIN,如果是,則調(diào)用WaitRead將當(dāng)前讀這個fd的goroutine給park住,直到這個fd上的讀事件再次發(fā)生為止。當(dāng)這個socket上有新數(shù)據(jù)到來的時候,WaitRead調(diào)用返回,繼續(xù)for循環(huán)的執(zhí)行。這樣的實現(xiàn),就讓調(diào)用netFD的Read的地方變成了同步“阻塞”方式編程,不再是異步非阻塞的編程方式了。netFD的Write方法和Read的實現(xiàn)原理是一樣的,都是在碰到EAGAIN錯誤的時候?qū)?dāng)前goroutine給park住直到socket再次可寫為止。
            本文只是將網(wǎng)絡(luò)庫的底層實現(xiàn)給大體上引導(dǎo)了一遍,知道底層代碼大概實現(xiàn)在什么地方,方便結(jié)合源碼深入理解。Go語言中的高并發(fā)、同步阻塞方式編程的關(guān)鍵其實是”goroutine和調(diào)度器”,針對網(wǎng)絡(luò)IO的時候,我們需要知道EAGAIN這個非常關(guān)鍵的調(diào)度點,掌握了這個調(diào)度點,即使沒有調(diào)度器,自己也可以在epoll的基礎(chǔ)上配合協(xié)程等用戶態(tài)線程實現(xiàn)網(wǎng)絡(luò)IO操作的調(diào)度,達到同步阻塞編程的目的。
            最后,為什么需要同步阻塞的方式編程?只有看多、寫多了異步非阻塞代碼的時候才能夠深切體會到這個問題。真正的高大上絕對不是——“別人不會,我會;別人寫不出來,我寫得出來。”

            http://ju.outofmemory.cn/entry/168649
            本文分析了Golang的socket文件描述符和goroutine阻塞調(diào)度的原理。代碼中大部分是Go代碼,小部分是匯編代碼。完整理解本文需要Go語言知識,并且用Golang寫過網(wǎng)絡(luò)程序。更重要的是,需要提前理解goroutine的調(diào)度原理。
            1. TCP的連接對象:
            連接對象:
            在net.go中有一個名為Conn的接口,提供了對于連接的讀寫和其他操作:
            type Conn interface {
                Read(b []byte) (n int, err error)
                Write(b []byte) (n int, err error)
                Close() error
                LocalAddr() Addr
                RemoteAddr() Addr
                SetReadDeadline(t time.Time) error
                SetWriteDeadline(t time.Time) error
            }
            這個接口就是對下面的結(jié)構(gòu)體conn的抽象。conn結(jié)構(gòu)體包含了對連接的讀寫和其他操作:
            type conn struct {
                fd *netFD
            }
            從連接讀取數(shù)據(jù):
            // Read implements the Conn Read method.
            func (c *conn) Read(b []byte) (int, error) {
                if !c.ok() {
                    return 0, syscall.EINVAL
                }
                return c.fd.Read(b)
            }
            向連接寫入數(shù)據(jù):
            // Write implements the Conn Write method.
            func (c *conn) Write(b []byte) (int, error) {
                if !c.ok() {
                    return 0, syscall.EINVAL
                }
                return c.fd.Write(b)
            }
            關(guān)閉連接:
            // Close closes the connection.
            func (c *conn) Close() error {
                if !c.ok() {
                    return syscall.EINVAL
                }
                return c.fd.Close()
            }
            設(shè)置讀寫超時:
            // SetDeadline implements the Conn SetDeadline method.
            func (c *conn) SetDeadline(t time.Time) error {
                if !c.ok() {
                    return syscall.EINVAL
                }
                return c.fd.setDeadline(t)
            }
            // SetReadDeadline implements the Conn SetReadDeadline method.
            func (c *conn) SetReadDeadline(t time.Time) error {
                if !c.ok() {
                    return syscall.EINVAL
                }
                return c.fd.setReadDeadline(t)
            }
            // SetWriteDeadline implements the Conn SetWriteDeadline method.
            func (c *conn) SetWriteDeadline(t time.Time) error {
                if !c.ok() {
                    return syscall.EINVAL
                }
                return c.fd.setWriteDeadline(t)
            }
            可以看到,對連接的所有操作,都體現(xiàn)在對*netFD的操作上。我們繼續(xù)跟蹤c.fd.Read()函數(shù).
            2.文件描述符
            net/fd_unix.go:
            網(wǎng)絡(luò)連接的文件描述符:
            // Network file descriptor.
            type netFD struct {
                // locking/lifetime of sysfd + serialize access to Read and Write methods
                fdmu fdMutex
                // immutable until Close
                sysfd       int
                family      int
                sotype      int
                isConnected bool
                net         string
                laddr       Addr
                raddr       Addr
                // wait server
                pd pollDesc
            }
            文件描述符讀取數(shù)據(jù):
            func (fd *netFD) Read(p []byte) (n int, err error) {
                if err := fd.readLock(); err != nil {
                    return 0, err
                }
                defer fd.readUnlock()
                if err := fd.pd.PrepareRead(); err != nil {
                    return 0, &OpError{"read", fd.net, fd.raddr, err}
                }
                // 調(diào)用system call,循環(huán)從fd.sysfd讀取數(shù)據(jù)
                for {
                    // 系統(tǒng)調(diào)用Read讀取數(shù)據(jù)
                    n, err = syscall.Read(int(fd.sysfd), p)
                    // 如果發(fā)生錯誤,則需要處理
                    // 并且只處理EAGAIN類型的錯誤,其他錯誤一律返回給調(diào)用者
                    if err != nil {
                        n = 0
                        // 對于非阻塞的網(wǎng)絡(luò)連接的文件描述符,如果錯誤是EAGAIN
                        // 說明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù)
                        // 則調(diào)用fd.pd.WaitRead,
                        if err == syscall.EAGAIN {
                            if err = fd.pd.WaitRead(); err == nil {
                                continue
                            }
                        }
                    }
                    err = chkReadErr(n, err, fd)
                    break
                }
                if err != nil && err != io.EOF {
                    err = &OpError{"read", fd.net, fd.raddr, err}
                }
                return
            }
            網(wǎng)絡(luò)輪詢器
            網(wǎng)絡(luò)輪詢器是Golang中針對每個socket文件描述符建立的輪詢機制。 此處的輪詢并不是一般意義上的輪詢,而是Golang的runtime在調(diào)度goroutine或者GC完成之后或者指定時間之內(nèi),調(diào)用epoll_wait獲取所有產(chǎn)生IO事件的socket文件描述符。當(dāng)然在runtime輪詢之前,需要將socket文件描述符和當(dāng)前goroutine的相關(guān)信息加入epoll維護的數(shù)據(jù)結(jié)構(gòu)中,并掛起當(dāng)前goroutine,當(dāng)IO就緒后,通過epoll返回的文件描述符和其中附帶的goroutine的信息,重新恢復(fù)當(dāng)前goroutine的執(zhí)行。
            // Integrated network poller (platform-independent part).
            // 網(wǎng)絡(luò)輪詢器(平臺獨立部分)
            // A particular implementation (epoll/kqueue) must define the following functions:
            // 實際的實現(xiàn)(epoll/kqueue)必須定義以下函數(shù):
            // func netpollinit()           // to initialize the poller,初始化輪詢器
            // func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 為fd和pd啟動邊緣觸發(fā)通知
            // and associate fd with pd.
            // 一個實現(xiàn)必須調(diào)用下面的函數(shù),用來指示pd已經(jīng)準(zhǔn)備好
            // An implementation must call the following function to denote that the pd is ready.
            // func netpollready(gpp **g, pd *pollDesc, mode int32)
            // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
            // goroutines respectively. The semaphore can be in the following states:
            // pollDesc包含了2個二進制的信號,分別負責(zé)讀寫goroutine的暫停.
            // 信號可能處于下面的狀態(tài):
            // pdReady - IO就緒通知被掛起;
            //           一個goroutine將次狀態(tài)置為nil來消費一個通知。
            // pdReady - io readiness notification is pending;
            //           a goroutine consumes the notification by changing the state to nil.
            // pdWait - 一個goroutine準(zhǔn)備暫停在信號上,但是還沒有完成暫停。
            // 這個goroutine通過把這個狀態(tài)改變?yōu)镚指針去提交這個暫停動作。
            // 或者,替代性的,并行的其他通知將狀態(tài)改變?yōu)镽EADY.
            // 或者,替代性的,并行的超時/關(guān)閉會將次狀態(tài)變?yōu)閚il
            // pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
            //          the goroutine commits to park by changing the state to G pointer,
            //          or, alternatively, concurrent io notification changes the state to READY,
            //          or, alternatively, concurrent timeout/close changes the state to nil.
            // G指針 - 阻塞在信號上的goroutine
            // IO通知或者超時/關(guān)閉會分別將此狀態(tài)置為READY或者nil.
            // G pointer - the goroutine is blocked on the semaphore;
            //             io notification or timeout/close changes the state to READY or nil respectively
            //             and unparks the goroutine.
            // nil - nothing of the above.
            const (
                pdReady uintptr = 1
                pdWait  uintptr = 2
            )
            網(wǎng)絡(luò)輪詢器的數(shù)據(jù)結(jié)構(gòu)如下:
            // Network poller descriptor.
            // 網(wǎng)絡(luò)輪詢器描述符
            type pollDesc struct {
                link *pollDesc // in pollcache, protected by pollcache.lock
                // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
                // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
                // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
                // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
                // in a lock-free way by all operations.
                // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
                // that will blow up when GC starts moving objects.
                //
                // lock鎖對象保護了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。
                // 而這些操作又完全包含了對seq, rt, tw變量。
                // fd在PollDesc整個生命過程中都是一個常量。
                // 處理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就緒通知)不需要用到鎖。
                // 所以closing, rg, rd, wg和wd的所有操作都是一個無鎖的操作。
                lock    mutex // protectes the following fields
                fd      uintptr
                closing bool
                seq     uintptr        // protects from stale timers and ready notifications
                rg      uintptr        // pdReady, pdWait, G waiting for read or nil
                rt      timer          // read deadline timer (set if rt.f != nil)
                rd      int64          // read deadline
                wg      uintptr        // pdReady, pdWait, G waiting for write or nil
                wt      timer          // write deadline timer
                wd      int64          // write deadline
                user    unsafe.Pointer // user settable cookie
            }
            將當(dāng)前goroutine設(shè)置為阻塞在fd上:
            pd.WaitRead():
            func (pd *pollDesc) WaitRead() error {
                return pd.Wait('r')
            }
            func (pd *pollDesc) Wait(mode int) error {
                res := runtime_pollWait(pd.runtimeCtx, mode)
                return convertErr(res)
            }
            res是runtime_pollWait函數(shù)返回的結(jié)果,由conevertErr函數(shù)包裝后返回:
            func convertErr(res int) error {
                switch res {
                case 0:
                    return nil
                case 1:
                    return errClosing
                case 2:
                    return errTimeout
                }
                println("unreachable: ", res)
                panic("unreachable")
            }
            函數(shù)返回0,表示IO已經(jīng)準(zhǔn)備好,返回nil。
            返回1,說明連接已關(guān)閉,應(yīng)該放回errClosing。
            返回2,說明對IO進行的操作發(fā)生超時,應(yīng)該返回errTimeout。
            runtime_pollWait會調(diào)用runtime/thunk.s中的函數(shù):
            TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
                JMP runtime·netpollWait(SB)
            這是一個包裝函數(shù),沒有參數(shù),直接跳轉(zhuǎn)到runtime/netpoll.go中的函數(shù)netpollWait:
            func netpollWait(pd *pollDesc, mode int) int {
                // 檢查pd的狀態(tài)是否異常
                err := netpollcheckerr(pd, int32(mode))
                if err != 0 {
                    return err
                }
                // As for now only Solaris uses level-triggered IO.
                if GOOS == "solaris" {
                    onM(func() {
                        netpollarm(pd, mode)
                    })
                }
                // 循環(huán)中檢查pd的狀態(tài)是不是已經(jīng)被設(shè)置為pdReady
                // 即檢查IO是不是已經(jīng)就緒
                for !netpollblock(pd, int32(mode), false) {
                    err = netpollcheckerr(pd, int32(mode))
                    if err != 0 {
                        return err
                    }
                    // Can happen if timeout has fired and unblocked us,
                    // but before we had a chance to run, timeout has been reset.
                    // Pretend it has not happened and retry.
                }
                return 0
            }
            netpollcheckerr函數(shù)檢查pd是否出現(xiàn)異常:
            // 檢查pd的異常
            func netpollcheckerr(pd *pollDesc, mode int32) int {
                // 是否已經(jīng)關(guān)閉
                if pd.closing {
                    return 1 // errClosing
                }
                // 當(dāng)讀寫狀態(tài)下,deadline小于0,表示pd已經(jīng)過了超時時間
                if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
                    return 2 // errTimeout
                }
                // 正常情況返回0
                return 0
            }
            netpollblock():
            // returns true if IO is ready, or false if timedout or closed
            // waitio - wait only for completed IO, ignore errors
            // 這個函數(shù)被netpollWait循環(huán)調(diào)用
            // 返回true說明IO已經(jīng)準(zhǔn)備好,返回false說明IO操作已經(jīng)超時或者已經(jīng)關(guān)閉
            func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
                // 獲取pd的rg
                gpp := &pd.rg
                // 如果模式是w,則獲取pd的wg
                if mode == 'w' {
                    gpp = &pd.wg
                }
                // set the gpp semaphore to WAIT
                // 在循環(huán)中設(shè)置pd的gpp為pdWait
                // 因為casuintptr是自旋鎖,所以需要在循環(huán)中調(diào)用
                for {
                    // 如果在循環(huán)中發(fā)現(xiàn)IO已經(jīng)準(zhǔn)備好(pg的rg或者wg為pdReady狀態(tài))
                    // 則設(shè)置rg/wg為0,返回true
                    old := *gpp
                    if old == pdReady {
                        *gpp = 0
                        return true
                    }
                    // 每次netpollblock執(zhí)行完畢之后,gpp重置為0
                    // 非0表示重復(fù)wait
                    if old != 0 {
                        gothrow("netpollblock: double wait")
                    }
                    // CAS操作改變gpp為pdWait
                    if casuintptr(gpp, 0, pdWait) {
                        break
                    }
                }
                // need to recheck error states after setting gpp to WAIT
                // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
                // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
                //
                // 當(dāng)設(shè)置gpp為pdWait狀態(tài)后,重新檢查gpp的狀態(tài)
                // 這是必要的,因為runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl會做相反的操作
                // 如果狀態(tài)正常則掛起當(dāng)前的goroutine
                //
                // 當(dāng)netpollcheckerr檢查io出現(xiàn)超時或者錯誤,waitio為true可用于等待ioReady
                // 否則當(dāng)waitio為false, 且io不出現(xiàn)錯誤或者超時才會掛起當(dāng)前goroutine
                if waitio || netpollcheckerr(pd, mode) == 0 {
                    // 解鎖函數(shù),設(shè)置gpp為pdWait,如果設(shè)置不成功
                    // 說明已經(jīng)是發(fā)生其他事件,可以讓g繼續(xù)運行,而不是掛起當(dāng)前g
                    f := netpollblockcommit
                    // 嘗試掛起當(dāng)前g
                    gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
                }
                // be careful to not lose concurrent READY notification
                old := xchguintptr(gpp, 0)
                if old > pdWait {
                    gothrow("netpollblock: corrupted state")
                }
                return old == pdReady
            }
            runtime/proc.go: gopark():
            // Puts the current goroutine into a waiting state and calls unlockf.
            // If unlockf returns false, the goroutine is resumed.
            // 將當(dāng)前goroutine置為waiting狀態(tài),然后調(diào)用unlockf
            func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
                // 獲取當(dāng)前M
                mp := acquirem()
                // 獲取當(dāng)前G
                gp := mp.curg
                // 獲取G的狀態(tài)
                status := readgstatus(gp)
                // 如果不是_Grunning或者_Gscanrunning,則報錯
                if status != _Grunning && status != _Gscanrunning {
                    gothrow("gopark: bad g status")
                }
                // 設(shè)置lock和unlockf
                mp.waitlock = lock
                mp.waitunlockf = unlockf
                gp.waitreason = reason
                releasem(mp)
                // can't do anything that might move the G between Ms here.
                // 在m->g0這個棧上調(diào)用park_m,而不是當(dāng)前g的棧
                mcall(park_m)
            }
            mcall函數(shù)是一段匯編,在m->g0的棧上調(diào)用park_m,而不是在當(dāng)前goroutine的棧上。mcall的功能分兩部分,第一部分保存當(dāng)前G的PC/SP到G的gobuf的pc/sp字段,第二部分調(diào)用park_m函數(shù):
            // func mcall(fn func(*g))
            // Switch to m->g0's stack, call fn(g).
            // Fn must never return.  It should gogo(&g->sched)
            // to keep running g.
            TEXT runtime·mcall(SB), NOSPLIT, $0-8
                // 將需要執(zhí)行的函數(shù)保存在DI
                MOVQ    fn+0(FP), DI
                // 將M的TLS存放在CX
                get_tls(CX)
                // 將G對象存放在AX
                MOVQ    g(CX), AX   // save state in g->sched
                // 將調(diào)用者的PC存放在BX
                MOVQ    0(SP), BX   // caller's PC
                // 將調(diào)用者的PC保存到g->sched.pc
                MOVQ    BX, (g_sched+gobuf_pc)(AX)
                // 第一個參數(shù)的地址,即棧頂?shù)牡刂罚4娴紹X
                LEAQ    fn+0(FP), BX    // caller's SP
                // 保存SP的地址到g->sched.sp
                MOVQ    BX, (g_sched+gobuf_sp)(AX)
                // 將g對象保存到g->sched->g
                MOVQ    AX, (g_sched+gobuf_g)(AX)
                // switch to m->g0 & its stack, call fn
                // 將g對象指針保存到BX
                MOVQ    g(CX), BX
                // 將g->m保存到BX
                MOVQ    g_m(BX), BX
                // 將m->g0保存到SI
                MOVQ    m_g0(BX), SI
                CMPQ    SI, AX  // if g == m->g0 call badmcall
                JNE 3(PC)
                MOVQ    $runtime·badmcall(SB), AX
                JMP AX
                // 將m->g0保存到g
                MOVQ    SI, g(CX)   // g = m->g0
                // 將g->sched.sp恢復(fù)到SP寄存器
                // 即使用g0的棧
                MOVQ    (g_sched+gobuf_sp)(SI), SP  // sp = m->g0->sched.sp
                // AX進棧
                PUSHQ   AX
                MOVQ    DI, DX
                // 將fn的地址復(fù)制到DI
                MOVQ    0(DI), DI
                // 調(diào)用函數(shù)
                CALL    DI
                // AX出棧
                POPQ    AX
                MOVQ    $runtime·badmcall2(SB), AX
                JMP AX
                RET
            park_m函數(shù)的功能分為三部分,第一部分讓當(dāng)前G和當(dāng)前M脫離關(guān)系,第二部分是調(diào)用解鎖函數(shù),這里是調(diào)用netpoll.go源文件中的netpollblockcommit函數(shù):
            // runtime·park continuation on g0.
            void
            runtime·park_m(G *gp)
            {
                bool ok;
                // 設(shè)置當(dāng)前g為Gwaiting狀態(tài)
                runtime·casgstatus(gp, Grunning, Gwaiting);
                // 讓當(dāng)前g和m脫離關(guān)系
                dropg();
                if(g->m->waitunlockf) {
                    ok = g->m->waitunlockf(gp, g->m->waitlock);
                    g->m->waitunlockf = nil;
                    g->m->waitlock = nil;
                    // 返回0為false,非0為true
                    // 0說明g->m->waitlock發(fā)生了變化,即不是在gopark是設(shè)置的(pdWait)
                    // 說明了脫離了WAIT狀態(tài),應(yīng)該設(shè)置為Grunnable,并執(zhí)行g(shù)
                    if(!ok) {
                        runtime·casgstatus(gp, Gwaiting, Grunnable);
                        execute(gp);  // Schedule it back, never returns.
                    }
                }
                // 這里是調(diào)度當(dāng)前m繼續(xù)執(zhí)行其他g
                // 而不是上面執(zhí)行execute
                schedule();
            }
            netpollblockcommit函數(shù),設(shè)置gpp為pdWait,設(shè)置成功返回1,否則返回0。1為true,0為false:
            func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
                return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
            }
            到這里當(dāng)前goroutine對socket文件描述符的等待IO繼續(xù)的行為已經(jīng)完成。過程中首先盡早嘗試判斷IO是否已經(jīng)就緒,如果未就緒則掛起當(dāng)前goroutine,掛起之后再次判斷IO是否就緒,如果還未就緒則調(diào)度當(dāng)前M運行其他G。如果是在調(diào)度goroutine之前IO已經(jīng)就緒,則不會使當(dāng)前goroutine進入調(diào)度隊列,會直接運行剛才掛起的G。否則當(dāng)前goroutine會進入調(diào)度隊列。
            接下來是等待runtime將其喚醒。runtime在執(zhí)行findrunnablequeue、starttheworld,sysmon函數(shù)時,都會調(diào)用netpoll_epoll.go中的netpoll函數(shù),尋找到IO就緒的socket文件描述符,并找到這些socket文件描述符對應(yīng)的輪詢器中附帶的信息,根據(jù)這些信息將之前等待這些socket文件描述符就緒的goroutine狀態(tài)修改為Grunnable。在以上函數(shù)中,執(zhí)行完netpoll之后,會找到一個就緒的goroutine列表,接下來將就緒的goroutine加入到調(diào)度隊列中,等待調(diào)度運行。
            在netpoll_epoll.go中的netpoll函數(shù)中,epoll_wait函數(shù)返回N個發(fā)生事件的文件描述符對應(yīng)的epollevent,接著對于每個event使用其data屬性,將event.data轉(zhuǎn)換為*pollDesc類型,再調(diào)用netpoll.go中的netpollready函數(shù),將*pollDesc類型中的G數(shù)據(jù)類型去除,并附加到netpoll函數(shù)的調(diào)用者傳遞的G鏈表中:
            // 將ev.data轉(zhuǎn)換為*pollDesc類型
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            // 調(diào)用netpollready將取出pd中保存的G,并添加到鏈表中
            netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
            所以runtime在執(zhí)行findrunnablequeue、starttheworld,sysmon函數(shù)中會執(zhí)行netpoll函數(shù),并返回N個goroutine。這些goroutine期待的網(wǎng)絡(luò)事件已經(jīng)發(fā)生,runtime會將這些goroutine放入到當(dāng)前P的可運行隊列中,接下來調(diào)度它們并運行。
            posted on 2017-06-02 11:12 思月行云 閱讀(1773) 評論(0)  編輯 收藏 引用 所屬分類: Golang
            国产激情久久久久影院| 伊人久久综合成人网| 亚洲伊人久久大香线蕉苏妲己| 精品多毛少妇人妻AV免费久久 | 狠狠色丁香久久综合五月| 国产精品无码久久综合| 久久久久四虎国产精品| 欧美亚洲国产精品久久| 久久99亚洲综合精品首页| 国产∨亚洲V天堂无码久久久| 久久久久一本毛久久久| 久久精品国产91久久麻豆自制| 久久午夜电影网| 国产高清国内精品福利99久久| 成人久久精品一区二区三区| 久久婷婷五月综合国产尤物app| 国产一区二区精品久久凹凸| 精品九九久久国内精品| 99精品国产在热久久无毒不卡 | 久久久久久久免费视频| 国产精品乱码久久久久久软件| 日批日出水久久亚洲精品tv| 2021国产精品久久精品| 欧美熟妇另类久久久久久不卡 | 97超级碰碰碰碰久久久久| 少妇被又大又粗又爽毛片久久黑人| 亚洲国产精品无码久久青草 | 精品熟女少妇av免费久久| 久久综合九色综合网站| 热re99久久精品国产99热| 亚洲国产日韩欧美久久| 久久久久人妻精品一区| 久久久久国产视频电影| 国产精品国色综合久久| 久久精品青青草原伊人| 色诱久久av| 久久夜色精品国产噜噜亚洲a| 精品永久久福利一区二区| 精品久久久无码人妻中文字幕| 久久久午夜精品| 亚洲国产成人精品女人久久久|