最近簡(jiǎn)單讀了下muduo的源碼,本文對(duì)其主要實(shí)現(xiàn)/結(jié)構(gòu)簡(jiǎn)單總結(jié)下。
muduo的主要源碼位于net文件夾下,base文件夾是一些基礎(chǔ)代碼,不影響理解網(wǎng)絡(luò)部分的實(shí)現(xiàn)。muduo主要類包括:
- EventLoop
- Channel
- Poller
- TcpConnection
- TcpClient
- TcpServer
- Connector
- Acceptor
- EventLoopThread
- EventLoopThreadPool
其中,Poller(及其實(shí)現(xiàn)類)包裝了Poll/EPoll,封裝了OS針對(duì)設(shè)備(fd)的操作;Channel是設(shè)備fd的包裝,在muduo中主要包裝socket;TcpConnection抽象一個(gè)TCP連接,無論是客戶端還是服務(wù)器只要建立了網(wǎng)絡(luò)連接就會(huì)使用TcpConnection;TcpClient/TcpServer分別抽象TCP客戶端和服務(wù)器;Connector/Acceptor分別包裝TCP客戶端和服務(wù)器的建立連接/接受連接;EventLoop是一個(gè)主控類,是一個(gè)事件發(fā)生器,它驅(qū)動(dòng)Poller產(chǎn)生/發(fā)現(xiàn)事件,然后將事件派發(fā)到Channel處理;EventLoopThread是一個(gè)帶有EventLoop的線程;EventLoopThreadPool自然是一個(gè)EventLoopThread的資源池,維護(hù)一堆EventLoopThread。
閱讀庫(kù)源碼時(shí)可以從庫(kù)的接口層著手,看看關(guān)鍵功能是如何實(shí)現(xiàn)的。對(duì)于muduo而言,可以從TcpServer/TcpClient/EventLoop/TcpConnection這幾個(gè)類著手。接下來看看主要功能的實(shí)現(xiàn):
建立連接
TcpClient::connect
-> Connector::start
-> EventLoop::runInLoop(Connector::startInLoop...
-> Connector::connect
EventLoop::runInLoop接口用于在this所在的線程運(yùn)行某個(gè)函數(shù),這個(gè)后面看下EventLoop的實(shí)現(xiàn)就可以了解。 網(wǎng)絡(luò)連接的最終建立是在Connector::connect中實(shí)現(xiàn),建立連接之后會(huì)創(chuàng)建一個(gè)Channel來代表這個(gè)socket,并且綁定事件監(jiān)聽接口。最后最重要的是,調(diào)用Channel::enableWriting
。Channel
有一系列的enableXX接口,這些接口用于標(biāo)識(shí)自己關(guān)心某IO事件。后面會(huì)看到他們的實(shí)現(xiàn)。
Connector監(jiān)聽的主要事件無非就是連接已建立,用它監(jiān)聽讀數(shù)據(jù)/寫數(shù)據(jù)事件也不符合設(shè)計(jì)。TcpConnection才是做這種事的。
客戶端收發(fā)數(shù)據(jù)
當(dāng)Connector發(fā)現(xiàn)連接真正建立好后,會(huì)回調(diào)到TcpClient::newConnection
,在TcpClient構(gòu)造函數(shù)中:
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
TcpClient::newConnection
中創(chuàng)建一個(gè)TcpConnection來代表這個(gè)連接:
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
...
conn->connectEstablished();
并同時(shí)設(shè)置事件回調(diào),以上設(shè)置的回調(diào)都是應(yīng)用層(即庫(kù)的使用者)的接口。每一個(gè)TcpConnection都有一個(gè)Channel,畢竟每一個(gè)網(wǎng)絡(luò)連接都對(duì)應(yīng)了一個(gè)socket fd。在TcpConnection構(gòu)造函數(shù)中創(chuàng)建了一個(gè)Channel,并設(shè)置事件回調(diào)函數(shù)。
TcpConnection::connectEstablished
函數(shù)最主要的是通知Channel自己開始關(guān)心IO讀取事件:
void TcpConnection::connectEstablished()
{
...
channel_->enableReading();
這是自此我們看到的第二個(gè)Channel::enableXXX
接口,這些接口是如何實(shí)現(xiàn)關(guān)心IO事件的呢?這個(gè)后面講到。
muduo的數(shù)據(jù)發(fā)送都是通過TcpConnection::send
完成,這個(gè)就是一般網(wǎng)絡(luò)庫(kù)中在不使用OS的異步IO情況下的實(shí)現(xiàn):緩存應(yīng)用層傳遞過來的數(shù)據(jù),在IO設(shè)備可寫的情況下盡量寫入數(shù)據(jù)。這個(gè)主要實(shí)現(xiàn)在TcpConnection::sendInLoop
中。
TcpConnection::sendInLoop(....) {
...
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) // 設(shè)備可寫且沒有緩存時(shí)立即寫入
{
nwrote = sockets::write(channel_->fd(), data, len);
}
...
// 否則加入數(shù)據(jù)到緩存,等待IO可寫時(shí)再寫
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
// 注冊(cè)關(guān)心IO寫事件,Poller就會(huì)對(duì)寫做檢測(cè)
channel_->enableWriting();
}
...
}
當(dāng)IO可寫時(shí),Channel就會(huì)回調(diào)TcpConnection::handleWrite
(構(gòu)造函數(shù)中注冊(cè))
void TcpConnection::handleWrite()
{
...
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
服務(wù)器端的數(shù)據(jù)收發(fā)同客戶端機(jī)制一致,不同的是連接(TcpConnection)的建立方式不同。
服務(wù)器接收連接
服務(wù)器接收連接的實(shí)現(xiàn)在一個(gè)網(wǎng)絡(luò)庫(kù)中比較重要。muduo中通過Acceptor類來接收連接。在TcpClient中,其Connector通過一個(gè)關(guān)心Channel可寫的事件來通過連接已建立;在Acceptor中則是通過一個(gè)Channel可讀的事件來表示有新的連接到來:
Acceptor::Acceptor(....) {
...
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
...
}
void Acceptor::handleRead()
{
...
int connfd = acceptSocket_.accept(&peerAddr); // 接收連接獲得一個(gè)新的socket
if (connfd >= 0)
{
...
newConnectionCallback_(connfd, peerAddr); // 回調(diào)到TcpServer::newConnection
TcpServer::newConnection
中建立一個(gè)TcpConnection,并將其附加到一個(gè)EventLoopThread中,簡(jiǎn)單來說就是給其配置一個(gè)線程:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
IO的驅(qū)動(dòng)
之前提到,一旦要關(guān)心某IO事件了,就調(diào)用Channel::enableXXX
,這個(gè)如何實(shí)現(xiàn)的呢?
class Channel {
...
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
...
poller_->updateChannel(channel);
}
最終調(diào)用到Poller::upateChannel
。muduo中有兩個(gè)Poller的實(shí)現(xiàn),分別是Poll和EPoll,可以選擇簡(jiǎn)單的Poll來看:
void PollPoller::updateChannel(Channel* channel)
{
...
if (channel->index() < 0)
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那個(gè)events_
pfd.revents = 0;
pollfds_.push_back(pfd); // 加入一個(gè)新的pollfd
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
可見Poller就是把Channel關(guān)心的IO事件轉(zhuǎn)換為OS提供的IO模型數(shù)據(jù)結(jié)構(gòu)上。通過查看關(guān)鍵的pollfds_
的使用,可以發(fā)現(xiàn)其主要是在Poller::poll接口里。這個(gè)接口會(huì)在EventLoop的主循環(huán)中不斷調(diào)用:
void EventLoop::loop()
{
...
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_); // 獲得IO事件,通知各注冊(cè)回調(diào)
}
整個(gè)流程可總結(jié)為:各Channel內(nèi)部會(huì)把自己關(guān)心的事件告訴給Poller,Poller由EventLoop驅(qū)動(dòng)檢測(cè)IO,然后返回哪些Channel發(fā)生了事件,EventLoop再驅(qū)動(dòng)這些Channel調(diào)用各注冊(cè)回調(diào)。
從這個(gè)過程中可以看出,EventLoop就是一個(gè)事件產(chǎn)生器。
線程模型
在muduo的服務(wù)器中,muduo的線程模型是怎樣的呢?它如何通過線程來支撐高并發(fā)呢?其實(shí)很簡(jiǎn)單,它為每一個(gè)線程配置了一個(gè)EventLoop,這個(gè)線程同時(shí)被附加了若干個(gè)網(wǎng)絡(luò)連接,這個(gè)EventLoop服務(wù)于這些網(wǎng)絡(luò)連接,為這些連接收集并派發(fā)IO事件。
回到TcpServer::newConnection
中:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
...
TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用這個(gè)選擇到的線程中的EventLoop
connName,
sockfd,
localAddr,
peerAddr));
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
注意TcpConnection::connectEstablished
是如何通過Channel注冊(cè)關(guān)心的IO事件到ioLoop
的。
極端來說,muduo的每一個(gè)連接線程可以只為一個(gè)網(wǎng)絡(luò)連接服務(wù),這就有點(diǎn)類似于thread per connection模型了。
網(wǎng)絡(luò)模型
傳說中的Reactor模式,以及one loop per thread,基于EventLoop的作用,以及線程池與TcpConnection的關(guān)系,可以醍醐灌頂般理解以下這張muduo的網(wǎng)絡(luò)模型圖了:

總結(jié)
本文主要對(duì)muduo的主要結(jié)構(gòu)及主要機(jī)制的實(shí)現(xiàn)做了描述,其他如Buffer的實(shí)現(xiàn)、定時(shí)器的實(shí)現(xiàn)大家都可以自行研究。muduo的源碼很清晰,通過源碼及配合陳碩博客上的內(nèi)容可以學(xué)到一些網(wǎng)絡(luò)編程方面的經(jīng)驗(yàn)。