最近簡單讀了下muduo的源碼,本文對其主要實現/結構簡單總結下。
muduo的主要源碼位于net文件夾下,base文件夾是一些基礎代碼,不影響理解網絡部分的實現。muduo主要類包括:
- EventLoop
- Channel
- Poller
- TcpConnection
- TcpClient
- TcpServer
- Connector
- Acceptor
- EventLoopThread
- EventLoopThreadPool
其中,Poller(及其實現類)包裝了Poll/EPoll,封裝了OS針對設備(fd)的操作;Channel是設備fd的包裝,在muduo中主要包裝socket;TcpConnection抽象一個TCP連接,無論是客戶端還是服務器只要建立了網絡連接就會使用TcpConnection;TcpClient/TcpServer分別抽象TCP客戶端和服務器;Connector/Acceptor分別包裝TCP客戶端和服務器的建立連接/接受連接;EventLoop是一個主控類,是一個事件發生器,它驅動Poller產生/發現事件,然后將事件派發到Channel處理;EventLoopThread是一個帶有EventLoop的線程;EventLoopThreadPool自然是一個EventLoopThread的資源池,維護一堆EventLoopThread。
閱讀庫源碼時可以從庫的接口層著手,看看關鍵功能是如何實現的。對于muduo而言,可以從TcpServer/TcpClient/EventLoop/TcpConnection這幾個類著手。接下來看看主要功能的實現:
建立連接
TcpClient::connect
-> Connector::start
-> EventLoop::runInLoop(Connector::startInLoop...
-> Connector::connect
EventLoop::runInLoop接口用于在this所在的線程運行某個函數,這個后面看下EventLoop的實現就可以了解。 網絡連接的最終建立是在Connector::connect中實現,建立連接之后會創建一個Channel來代表這個socket,并且綁定事件監聽接口。最后最重要的是,調用Channel::enableWriting
。Channel
有一系列的enableXX接口,這些接口用于標識自己關心某IO事件。后面會看到他們的實現。
Connector監聽的主要事件無非就是連接已建立,用它監聽讀數據/寫數據事件也不符合設計。TcpConnection才是做這種事的。
客戶端收發數據
當Connector發現連接真正建立好后,會回調到TcpClient::newConnection
,在TcpClient構造函數中:
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
TcpClient::newConnection
中創建一個TcpConnection來代表這個連接:
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
...
conn->connectEstablished();
并同時設置事件回調,以上設置的回調都是應用層(即庫的使用者)的接口。每一個TcpConnection都有一個Channel,畢竟每一個網絡連接都對應了一個socket fd。在TcpConnection構造函數中創建了一個Channel,并設置事件回調函數。
TcpConnection::connectEstablished
函數最主要的是通知Channel自己開始關心IO讀取事件:
void TcpConnection::connectEstablished()
{
...
channel_->enableReading();
這是自此我們看到的第二個Channel::enableXXX
接口,這些接口是如何實現關心IO事件的呢?這個后面講到。
muduo的數據發送都是通過TcpConnection::send
完成,這個就是一般網絡庫中在不使用OS的異步IO情況下的實現:緩存應用層傳遞過來的數據,在IO設備可寫的情況下盡量寫入數據。這個主要實現在TcpConnection::sendInLoop
中。
TcpConnection::sendInLoop(....) {
...
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) // 設備可寫且沒有緩存時立即寫入
{
nwrote = sockets::write(channel_->fd(), data, len);
}
...
// 否則加入數據到緩存,等待IO可寫時再寫
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
// 注冊關心IO寫事件,Poller就會對寫做檢測
channel_->enableWriting();
}
...
}
當IO可寫時,Channel就會回調TcpConnection::handleWrite
(構造函數中注冊)
void TcpConnection::handleWrite()
{
...
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
服務器端的數據收發同客戶端機制一致,不同的是連接(TcpConnection)的建立方式不同。
服務器接收連接
服務器接收連接的實現在一個網絡庫中比較重要。muduo中通過Acceptor類來接收連接。在TcpClient中,其Connector通過一個關心Channel可寫的事件來通過連接已建立;在Acceptor中則是通過一個Channel可讀的事件來表示有新的連接到來:
Acceptor::Acceptor(....) {
...
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
...
}
void Acceptor::handleRead()
{
...
int connfd = acceptSocket_.accept(&peerAddr); // 接收連接獲得一個新的socket
if (connfd >= 0)
{
...
newConnectionCallback_(connfd, peerAddr); // 回調到TcpServer::newConnection
TcpServer::newConnection
中建立一個TcpConnection,并將其附加到一個EventLoopThread中,簡單來說就是給其配置一個線程:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
IO的驅動
之前提到,一旦要關心某IO事件了,就調用Channel::enableXXX
,這個如何實現的呢?
class Channel {
...
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
...
poller_->updateChannel(channel);
}
最終調用到Poller::upateChannel
。muduo中有兩個Poller的實現,分別是Poll和EPoll,可以選擇簡單的Poll來看:
void PollPoller::updateChannel(Channel* channel)
{
...
if (channel->index() < 0)
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那個events_
pfd.revents = 0;
pollfds_.push_back(pfd); // 加入一個新的pollfd
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
可見Poller就是把Channel關心的IO事件轉換為OS提供的IO模型數據結構上。通過查看關鍵的pollfds_
的使用,可以發現其主要是在Poller::poll接口里。這個接口會在EventLoop的主循環中不斷調用:
void EventLoop::loop()
{
...
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_); // 獲得IO事件,通知各注冊回調
}
整個流程可總結為:各Channel內部會把自己關心的事件告訴給Poller,Poller由EventLoop驅動檢測IO,然后返回哪些Channel發生了事件,EventLoop再驅動這些Channel調用各注冊回調。
從這個過程中可以看出,EventLoop就是一個事件產生器。
線程模型
在muduo的服務器中,muduo的線程模型是怎樣的呢?它如何通過線程來支撐高并發呢?其實很簡單,它為每一個線程配置了一個EventLoop,這個線程同時被附加了若干個網絡連接,這個EventLoop服務于這些網絡連接,為這些連接收集并派發IO事件。
回到TcpServer::newConnection
中:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
...
TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用這個選擇到的線程中的EventLoop
connName,
sockfd,
localAddr,
peerAddr));
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
注意TcpConnection::connectEstablished
是如何通過Channel注冊關心的IO事件到ioLoop
的。
極端來說,muduo的每一個連接線程可以只為一個網絡連接服務,這就有點類似于thread per connection模型了。
網絡模型
傳說中的Reactor模式,以及one loop per thread,基于EventLoop的作用,以及線程池與TcpConnection的關系,可以醍醐灌頂般理解以下這張muduo的網絡模型圖了:

總結
本文主要對muduo的主要結構及主要機制的實現做了描述,其他如Buffer的實現、定時器的實現大家都可以自行研究。muduo的源碼很清晰,通過源碼及配合陳碩博客上的內容可以學到一些網絡編程方面的經驗。