最近簡單讀了下muduo的源碼,本文對其主要實現(xiàn)/結構簡單總結下。
muduo的主要源碼位于net文件夾下,base文件夾是一些基礎代碼,不影響理解網(wǎng)絡部分的實現(xiàn)。muduo主要類包括:
- EventLoop
- Channel
- Poller
- TcpConnection
- TcpClient
- TcpServer
- Connector
- Acceptor
- EventLoopThread
- EventLoopThreadPool
其中,Poller(及其實現(xiàn)類)包裝了Poll/EPoll,封裝了OS針對設備(fd)的操作;Channel是設備fd的包裝,在muduo中主要包裝socket;TcpConnection抽象一個TCP連接,無論是客戶端還是服務器只要建立了網(wǎng)絡連接就會使用TcpConnection;TcpClient/TcpServer分別抽象TCP客戶端和服務器;Connector/Acceptor分別包裝TCP客戶端和服務器的建立連接/接受連接;EventLoop是一個主控類,是一個事件發(fā)生器,它驅動Poller產(chǎn)生/發(fā)現(xiàn)事件,然后將事件派發(fā)到Channel處理;EventLoopThread是一個帶有EventLoop的線程;EventLoopThreadPool自然是一個EventLoopThread的資源池,維護一堆EventLoopThread。
閱讀庫源碼時可以從庫的接口層著手,看看關鍵功能是如何實現(xiàn)的。對于muduo而言,可以從TcpServer/TcpClient/EventLoop/TcpConnection這幾個類著手。接下來看看主要功能的實現(xiàn):
建立連接
TcpClient::connect
-> Connector::start
-> EventLoop::runInLoop(Connector::startInLoop...
-> Connector::connect
EventLoop::runInLoop接口用于在this所在的線程運行某個函數(shù),這個后面看下EventLoop的實現(xiàn)就可以了解。 網(wǎng)絡連接的最終建立是在Connector::connect中實現(xiàn),建立連接之后會創(chuàng)建一個Channel來代表這個socket,并且綁定事件監(jiān)聽接口。最后最重要的是,調(diào)用Channel::enableWriting
。Channel
有一系列的enableXX接口,這些接口用于標識自己關心某IO事件。后面會看到他們的實現(xiàn)。
Connector監(jiān)聽的主要事件無非就是連接已建立,用它監(jiān)聽讀數(shù)據(jù)/寫數(shù)據(jù)事件也不符合設計。TcpConnection才是做這種事的。
客戶端收發(fā)數(shù)據(jù)
當Connector發(fā)現(xiàn)連接真正建立好后,會回調(diào)到TcpClient::newConnection
,在TcpClient構造函數(shù)中:
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
TcpClient::newConnection
中創(chuàng)建一個TcpConnection來代表這個連接:
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
...
conn->connectEstablished();
并同時設置事件回調(diào),以上設置的回調(diào)都是應用層(即庫的使用者)的接口。每一個TcpConnection都有一個Channel,畢竟每一個網(wǎng)絡連接都對應了一個socket fd。在TcpConnection構造函數(shù)中創(chuàng)建了一個Channel,并設置事件回調(diào)函數(shù)。
TcpConnection::connectEstablished
函數(shù)最主要的是通知Channel自己開始關心IO讀取事件:
void TcpConnection::connectEstablished()
{
...
channel_->enableReading();
這是自此我們看到的第二個Channel::enableXXX
接口,這些接口是如何實現(xiàn)關心IO事件的呢?這個后面講到。
muduo的數(shù)據(jù)發(fā)送都是通過TcpConnection::send
完成,這個就是一般網(wǎng)絡庫中在不使用OS的異步IO情況下的實現(xiàn):緩存應用層傳遞過來的數(shù)據(jù),在IO設備可寫的情況下盡量寫入數(shù)據(jù)。這個主要實現(xiàn)在TcpConnection::sendInLoop
中。
TcpConnection::sendInLoop(....) {
...
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) // 設備可寫且沒有緩存時立即寫入
{
nwrote = sockets::write(channel_->fd(), data, len);
}
...
// 否則加入數(shù)據(jù)到緩存,等待IO可寫時再寫
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
// 注冊關心IO寫事件,Poller就會對寫做檢測
channel_->enableWriting();
}
...
}
當IO可寫時,Channel就會回調(diào)TcpConnection::handleWrite
(構造函數(shù)中注冊)
void TcpConnection::handleWrite()
{
...
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
服務器端的數(shù)據(jù)收發(fā)同客戶端機制一致,不同的是連接(TcpConnection)的建立方式不同。
服務器接收連接
服務器接收連接的實現(xiàn)在一個網(wǎng)絡庫中比較重要。muduo中通過Acceptor類來接收連接。在TcpClient中,其Connector通過一個關心Channel可寫的事件來通過連接已建立;在Acceptor中則是通過一個Channel可讀的事件來表示有新的連接到來:
Acceptor::Acceptor(....) {
...
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
...
}
void Acceptor::handleRead()
{
...
int connfd = acceptSocket_.accept(&peerAddr); // 接收連接獲得一個新的socket
if (connfd >= 0)
{
...
newConnectionCallback_(connfd, peerAddr); // 回調(diào)到TcpServer::newConnection
TcpServer::newConnection
中建立一個TcpConnection,并將其附加到一個EventLoopThread中,簡單來說就是給其配置一個線程:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
IO的驅動
之前提到,一旦要關心某IO事件了,就調(diào)用Channel::enableXXX
,這個如何實現(xiàn)的呢?
class Channel {
...
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
...
poller_->updateChannel(channel);
}
最終調(diào)用到Poller::upateChannel
。muduo中有兩個Poller的實現(xiàn),分別是Poll和EPoll,可以選擇簡單的Poll來看:
void PollPoller::updateChannel(Channel* channel)
{
...
if (channel->index() < 0)
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那個events_
pfd.revents = 0;
pollfds_.push_back(pfd); // 加入一個新的pollfd
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
可見Poller就是把Channel關心的IO事件轉換為OS提供的IO模型數(shù)據(jù)結構上。通過查看關鍵的pollfds_
的使用,可以發(fā)現(xiàn)其主要是在Poller::poll接口里。這個接口會在EventLoop的主循環(huán)中不斷調(diào)用:
void EventLoop::loop()
{
...
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_); // 獲得IO事件,通知各注冊回調(diào)
}
整個流程可總結為:各Channel內(nèi)部會把自己關心的事件告訴給Poller,Poller由EventLoop驅動檢測IO,然后返回哪些Channel發(fā)生了事件,EventLoop再驅動這些Channel調(diào)用各注冊回調(diào)。
從這個過程中可以看出,EventLoop就是一個事件產(chǎn)生器。
線程模型
在muduo的服務器中,muduo的線程模型是怎樣的呢?它如何通過線程來支撐高并發(fā)呢?其實很簡單,它為每一個線程配置了一個EventLoop,這個線程同時被附加了若干個網(wǎng)絡連接,這個EventLoop服務于這些網(wǎng)絡連接,為這些連接收集并派發(fā)IO事件。
回到TcpServer::newConnection
中:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
...
TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用這個選擇到的線程中的EventLoop
connName,
sockfd,
localAddr,
peerAddr));
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
注意TcpConnection::connectEstablished
是如何通過Channel注冊關心的IO事件到ioLoop
的。
極端來說,muduo的每一個連接線程可以只為一個網(wǎng)絡連接服務,這就有點類似于thread per connection模型了。
網(wǎng)絡模型
傳說中的Reactor模式,以及one loop per thread,基于EventLoop的作用,以及線程池與TcpConnection的關系,可以醍醐灌頂般理解以下這張muduo的網(wǎng)絡模型圖了:

總結
本文主要對muduo的主要結構及主要機制的實現(xiàn)做了描述,其他如Buffer的實現(xiàn)、定時器的實現(xiàn)大家都可以自行研究。muduo的源碼很清晰,通過源碼及配合陳碩博客上的內(nèi)容可以學到一些網(wǎng)絡編程方面的經(jīng)驗。