• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            loop_in_codes

            低調(diào)做技術(shù)__歡迎移步我的獨(dú)立博客 codemaro.com 微博 kevinlynx

            Muduo源碼閱讀

            最近簡(jiǎn)單讀了下muduo的源碼,本文對(duì)其主要實(shí)現(xiàn)/結(jié)構(gòu)簡(jiǎn)單總結(jié)下。

            muduo的主要源碼位于net文件夾下,base文件夾是一些基礎(chǔ)代碼,不影響理解網(wǎng)絡(luò)部分的實(shí)現(xiàn)。muduo主要類包括:

            • EventLoop
            • Channel
            • Poller
            • TcpConnection
            • TcpClient
            • TcpServer
            • Connector
            • Acceptor
            • EventLoopThread
            • EventLoopThreadPool

            其中,Poller(及其實(shí)現(xiàn)類)包裝了Poll/EPoll,封裝了OS針對(duì)設(shè)備(fd)的操作;Channel是設(shè)備fd的包裝,在muduo中主要包裝socket;TcpConnection抽象一個(gè)TCP連接,無論是客戶端還是服務(wù)器只要建立了網(wǎng)絡(luò)連接就會(huì)使用TcpConnection;TcpClient/TcpServer分別抽象TCP客戶端和服務(wù)器;Connector/Acceptor分別包裝TCP客戶端和服務(wù)器的建立連接/接受連接;EventLoop是一個(gè)主控類,是一個(gè)事件發(fā)生器,它驅(qū)動(dòng)Poller產(chǎn)生/發(fā)現(xiàn)事件,然后將事件派發(fā)到Channel處理;EventLoopThread是一個(gè)帶有EventLoop的線程;EventLoopThreadPool自然是一個(gè)EventLoopThread的資源池,維護(hù)一堆EventLoopThread。

            閱讀庫(kù)源碼時(shí)可以從庫(kù)的接口層著手,看看關(guān)鍵功能是如何實(shí)現(xiàn)的。對(duì)于muduo而言,可以從TcpServer/TcpClient/EventLoop/TcpConnection這幾個(gè)類著手。接下來看看主要功能的實(shí)現(xiàn):

            建立連接

                TcpClient::connect 
                    -> Connector::start 
                        -> EventLoop::runInLoop(Connector::startInLoop...
                        -> Connector::connect             
            

            EventLoop::runInLoop接口用于在this所在的線程運(yùn)行某個(gè)函數(shù),這個(gè)后面看下EventLoop的實(shí)現(xiàn)就可以了解。 網(wǎng)絡(luò)連接的最終建立是在Connector::connect中實(shí)現(xiàn),建立連接之后會(huì)創(chuàng)建一個(gè)Channel來代表這個(gè)socket,并且綁定事件監(jiān)聽接口。最后最重要的是,調(diào)用Channel::enableWritingChannel有一系列的enableXX接口,這些接口用于標(biāo)識(shí)自己關(guān)心某IO事件。后面會(huì)看到他們的實(shí)現(xiàn)。

            Connector監(jiān)聽的主要事件無非就是連接已建立,用它監(jiān)聽讀數(shù)據(jù)/寫數(shù)據(jù)事件也不符合設(shè)計(jì)。TcpConnection才是做這種事的。

            客戶端收發(fā)數(shù)據(jù)

            當(dāng)Connector發(fā)現(xiàn)連接真正建立好后,會(huì)回調(diào)到TcpClient::newConnection,在TcpClient構(gòu)造函數(shù)中:

                connector_->setNewConnectionCallback(
                  boost::bind(&TcpClient::newConnection, this, _1));
            

            TcpClient::newConnection中創(chuàng)建一個(gè)TcpConnection來代表這個(gè)連接:

                TcpConnectionPtr conn(new TcpConnection(loop_,
                                                        connName,
                                                        sockfd,
                                                        localAddr,
                                                        peerAddr));
            
                conn->setConnectionCallback(connectionCallback_);
                conn->setMessageCallback(messageCallback_);
                conn->setWriteCompleteCallback(writeCompleteCallback_);
                ...
                conn->connectEstablished();
            

            并同時(shí)設(shè)置事件回調(diào),以上設(shè)置的回調(diào)都是應(yīng)用層(即庫(kù)的使用者)的接口。每一個(gè)TcpConnection都有一個(gè)Channel,畢竟每一個(gè)網(wǎng)絡(luò)連接都對(duì)應(yīng)了一個(gè)socket fd。在TcpConnection構(gòu)造函數(shù)中創(chuàng)建了一個(gè)Channel,并設(shè)置事件回調(diào)函數(shù)。

            TcpConnection::connectEstablished函數(shù)最主要的是通知Channel自己開始關(guān)心IO讀取事件:

                void TcpConnection::connectEstablished()
                {
                    ...
                    channel_->enableReading();
            

            這是自此我們看到的第二個(gè)Channel::enableXXX接口,這些接口是如何實(shí)現(xiàn)關(guān)心IO事件的呢?這個(gè)后面講到。

            muduo的數(shù)據(jù)發(fā)送都是通過TcpConnection::send完成,這個(gè)就是一般網(wǎng)絡(luò)庫(kù)中在不使用OS的異步IO情況下的實(shí)現(xiàn):緩存應(yīng)用層傳遞過來的數(shù)據(jù),在IO設(shè)備可寫的情況下盡量寫入數(shù)據(jù)。這個(gè)主要實(shí)現(xiàn)在TcpConnection::sendInLoop中。

                TcpConnection::sendInLoop(....) {
                    ...
                    // if no thing in output queue, try writing directly
                    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)  // 設(shè)備可寫且沒有緩存時(shí)立即寫入
                    { 
                        nwrote = sockets::write(channel_->fd(), data, len);
                    }
                    ...
                    // 否則加入數(shù)據(jù)到緩存,等待IO可寫時(shí)再寫
                    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
                    if (!channel_->isWriting())
                    {
                        // 注冊(cè)關(guān)心IO寫事件,Poller就會(huì)對(duì)寫做檢測(cè)
                        channel_->enableWriting();
                    }
                    ...     
                }
            

            當(dāng)IO可寫時(shí),Channel就會(huì)回調(diào)TcpConnection::handleWrite(構(gòu)造函數(shù)中注冊(cè))

                void TcpConnection::handleWrite()
                {
                    ...
                    if (channel_->isWriting())
                    {
                        ssize_t n = sockets::write(channel_->fd(),
                                           outputBuffer_.peek(),
                                           outputBuffer_.readableBytes());
            

            服務(wù)器端的數(shù)據(jù)收發(fā)同客戶端機(jī)制一致,不同的是連接(TcpConnection)的建立方式不同。

            服務(wù)器接收連接

            服務(wù)器接收連接的實(shí)現(xiàn)在一個(gè)網(wǎng)絡(luò)庫(kù)中比較重要。muduo中通過Acceptor類來接收連接。在TcpClient中,其Connector通過一個(gè)關(guān)心Channel可寫的事件來通過連接已建立;在Acceptor中則是通過一個(gè)Channel可讀的事件來表示有新的連接到來:

                Acceptor::Acceptor(....) {
                    ...
                    acceptChannel_.setReadCallback(
                        boost::bind(&Acceptor::handleRead, this));
                    ... 
                }
            
                void Acceptor::handleRead()
                {
                    ...
                    int connfd = acceptSocket_.accept(&peerAddr); // 接收連接獲得一個(gè)新的socket
                    if (connfd >= 0)
                    {
                        ...
                        newConnectionCallback_(connfd, peerAddr); // 回調(diào)到TcpServer::newConnection
            

            TcpServer::newConnection中建立一個(gè)TcpConnection,并將其附加到一個(gè)EventLoopThread中,簡(jiǎn)單來說就是給其配置一個(gè)線程:

                void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
                {
                    ...
                    EventLoop* ioLoop = threadPool_->getNextLoop();
                    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                                            connName,
                                                            sockfd,
                                                            localAddr,
                                                            peerAddr));
                    connections_[connName] = conn;
                    ...
                    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
            

            IO的驅(qū)動(dòng)

            之前提到,一旦要關(guān)心某IO事件了,就調(diào)用Channel::enableXXX,這個(gè)如何實(shí)現(xiàn)的呢?

                class Channel {
                    ...
                    void enableReading() { events_ |= kReadEvent; update(); }
                    void enableWriting() { events_ |= kWriteEvent; update(); }
                   
                void Channel::update()
                {
                    loop_->updateChannel(this);
                }
            
                void EventLoop::updateChannel(Channel* channel)
                {
                    ...
                    poller_->updateChannel(channel);
                }
            

            最終調(diào)用到Poller::upateChannel。muduo中有兩個(gè)Poller的實(shí)現(xiàn),分別是Poll和EPoll,可以選擇簡(jiǎn)單的Poll來看:

                void PollPoller::updateChannel(Channel* channel)
                {
                  ...
                  if (channel->index() < 0)
                  {
                    // a new one, add to pollfds_
                    assert(channels_.find(channel->fd()) == channels_.end());
                    struct pollfd pfd;
                    pfd.fd = channel->fd();
                    pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那個(gè)events_
                    pfd.revents = 0;
                    pollfds_.push_back(pfd); // 加入一個(gè)新的pollfd
                    int idx = static_cast<int>(pollfds_.size())-1;
                    channel->set_index(idx);
                    channels_[pfd.fd] = channel;
            

            可見Poller就是把Channel關(guān)心的IO事件轉(zhuǎn)換為OS提供的IO模型數(shù)據(jù)結(jié)構(gòu)上。通過查看關(guān)鍵的pollfds_的使用,可以發(fā)現(xiàn)其主要是在Poller::poll接口里。這個(gè)接口會(huì)在EventLoop的主循環(huán)中不斷調(diào)用:

                void EventLoop::loop()
                {
                  ...
                  while (!quit_)
                  {
                    activeChannels_.clear();
                    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
                    ...
                    for (ChannelList::iterator it = activeChannels_.begin();
                        it != activeChannels_.end(); ++it)
                    {
                      currentActiveChannel_ = *it;
                      currentActiveChannel_->handleEvent(pollReturnTime_); // 獲得IO事件,通知各注冊(cè)回調(diào)
                    }
            

            整個(gè)流程可總結(jié)為:各Channel內(nèi)部會(huì)把自己關(guān)心的事件告訴給Poller,Poller由EventLoop驅(qū)動(dòng)檢測(cè)IO,然后返回哪些Channel發(fā)生了事件,EventLoop再驅(qū)動(dòng)這些Channel調(diào)用各注冊(cè)回調(diào)。

            從這個(gè)過程中可以看出,EventLoop就是一個(gè)事件產(chǎn)生器。

            線程模型

            在muduo的服務(wù)器中,muduo的線程模型是怎樣的呢?它如何通過線程來支撐高并發(fā)呢?其實(shí)很簡(jiǎn)單,它為每一個(gè)線程配置了一個(gè)EventLoop,這個(gè)線程同時(shí)被附加了若干個(gè)網(wǎng)絡(luò)連接,這個(gè)EventLoop服務(wù)于這些網(wǎng)絡(luò)連接,為這些連接收集并派發(fā)IO事件。

            回到TcpServer::newConnection中:

                void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
                {
                  ...
                  EventLoop* ioLoop = threadPool_->getNextLoop();
                  ...
                  TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用這個(gè)選擇到的線程中的EventLoop
                                                          connName,
                                                          sockfd,
                                                          localAddr,
                                                          peerAddr));
                  ...
                  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
            

            注意TcpConnection::connectEstablished是如何通過Channel注冊(cè)關(guān)心的IO事件到ioLoop的。

            極端來說,muduo的每一個(gè)連接線程可以只為一個(gè)網(wǎng)絡(luò)連接服務(wù),這就有點(diǎn)類似于thread per connection模型了。

            網(wǎng)絡(luò)模型

            傳說中的Reactor模式,以及one loop per thread,基于EventLoop的作用,以及線程池與TcpConnection的關(guān)系,可以醍醐灌頂般理解以下這張muduo的網(wǎng)絡(luò)模型圖了:


            總結(jié)

            本文主要對(duì)muduo的主要結(jié)構(gòu)及主要機(jī)制的實(shí)現(xiàn)做了描述,其他如Buffer的實(shí)現(xiàn)、定時(shí)器的實(shí)現(xiàn)大家都可以自行研究。muduo的源碼很清晰,通過源碼及配合陳碩博客上的內(nèi)容可以學(xué)到一些網(wǎng)絡(luò)編程方面的經(jīng)驗(yàn)。

            posted on 2014-05-04 18:22 Kevin Lynx 閱讀(14576) 評(píng)論(3)  編輯 收藏 引用 所屬分類: c/c++network

            評(píng)論

            # re: Muduo源碼閱讀[未登錄] 2014-05-04 21:43 春秋十二月

            這個(gè)庫(kù)的代碼比較簡(jiǎn)單  回復(fù)  更多評(píng)論   

            # re: Muduo源碼閱讀 2014-05-05 10:47 Enic

            樓主用的什么工具看的啊?  回復(fù)  更多評(píng)論   

            # re: Muduo源碼閱讀 2014-05-05 19:38 Kevin Lynx

            @Enic
            就vim,tag都沒用  回復(fù)  更多評(píng)論   

            久久丫精品国产亚洲av不卡| 国内精品久久久久久99| 精品国产乱码久久久久软件| 国产69精品久久久久9999APGF | 日韩人妻无码一区二区三区久久 | 久久天天躁狠狠躁夜夜av浪潮| 亚洲国产成人久久一区WWW| 久久国产精品77777| 久久精品无码专区免费| 久久精品国产亚洲AV蜜臀色欲| 久久精品视频免费| 精产国品久久一二三产区区别| 精品久久久久久| 久久强奷乱码老熟女网站| 久久99精品国产麻豆蜜芽| 人妻无码αv中文字幕久久| 日日狠狠久久偷偷色综合0 | 久久播电影网| 久久久久久免费一区二区三区 | 2021久久精品国产99国产精品| a级毛片无码兔费真人久久| 久久亚洲日韩精品一区二区三区| 久久伊人中文无码| 久久香蕉国产线看观看乱码| 久久亚洲精精品中文字幕| 日日狠狠久久偷偷色综合0 | 久久国产精品99精品国产| 2021国产精品午夜久久| 久久久精品人妻无码专区不卡| 久久99精品国产一区二区三区| 性色欲网站人妻丰满中文久久不卡| 日韩电影久久久被窝网| 国产一区二区精品久久凹凸| 成人午夜精品久久久久久久小说| 97久久香蕉国产线看观看| 少妇内射兰兰久久| 无码人妻久久一区二区三区 | 久久这里有精品视频| 久久久久亚洲AV综合波多野结衣| 成人a毛片久久免费播放| 国产成人精品久久综合|