陳碩 (giantchen_AT_gmail)
Blog.csdn.net/Solstice
這是《Muduo 網(wǎng)絡(luò)編程示例》系列的第二篇文章。
Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx
本文講介紹一個(gè)與 Boost.Asio 的示例代碼中的聊天服務(wù)器功能類似的網(wǎng)絡(luò)服務(wù)程序,包括客戶端與服務(wù)端的 muduo 實(shí)現(xiàn)。這個(gè)例子的主要目的是介紹如何處理分包,并初步涉及 Muduo 的多線程功能。Muduo 的下載地址: http://muduo.googlecode.com/files/muduo-0.1.7-alpha.tar.gz ,SHA1 873567e43b3c2cae592101ea809b30ba730f2ee6,本文的完整代碼可在線閱讀
http://code.google.com/p/muduo/source/browse/trunk/examples/asio/chat/ 。
TCP 分包
前面一篇《五個(gè)簡(jiǎn)單 TCP 協(xié)議》中處理的協(xié)議沒(méi)有涉及分包,在 TCP 這種字節(jié)流協(xié)議上做應(yīng)用層分包是網(wǎng)絡(luò)編程的基本需求。分包指的是在發(fā)生一個(gè)消息(message)或一幀(frame)數(shù)據(jù)時(shí),通過(guò)一定的處理,讓接收方能從字節(jié)流中識(shí)別并截取(還原)出一個(gè)個(gè)消息。“粘包問(wèn)題”是個(gè)偽問(wèn)題。
對(duì)于短連接的 TCP 服務(wù),分包不是一個(gè)問(wèn)題,只要發(fā)送方主動(dòng)關(guān)閉連接,就表示一條消息發(fā)送完畢,接收方 read() 返回 0,從而知道消息的結(jié)尾。例如前一篇文章里的 daytime 和 time 協(xié)議。
對(duì)于長(zhǎng)連接的 TCP 服務(wù),分包有四種方法:
- 消息長(zhǎng)度固定,比如 muduo 的 roundtrip 示例就采用了固定的 16 字節(jié)消息;
- 使用特殊的字符或字符串作為消息的邊界,例如 HTTP 協(xié)議的 headers 以 "\r\n" 為字段的分隔符;
- 在每條消息的頭部加一個(gè)長(zhǎng)度字段,這恐怕是最常見(jiàn)的做法,本文的聊天協(xié)議也采用這一辦法;
- 利用消息本身的格式來(lái)分包,例如 XML 格式的消息中 <root>...</root> 的配對(duì),或者 JSON 格式中的 { ... } 的配對(duì)。解析這種消息格式通常會(huì)用到狀態(tài)機(jī)。
在后文的代碼講解中還會(huì)仔細(xì)討論用長(zhǎng)度字段分包的常見(jiàn)陷阱。
聊天服務(wù)
本文實(shí)現(xiàn)的聊天服務(wù)非常簡(jiǎn)單,由服務(wù)端程序和客戶端程序組成,協(xié)議如下:
- 服務(wù)端程序中某個(gè)端口偵聽(tīng) (listen) 新的連接;
- 客戶端向服務(wù)端發(fā)起連接;
- 連接建立之后,客戶端隨時(shí)準(zhǔn)備接收服務(wù)端的消息并在屏幕上顯示出來(lái);
- 客戶端接受鍵盤(pán)輸入,以回車(chē)為界,把消息發(fā)送給服務(wù)端;
- 服務(wù)端接收到消息之后,依次發(fā)送給每個(gè)連接到它的客戶端;原來(lái)發(fā)送消息的客戶端進(jìn)程也會(huì)收到這條消息;
- 一個(gè)服務(wù)端進(jìn)程可以同時(shí)服務(wù)多個(gè)客戶端進(jìn)程,當(dāng)有消息到達(dá)服務(wù)端后,每個(gè)客戶端進(jìn)程都會(huì)收到同一條消息,服務(wù)端廣播發(fā)送消息的順序是任意的,不一定哪個(gè)客戶端會(huì)先收到這條消息。
- (可選)如果消息 A 先于消息 B 到達(dá)服務(wù)端,那么每個(gè)客戶端都會(huì)先收到 A 再收到 B。
這實(shí)際上是一個(gè)簡(jiǎn)單的基于 TCP 的應(yīng)用層廣播協(xié)議,由服務(wù)端負(fù)責(zé)把消息發(fā)送給每個(gè)連接到它的客戶端。參與“聊天”的既可以是人,也可以是程序。在以后的文章中,我將介紹一個(gè)稍微復(fù)雜的一點(diǎn)的例子 hub,它有“聊天室”的功能,客戶端可以注冊(cè)特定的 topic(s),并往某個(gè) topic 發(fā)送消息,這樣代碼更有意思。
消息格式
本聊天服務(wù)的消息格式非常簡(jiǎn)單,“消息”本身是一個(gè)字符串,每條消息的有一個(gè) 4 字節(jié)的頭部,以網(wǎng)絡(luò)序存放字符串的長(zhǎng)度。消息之間沒(méi)有間隙,字符串也不一定以 '\0' 結(jié)尾。比方說(shuō)有兩條消息 "hello" 和 "chenshuo",那么打包后的字節(jié)流是:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
共 21 字節(jié)。
打包的代碼
這段代碼把 const string& message 打包為 muduo::net::Buffer,并通過(guò) conn 發(fā)送。
1: void send(muduo::net::TcpConnection* conn, const string& message)
2: {
3: muduo::net::Buffer buf;
4: buf.append(message.data(), message.size());
5: int32_t len = muduo::net::sockets::hostToNetwork32(static_cast<int32_t>(message.size()));
6: buf.prepend(&len, sizeof len);
7: conn->send(&buf);
8: }
muduo::Buffer 有一個(gè)很好的功能,它在頭部預(yù)留了 8 個(gè)字節(jié)的空間,這樣第 6 行的 prepend() 操作就不需要移動(dòng)已有的數(shù)據(jù),效率較高。
分包的代碼
解析數(shù)據(jù)往往比生成數(shù)據(jù)復(fù)雜,分包打包也不例外。
1: void onMessage(const muduo::net::TcpConnectionPtr& conn,
2: muduo::net::Buffer* buf,
3: muduo::Timestamp receiveTime)
4: {
5: while (buf->readableBytes() >= kHeaderLen)
6: {
7: const void* data = buf->peek();
8: int32_t tmp = *static_cast<const int32_t*>(data);
9: int32_t len = muduo::net::sockets::networkToHost32(tmp);
10: if (len > 65536 || len < 0)
11: {
12: LOG_ERROR << "Invalid length " << len;
13: conn->shutdown();
14: }
15: else if (buf->readableBytes() >= len + kHeaderLen)
16: {
17: buf->retrieve(kHeaderLen);
18: muduo::string message(buf->peek(), len);
19: buf->retrieve(len);
20: messageCallback_(conn, message, receiveTime); // 收到完整的消息,通知用戶
21: }
22: else
23: {
24: break;
25: }
26: }
27: }
上面這段代碼第 7 行用了 while 循環(huán)來(lái)反復(fù)讀取數(shù)據(jù),直到 Buffer 中的數(shù)據(jù)不夠一條完整的消息。請(qǐng)讀者思考,如果換成 if (buf->readableBytes() >= kHeaderLen) 會(huì)有什么后果。
以前面提到的兩條消息的字節(jié)流為例:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
假設(shè)數(shù)據(jù)最終都全部到達(dá),onMessage() 至少要能正確處理以下各種數(shù)據(jù)到達(dá)的次序,每種情況下 messageCallback_ 都應(yīng)該被調(diào)用兩次:
- 每次收到一個(gè)字節(jié)的數(shù)據(jù),onMessage() 被調(diào)用 21 次;
- 數(shù)據(jù)分兩次到達(dá),第一次收到 2 個(gè)字節(jié),不足消息的長(zhǎng)度字段;
- 數(shù)據(jù)分兩次到達(dá),第一次收到 4 個(gè)字節(jié),剛好夠長(zhǎng)度字段,但是沒(méi)有 body;
- 數(shù)據(jù)分兩次到達(dá),第一次收到 8 個(gè)字節(jié),長(zhǎng)度完整,但 body 不完整;
- 數(shù)據(jù)分兩次到達(dá),第一次收到 9 個(gè)字節(jié),長(zhǎng)度完整,body 也完整;
- 數(shù)據(jù)分兩次到達(dá),第一次收到 10 個(gè)字節(jié),第一條消息的長(zhǎng)度完整、body 也完整,第二條消息長(zhǎng)度不完整;
- 請(qǐng)自行移動(dòng)分割點(diǎn),驗(yàn)證各種情況;
- 數(shù)據(jù)一次就全部到達(dá),這時(shí)必須用 while 循環(huán)來(lái)讀出兩條消息,否則消息會(huì)堆積。
請(qǐng)讀者驗(yàn)證 onMessage() 是否做到了以上幾點(diǎn)。這個(gè)例子充分說(shuō)明了 non-blocking read 必須和 input buffer 一起使用。
編解碼器 LengthHeaderCodec
有人評(píng)論 Muduo 的接收緩沖區(qū)不能設(shè)置回調(diào)函數(shù)的觸發(fā)條件,確實(shí)如此。每當(dāng) socket 可讀,Muduo 的 TcpConnection 會(huì)讀取數(shù)據(jù)并存入 Input Buffer,然后回調(diào)用戶的函數(shù)。不過(guò),一個(gè)簡(jiǎn)單的間接層就能解決問(wèn)題,讓用戶代碼只關(guān)心“消息到達(dá)”而不是“數(shù)據(jù)到達(dá)”,如本例中的 LengthHeaderCodec 所展示的那一樣。
1: #ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
2: #define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
3:
4: #include <muduo/base/Logging.h>
5: #include <muduo/net/Buffer.h>
6: #include <muduo/net/SocketsOps.h>
7: #include <muduo/net/TcpConnection.h>
8:
9: #include <boost/function.hpp>
10: #include <boost/noncopyable.hpp>
11:
12: using muduo::Logger;
13:
14: class LengthHeaderCodec : boost::noncopyable
15: {
16: public:
17: typedef boost::function<void (const muduo::net::TcpConnectionPtr&,
18: const muduo::string& message,
19: muduo::Timestamp)> StringMessageCallback;
20:
21: explicit LengthHeaderCodec(const StringMessageCallback& cb)
22: : messageCallback_(cb)
23: {
24: }
25:
26: void onMessage(const muduo::net::TcpConnectionPtr& conn,
27: muduo::net::Buffer* buf,
28: muduo::Timestamp receiveTime)
29: { 同上 }
30:
31: void send(muduo::net::TcpConnection* conn, const muduo::string& message)
32: { 同上 }
33:
34: private:
35: StringMessageCallback messageCallback_;
36: const static size_t kHeaderLen = sizeof(int32_t);
37: };
38:
39: #endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
這段代碼把以 Buffer* 為參數(shù)的 MessageCallback 轉(zhuǎn)換成了以 const string& 為參數(shù)的 StringMessageCallback,讓用戶代碼不必關(guān)心分包操作。客戶端和服務(wù)端都能從中受益。
服務(wù)端的實(shí)現(xiàn)
聊天服務(wù)器的服務(wù)端代碼小于 100 行,不到 asio 的一半。
請(qǐng)先閱讀第 68 行起的數(shù)據(jù)成員的定義。除了經(jīng)常見(jiàn)到的 EventLoop 和 TcpServer,ChatServer 還定義了 codec_ 和 std::set<TcpConnectionPtr> connections_ 作為成員,connections_ 是目前已建立的客戶連接,在收到消息之后,服務(wù)器會(huì)遍歷整個(gè)容器,把消息廣播給其中每一個(gè) TCP 連接。
首先,在構(gòu)造函數(shù)里注冊(cè)回調(diào):
1: #include "codec.h"
2:
3: #include <muduo/base/Logging.h>
4: #include <muduo/base/Mutex.h>
5: #include <muduo/net/EventLoop.h>
6: #include <muduo/net/SocketsOps.h>
7: #include <muduo/net/TcpServer.h>
8:
9: #include <boost/bind.hpp>
10:
11: #include <set>
12: #include <stdio.h>
13:
14: using namespace muduo;
15: using namespace muduo::net;
16:
17: class ChatServer : boost::noncopyable
18: {
19: public:
20: ChatServer(EventLoop* loop,
21: const InetAddress& listenAddr)
22: : loop_(loop),
23: server_(loop, listenAddr, "ChatServer"),
24: codec_(boost::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
25: {
26: server_.setConnectionCallback(
27: boost::bind(&ChatServer::onConnection, this, _1));
28: server_.setMessageCallback(
29: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
30: }
31:
32: void start()
33: {
34: server_.start();
35: }
36:
這里有幾點(diǎn)值得注意,在以往的代碼里是直接把本 class 的 onMessage() 注冊(cè)給 server_;這里我們把 LengthHeaderCodec::onMessage() 注冊(cè)給 server_,然后向 codec_ 注冊(cè)了 ChatServer::onStringMessage(),等于說(shuō)讓 codec_ 負(fù)責(zé)解析消息,然后把完整的消息回調(diào)給 ChatServer。這正是我前面提到的“一個(gè)簡(jiǎn)單的間接層”,在不增加 Muduo 庫(kù)的復(fù)雜度的前提下,提供了足夠的靈活性讓我們?cè)谟脩舸a里完成需要的工作。
另外,server_.start() 絕對(duì)不能在構(gòu)造函數(shù)里調(diào)用,這么做將來(lái)會(huì)有線程安全的問(wèn)題,見(jiàn)我在《當(dāng)析構(gòu)函數(shù)遇到多線程 ── C++ 中線程安全的對(duì)象回調(diào)》一文中的論述。
以下是處理連接的建立和斷開(kāi)的代碼,注意它把新建的連接加入到 connections_ 容器中,把已斷開(kāi)的連接從容器中刪除。這么做是為了避免內(nèi)存和資源泄漏,TcpConnectionPtr 是 boost::shared_ptr<TcpConnection>,是 muduo 里唯一一個(gè)默認(rèn)采用 shared_ptr 來(lái)管理生命期的對(duì)象。以后我們會(huì)談到這么做的原因。
37: private:
38: void onConnection(const TcpConnectionPtr& conn)
39: {
40: LOG_INFO << conn->localAddress().toHostPort() << " -> "
41: << conn->peerAddress().toHostPort() << " is "
42: << (conn->connected() ? "UP" : "DOWN");
43:
44: MutexLockGuard lock(mutex_);
45: if (conn->connected())
46: {
47: connections_.insert(conn);
48: }
49: else
50: {
51: connections_.erase(conn);
52: }
53: }
54:
以下是服務(wù)端處理消息的代碼,它遍歷整個(gè) connections_ 容器,把消息打包發(fā)送給各個(gè)客戶連接。
55: void onStringMessage(const TcpConnectionPtr&,
56: const string& message,
57: Timestamp)
58: {
59: MutexLockGuard lock(mutex_);
60: for (ConnectionList::iterator it = connections_.begin();
61: it != connections_.end();
62: ++it)
63: {
64: codec_.send(get_pointer(*it), message);
65: }
66: }
67:
數(shù)據(jù)成員:
68: typedef std::set<TcpConnectionPtr> ConnectionList;
69: EventLoop* loop_;
70: TcpServer server_;
71: LengthHeaderCodec codec_;
72: MutexLock mutex_;
73: ConnectionList connections_;
74: };
75:
main() 函數(shù)里邊是例行公事的代碼:
76: int main(int argc, char* argv[])
77: {
78: LOG_INFO << "pid = " << getpid();
79: if (argc > 1)
80: {
81: EventLoop loop;
82: uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
83: InetAddress serverAddr(port);
84: ChatServer server(&loop, serverAddr);
85: server.start();
86: loop.loop();
87: }
88: else
89: {
90: printf("Usage: %s port\n", argv[0]);
91: }
92: }
如果你讀過(guò) asio 的對(duì)應(yīng)代碼,會(huì)不會(huì)覺(jué)得 Reactor 往往比 Proactor 容易使用?
客戶端的實(shí)現(xiàn)
我有時(shí)覺(jué)得服務(wù)端的程序常常比客戶端的更容易寫(xiě),聊天服務(wù)器再次驗(yàn)證了我的看法。客戶端的復(fù)雜性來(lái)自于它要讀取鍵盤(pán)輸入,而 EventLoop 是獨(dú)占線程的,所以我用了兩個(gè)線程,main() 函數(shù)所在的線程負(fù)責(zé)讀鍵盤(pán),另外用一個(gè) EventLoopThread 來(lái)處理網(wǎng)絡(luò) IO。我暫時(shí)沒(méi)有把標(biāo)準(zhǔn)輸入輸出融入 Reactor 的想法,因?yàn)榉?wù)器程序的 stdin 和 stdout 往往是重定向了的。
來(lái)看代碼,首先,在構(gòu)造函數(shù)里注冊(cè)回調(diào),并使用了跟前面一樣的 LengthHeaderCodec 作為中間層,負(fù)責(zé)打包分包。
1: #include "codec.h"
2:
3: #include <muduo/base/Logging.h>
4: #include <muduo/base/Mutex.h>
5: #include <muduo/net/EventLoopThread.h>
6: #include <muduo/net/TcpClient.h>
7:
8: #include <boost/bind.hpp>
9: #include <boost/noncopyable.hpp>
10:
11: #include <iostream>
12: #include <stdio.h>
13:
14: using namespace muduo;
15: using namespace muduo::net;
16:
17: class ChatClient : boost::noncopyable
18: {
19: public:
20: ChatClient(EventLoop* loop, const InetAddress& listenAddr)
21: : loop_(loop),
22: client_(loop, listenAddr, "ChatClient"),
23: codec_(boost::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
24: {
25: client_.setConnectionCallback(
26: boost::bind(&ChatClient::onConnection, this, _1));
27: client_.setMessageCallback(
28: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
29: client_.enableRetry();
30: }
31:
32: void connect()
33: {
34: client_.connect();
35: }
36:
disconnect() 目前為空,客戶端的連接由操作系統(tǒng)在進(jìn)程終止時(shí)關(guān)閉。
37: void disconnect()
38: {
39: // client_.disconnect();
40: }
41:
write() 會(huì)由 main 線程調(diào)用,所以要加鎖,這個(gè)鎖不是為了保護(hù) TcpConnection,而是保護(hù) shared_ptr。
42: void write(const string& message)
43: {
44: MutexLockGuard lock(mutex_);
45: if (connection_)
46: {
47: codec_.send(get_pointer(connection_), message);
48: }
49: }
50:
onConnection() 會(huì)由 EventLoop 線程調(diào)用,所以要加鎖以保護(hù) shared_ptr。
51: private:
52: void onConnection(const TcpConnectionPtr& conn)
53: {
54: LOG_INFO << conn->localAddress().toHostPort() << " -> "
55: << conn->peerAddress().toHostPort() << " is "
56: << (conn->connected() ? "UP" : "DOWN");
57:
58: MutexLockGuard lock(mutex_);
59: if (conn->connected())
60: {
61: connection_ = conn;
62: }
63: else
64: {
65: connection_.reset();
66: }
67: }
68:
把收到的消息打印到屏幕,這個(gè)函數(shù)由 EventLoop 線程調(diào)用,但是不用加鎖,因?yàn)?printf() 是線程安全的。
注意這里不能用 cout,它不是線程安全的。
69: void onStringMessage(const TcpConnectionPtr&,
70: const string& message,
71: Timestamp)
72: {
73: printf("<<< %s\n", message.c_str());
74: }
75:
數(shù)據(jù)成員:
76: EventLoop* loop_;
77: TcpClient client_;
78: LengthHeaderCodec codec_;
79: MutexLock mutex_;
80: TcpConnectionPtr connection_;
81: };
82:
main() 函數(shù)里除了例行公事,還要啟動(dòng) EventLoop 線程和讀取鍵盤(pán)輸入。
83: int main(int argc, char* argv[])
84: {
85: LOG_INFO << "pid = " << getpid();
86: if (argc > 2)
87: {
88: EventLoopThread loopThread;
89: uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
90: InetAddress serverAddr(argv[1], port);
91:
92: ChatClient client(loopThread.startLoop(), serverAddr); // 注冊(cè)到 EventLoopThread 的 EventLoop 上。
93: client.connect();
94: std::string line;
95: while (std::getline(std::cin, line))
96: {
97: string message(line.c_str()); // 這里似乎多此一舉,可直接發(fā)送 line。這里是
98: client.write(message);
99: }
100: client.disconnect();
101: }
102: else
103: {
104: printf("Usage: %s host_ip port\n", argv[0]);
105: }
106: }
107:
簡(jiǎn)單測(cè)試
開(kāi)三個(gè)命令行窗口,在第一個(gè)運(yùn)行
$ ./asio_chat_server 3000
第二個(gè)運(yùn)行
$ ./asio_chat_client 127.0.0.1 3000
第三個(gè)運(yùn)行同樣的命令
$ ./asio_chat_client 127.0.0.1 3000
這樣就有兩個(gè)客戶端進(jìn)程參與聊天。在第二個(gè)窗口里輸入一些字符并回車(chē),字符會(huì)出現(xiàn)在本窗口和第三個(gè)窗口中。
下一篇文章我會(huì)介紹 Muduo 中的定時(shí)器,并實(shí)現(xiàn) Boost.Asio 教程中的 timer2~5 示例,以及帶流量統(tǒng)計(jì)功能的 discard 和 echo 服務(wù)器(來(lái)自 Java Netty)。流量等于單位時(shí)間內(nèi)發(fā)送或接受的字節(jié)數(shù),這要用到定時(shí)器功能。
(待續(xù))