Muduo 網(wǎng)絡(luò)編程示例之八:Timing wheel 踢掉空閑連接
陳碩 (giantchen_AT_gmail)
Blog.csdn.net/Solstice t.sina.com.cn/giantchen
這是《Muduo 網(wǎng)絡(luò)編程示例》系列的第八篇文章,原計(jì)劃講文件傳輸,這里插入一點(diǎn)計(jì)劃之外的內(nèi)容。
Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx
本文介紹如何使用 timing wheel 來(lái)踢掉空閑的連接,一個(gè)連接如果若干秒沒有收到數(shù)據(jù),就認(rèn)為是空閑連接。
本文的代碼見 http://code.google.com/p/muduo/source/browse/trunk/examples/idleconnection
在嚴(yán)肅的網(wǎng)絡(luò)程序中,應(yīng)用層的心跳協(xié)議是必不可少的。應(yīng)該用心跳消息來(lái)判斷對(duì)方進(jìn)程是否能正常工作,“踢掉空閑連接”只是一時(shí)權(quán)宜之計(jì)。我這里想順便講講 shared_ptr 和 weak_ptr 的用法。
如果一個(gè)連接連續(xù)幾秒鐘(后文以 8s 為例)內(nèi)沒有收到數(shù)據(jù),就把它斷開,為此有兩種簡(jiǎn)單粗暴的做法:
- 每個(gè)連接保存“最后收到數(shù)據(jù)的時(shí)間 lastReceiveTime”,然后用一個(gè)定時(shí)器,每秒鐘遍歷一遍所有連接,斷開那些 (now - connection.lastReceiveTime) > 8s 的 connection。這種做法全局只有一個(gè) repeated timer,不過(guò)每次 timeout 都要檢查全部連接,如果連接數(shù)目比較大(幾千上萬(wàn)),這一步可能會(huì)比較費(fèi)時(shí)。
- 每個(gè)連接設(shè)置一個(gè) one-shot timer,超時(shí)定為 8s,在超時(shí)的時(shí)候就斷開本連接。當(dāng)然,每次收到數(shù)據(jù)要去更新 timer。這種做法需要很多個(gè) one-shot timer,會(huì)頻繁地更新 timers。如果連接數(shù)目比較大,可能對(duì) reactor 的 timer queue 造成壓力。
使用 timing wheel 能避免上述兩種做法的缺點(diǎn)。timing wheel 可以翻譯為“時(shí)間輪盤”或“刻度盤”,本文保留英文。
連接超時(shí)不需要精確定時(shí),只要大致 8 秒鐘超時(shí)斷開就行,多一秒少一秒關(guān)系不大。處理連接超時(shí)可以用一個(gè)簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu):8 個(gè)桶組成的循環(huán)隊(duì)列。第一個(gè)桶放下一秒將要超時(shí)的連接,第二個(gè)放下 2 秒將要超時(shí)的連接。每個(gè)連接一收到數(shù)據(jù)就把自己放到第 8 個(gè)桶,然后在每秒鐘的 callback 里把第一個(gè)桶里的連接斷開,把這個(gè)空桶挪到隊(duì)尾。這樣大致可以做到 8 秒鐘沒有數(shù)據(jù)就超時(shí)斷開連接。更重要的是,每次不用檢查全部的 connection,只要檢查第一個(gè)桶里的 connections,相當(dāng)于把任務(wù)分散了。
Timing wheel 原理
《Hashed and hierarchical timing wheels: efficient data structures for implementing a timer facility》這篇論文詳細(xì)比較了實(shí)現(xiàn)定時(shí)器的各種數(shù)據(jù)結(jié)構(gòu),并提出了層次化的 timing wheel 與 hash timing wheel 等新結(jié)構(gòu)。針對(duì)本文要解決的問(wèn)題的特點(diǎn),我們不需要實(shí)現(xiàn)一個(gè)通用的定時(shí)器,只用實(shí)現(xiàn) simple timing wheel 即可。
Simple timing wheel 的基本結(jié)構(gòu)是一個(gè)循環(huán)隊(duì)列,還有一個(gè)指向隊(duì)尾的指針 (tail),這個(gè)指針每秒鐘移動(dòng)一格,就像鐘表上的時(shí)針,timing wheel 由此得名。
以下是某一時(shí)刻 timing wheel 的狀態(tài),格子里的數(shù)字是倒計(jì)時(shí)(與通常的 timing wheel 相反),表示這個(gè)格子(桶子)中的連接的剩余壽命。

一秒鐘以后,tail 指針移動(dòng)一格,原來(lái)四點(diǎn)鐘方向的格子被清空,其中的連接已被斷開。

連接超時(shí)被踢掉的過(guò)程
假設(shè)在某個(gè)時(shí)刻,conn 1 到達(dá),把它放到當(dāng)前格子中,它的剩余壽命是 7 秒。此后 conn 1 上沒有收到數(shù)據(jù)。

1 秒鐘之后,tail 指向下一個(gè)格子,conn 1 的剩余壽命是 6 秒。

又過(guò)了幾秒鐘,tail 指向 conn 1 之前的那個(gè)格子,conn 1 即將被斷開。

下一秒,tail 重新指向 conn 1 原來(lái)所在的格子,清空其中的數(shù)據(jù),斷開 conn 1 連接。

連接刷新
如果在斷開 conn 1 之前收到數(shù)據(jù),就把它移到當(dāng)前的格子里。

收到數(shù)據(jù),conn 1 的壽命延長(zhǎng)為 7 秒。

時(shí)間繼續(xù)前進(jìn),conn 1 壽命遞減,不過(guò)它已經(jīng)比第一種情況長(zhǎng)壽了。

多個(gè)連接
timing wheel 中的每個(gè)格子是個(gè) hash set,可以容納不止一個(gè)連接。
比如一開始,conn 1 到達(dá)。

隨后,conn 2 到達(dá),這時(shí)候 tail 還沒有移動(dòng),兩個(gè)連接位于同一個(gè)格子中,具有相同的剩余壽命。(下圖中畫成鏈表,代碼中是哈希表。)

幾秒鐘之后,conn 1 收到數(shù)據(jù),而 conn 2 一直沒有收到數(shù)據(jù),那么 conn 1 被移到當(dāng)前的格子中。這時(shí) conn 1 的壽命比 conn 2 長(zhǎng)。

代碼實(shí)現(xiàn)與改進(jìn)
我們用以前多次出現(xiàn)的 EchoServer 來(lái)說(shuō)明具體如何實(shí)現(xiàn) timing wheel。代碼見 http://code.google.com/p/muduo/source/browse/trunk/examples/idleconnection
在具體實(shí)現(xiàn)中,格子里放的不是連接,而是一個(gè)特制的 Entry struct,每個(gè) Entry 包含 TcpConnection 的 weak_ptr。Entry 的析構(gòu)函數(shù)會(huì)判斷連接是否還存在(用 weak_ptr),如果還存在則斷開連接。
數(shù)據(jù)結(jié)構(gòu):
typedef boost::weak_ptr<muduo::net::TcpConnection> WeakTcpConnectionPtr;
struct Entry : public muduo::copyable
{
Entry(const WeakTcpConnectionPtr& weakConn)
: weakConn_(weakConn)
{
}
~Entry()
{
muduo::net::TcpConnectionPtr conn = weakConn_.lock();
if (conn)
{
conn->shutdown();
}
}
WeakTcpConnectionPtr weakConn_;
};
typedef boost::shared_ptr<Entry> EntryPtr;
typedef boost::weak_ptr<Entry> WeakEntryPtr;
typedef boost::unordered_set<EntryPtr> Bucket;
typedef boost::circular_buffer<Bucket> WeakConnectionList;
在實(shí)現(xiàn)中,為了簡(jiǎn)單起見,我們不會(huì)真的把一個(gè)連接從一個(gè)格子移到另一個(gè)格子,而是采用引用計(jì)數(shù)的辦法,用 shared_ptr 來(lái)管理 Entry。如果從連接收到數(shù)據(jù),就把對(duì)應(yīng)的 EntryPtr 放到這個(gè)格子里,這樣它的引用計(jì)數(shù)就遞增了。當(dāng) Entry 的引用計(jì)數(shù)遞減到零,說(shuō)明它沒有在任何一個(gè)格子里出現(xiàn),那么連接超時(shí),Entry 的析構(gòu)函數(shù)會(huì)斷開連接。
Timing wheel 用 boost::circular_buffer 實(shí)現(xiàn),其中每個(gè) Bucket 元素是個(gè) hash set of EntryPtr。
在構(gòu)造函數(shù)中,注冊(cè)每秒鐘的回調(diào)(EventLoop::runEvery() 注冊(cè) EchoServer::onTimer() ),然后把 timing wheel 設(shè)為適當(dāng)?shù)拇笮 ?/p>
EchoServer::EchoServer(EventLoop* loop,
const InetAddress& listenAddr,
int idleSeconds)
: loop_(loop),
server_(loop, listenAddr, "EchoServer"),
connectionBuckets_(idleSeconds)
{
server_.setConnectionCallback(
boost::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&EchoServer::onMessage, this, _1, _2, _3));
loop->runEvery(1.0, boost::bind(&EchoServer::onTimer, this));
connectionBuckets_.resize(idleSeconds);
}
其中 EchoServer::onTimer() 的實(shí)現(xiàn)只有一行:往隊(duì)尾添加一個(gè)空的 Bucket,這樣 circular_buffer 會(huì)自動(dòng)彈出隊(duì)首的 Bucket,并析構(gòu)之。在析構(gòu) Bucket 的時(shí)候,會(huì)依次析構(gòu)其中的 EntryPtr 對(duì)象,這樣 Entry 的引用計(jì)數(shù)就不用我們?nèi)ゲ傩模珻++ 的值語(yǔ)意會(huì)幫我們搞定一切。
void EchoServer::onTimer()
{
connectionBuckets_.push_back(Bucket());
}
在連接建立時(shí),創(chuàng)建一個(gè) Entry 對(duì)象,把它放到 timing wheel 的隊(duì)尾。另外,我們還需要把 Entry 的弱引用保存到 TcpConnection 的 context 里,因?yàn)樵谑盏綌?shù)據(jù)的時(shí)候還要用到 Entry。(思考題:如果 TcpConnection::setContext 保存的是強(qiáng)引用 EntryPtr,會(huì)出現(xiàn)什么情況?)
void EchoServer::onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << "EchoServer - " << conn->peerAddress().toHostPort() << " -> "
<< conn->localAddress().toHostPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected())
{
EntryPtr entry(new Entry(conn));
connectionBuckets_.back().insert(entry);
WeakEntryPtr weakEntry(entry);
conn->setContext(weakEntry);
}
else
{
assert(!conn->getContext().empty());
WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
LOG_DEBUG << "Entry use_count = " << weakEntry.use_count();
}
}
在收到消息時(shí),從 TcpConnection 的 context 中取出 Entry 的弱引用,把它提升為強(qiáng)引用 EntryPtr,然后放到當(dāng)前的 timing wheel 隊(duì)尾。(思考題,為什么要把 Entry 作為 TcpConnection 的 context 保存,如果這里再創(chuàng)建一個(gè)新的 Entry 會(huì)有什么后果?)
void EchoServer::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp time)
{
string msg(buf->retrieveAsString());
LOG_INFO << conn->name() << " echo " << msg.size() << " bytes at " << time.toString();
conn->send(msg);
assert(!conn->getContext().empty());
WeakEntryPtr weakEntry(boost::any_cast<WeakEntryPtr>(conn->getContext()));
EntryPtr entry(weakEntry.lock());
if (entry)
{
connectionBuckets_.back().insert(entry);
}
}
然后呢?沒有然后了,程序已經(jīng)完成了我們想要的功能。(完整的代碼會(huì)打印 circular_buffer 變化的情況,運(yùn)行一下即可理解。)
希望本文有助于您理解 shared_ptr 和 weak_ptr。
改進(jìn)
在現(xiàn)在的實(shí)現(xiàn)中,每次收到消息都會(huì)往隊(duì)尾添加 EntryPtr (當(dāng)然,hash set 會(huì)幫我們?nèi)ブ亍#┮粋€(gè)簡(jiǎn)單的改進(jìn)措施是,在 TcpConnection 里保存“最后一次往隊(duì)尾添加引用時(shí)的 tail 位置”,然后先檢查 tail 是否變化,若無(wú)變化則不重復(fù)添加 EntryPtr。這樣或許能提高效率。
以上改進(jìn)留作練習(xí)。