陳碩 (giantchen_AT_gmail)
Blog.csdn.net/Solstice t.sina.com.cn/giantchen
Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx
本文是《一種自動(dòng)反射消息類型的 Google Protobuf 網(wǎng)絡(luò)傳輸方案》的延續(xù),介紹如何將前文介紹的打包方案與 muduo::net::Buffer 結(jié)合,實(shí)現(xiàn)了 protobuf codec 和 dispatcher。
Muduo 的下載地址: http://muduo.googlecode.com/files/muduo-0.1.9-alpha.tar.gz ,SHA1 dc0bb5f7becdfc0277fb35f6dfaafee8209213bc ,本文的完整代碼可在線閱讀 http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/ 。
考慮到不是每個(gè)人都安裝了 Google Protobuf,muduo 中的 protobuf 相關(guān)示例默認(rèn)是不 build 的,如果你的機(jī)器上安裝了 protobuf 2.3.0 或 2.4.0a,那么可以用 ./build.sh protobuf_all 來構(gòu)建 protobuf 相關(guān)的 examples。
在介紹 codec 和 dispatcher 之前,先講講前文的一個(gè)未決問題。
為什么 Protobuf 的默認(rèn)序列化格式?jīng)]有包含消息的長度與類型?
Protobuf 是經(jīng)過深思熟慮的消息打包方案,它的默認(rèn)序列化格式?jīng)]有包含消息的長度與類型,自然有其道理。哪些情況下不需要在 protobuf 序列化得到的字節(jié)流中包含消息的長度和(或)類型?我能想到的答案有:
- 如果把消息寫入文件,一個(gè)文件存一個(gè)消息,那么序列化結(jié)果中不需要包含長度和類型,因?yàn)閺奈募臀募L度中可以得知消息的類型與長度。
- 如果把消息寫入文件,一個(gè)文件存多個(gè)消息,那么序列化結(jié)果中不需要包含類型,因?yàn)槲募痛砹讼⒌念愋汀?
- 如果把消息存入數(shù)據(jù)庫(或者 NoSQL),以 VARBINARY 字段保存,那么序列化結(jié)果中不需要包含長度和類型,因?yàn)閺淖侄蚊妥侄伍L度中可以得知消息的類型與長度。
- 如果把消息以 UDP 方式發(fā)生給對(duì)方,而且對(duì)方一個(gè) UDP port 只接收一種消息類型,那么序列化結(jié)果中不需要包含長度和類型,因?yàn)閺?port 和 UDP packet 長度中可以得知消息的類型與長度。
- 如果把消息以 TCP 短連接方式發(fā)給對(duì)方,而且對(duì)方一個(gè) TCP port 只接收一種消息類型,那么序列化結(jié)果中不需要包含長度和類型,因?yàn)閺?port 和 TCP 字節(jié)流長度中可以得知消息的類型與長度。
- 如果把消息以 TCP 長連接方式發(fā)給對(duì)方,但是對(duì)方一個(gè) TCP port 只接收一種消息類型,那么序列化結(jié)果中不需要包含類型,因?yàn)?port 代表了消息的類型。
- 如果采用 RPC 方式通信,那么只需要告訴對(duì)方 method name,對(duì)方自然能推斷出 Request 和 Response 的消息類型,這些可以由 protoc 生成的 RPC stubs 自動(dòng)搞定。
對(duì)于最后一點(diǎn),比方說 sudoku.proto 定義為:
service SudokuService {
rpc Solve (SudokuRequest) returns (SudokuResponse);
}
那么 RPC method Sudoku.Solve 對(duì)應(yīng)的請(qǐng)求和響應(yīng)分別是 SudokuRequest 和 SudokuResponse。在發(fā)送 RPC 請(qǐng)求的時(shí)候,不需要包含 SudokuRequest 的類型,只需要發(fā)送 method name Sudoku.Solve,對(duì)方自知道應(yīng)該按照 SudokuRequest 來解析(parse)請(qǐng)求。這個(gè)例子來自我的半成品項(xiàng)目 evproto,見 http://blog.csdn.net/Solstice/archive/2010/04/17/5497699.aspx 。
對(duì)于上述這些情況,如果 protobuf 無條件地把長度和類型放到序列化的字節(jié)串中,只會(huì)浪費(fèi)網(wǎng)絡(luò)帶寬和存儲(chǔ)。可見 protobuf 默認(rèn)不發(fā)送長度和類型是正確的決定。Protobuf 為消息格式的設(shè)計(jì)樹立了典范,哪些該自己搞定,哪些留給外部系統(tǒng)去解決,這些都考慮得很清楚。
只有在使用 TCP 長連接,且在一個(gè)連接上傳遞不止一種消息的情況下(比方同時(shí)發(fā) Heartbeat 和 Request/Response),才需要我前文提到的那種打包方案。(為什么要在一個(gè)連接上同時(shí)發(fā) Heartbeat 和業(yè)務(wù)消息?請(qǐng)見陳碩《分布式系統(tǒng)的工程化開發(fā)方法》 p.51 心跳協(xié)議的設(shè)計(jì)。)這時(shí)候我們需要一個(gè)分發(fā)器 dispatcher,把不同類型的消息分給各個(gè)消息處理函數(shù),這正是本文的主題之一。
以下均只考慮 TCP 長連接這一應(yīng)用場(chǎng)景。
先談?wù)劸幗獯a器。
什么是編解碼器 codec?
Codec 是 encoder 和 decoder 的縮寫,這是一個(gè)到軟硬件都在使用的術(shù)語,這里我借指“把網(wǎng)絡(luò)數(shù)據(jù)和業(yè)務(wù)消息之間互相轉(zhuǎn)換”的代碼。
在最簡(jiǎn)單的網(wǎng)絡(luò)編程中,沒有消息 message 只有字節(jié)流數(shù)據(jù),這時(shí)候是用不到 codec 的。比如我們前面講過的 echo server,它只需要把收到的數(shù)據(jù)原封不動(dòng)地發(fā)送回去,它不必關(guān)心消息的邊界(也沒有“消息”的概念),收多少就發(fā)多少,這種情況下它干脆直接使用 muduo::net::Buffer,取到數(shù)據(jù)再交給 TcpConnection 發(fā)送回去,見下圖。

non-trivial 的網(wǎng)絡(luò)服務(wù)程序通常會(huì)以消息為單位來通信,每條消息有明確的長度與界限。程序每次收到一個(gè)完整的消息的時(shí)候才開始處理,發(fā)送的時(shí)候也是把一個(gè)完整的消息交給網(wǎng)絡(luò)庫。比如我們前面講過的 asio chat 服務(wù),它的一條聊天記錄就是一條消息,我們?cè)O(shè)計(jì)了一個(gè)簡(jiǎn)單的消息格式,即在聊天記錄前面加上 4 字節(jié)的 length header,LengthHeaderCodec 代碼及解說見《Muduo 網(wǎng)絡(luò)編程示例之二:Boost.Asio 的聊天服務(wù)器》一文。
codec 的基本功能之一是做 TCP 分包:確定每條消息的長度,為消息劃分界限。在 non-blocking 網(wǎng)絡(luò)編程中,codec 幾乎是必不可少的。如果只收到了半條消息,那么不會(huì)觸發(fā)消息回調(diào),數(shù)據(jù)會(huì)停留在 Buffer 里(數(shù)據(jù)已經(jīng)讀到 Buffer 中了),等待收到一個(gè)完整的消息再通知處理函數(shù)。既然這個(gè)任務(wù)太常見,我們干脆做一個(gè) utility class,避免服務(wù)端和客戶端程序都要自己處理分包,這就有了 LengthHeaderCodec。這個(gè) codec 的使用有點(diǎn)奇怪,不需要繼承,它也沒有基類,只要把它當(dāng)成普通 data member 來用,把 TcpConnection 的數(shù)據(jù)喂給它,然后向它注冊(cè) onXXXMessage() 回調(diào),代碼見 asio chat 示例。muduo 里的 codec 都是這樣的風(fēng)格,通過 boost::function 粘合到一起。
codec 是一層間接性,它位于 TcpConnection 和 ChatServer 之間,攔截處理收到的數(shù)據(jù),在收到完整的消息之后再調(diào)用 CharServer 對(duì)應(yīng)的處理函數(shù),注意 CharServer::onStringMessage() 的參數(shù)是 std::string,不再是 muduo::net::Buffer,也就是說 LengthHeaderCodec 把 Buffer 解碼成了 string。另外,在發(fā)送消息的時(shí)候,ChatServer 通過 LengthHeaderCodec::send() 來發(fā)送 string,LengthHeaderCodec 負(fù)責(zé)把它編碼成 Buffer。這正是“編解碼器”名字的由來。

Protobuf codec 與此非常類似,只不過消息類型從 std::string 變成了 protobuf::Message。對(duì)于只接收處理 Query 消息的 QueryServer 來說,用 ProtobufCodec 非常方便,收到 protobuf::Message 之后 down cast 成 Query 來用就行。如果要接收處理不止一種消息,ProtobufCodec 恐怕還不能單獨(dú)完成工作,請(qǐng)繼續(xù)閱讀下文。

實(shí)現(xiàn) ProtobufCodec
Protobuf 的打包方案我已經(jīng)在《一種自動(dòng)反射消息類型的 Google Protobuf 網(wǎng)絡(luò)傳輸方案》中講過,并以 string 為載體演示了 encode 和 decode 操作。在 muduo 里,我們有專門的 Buffer class,編碼更輕松。
編碼算法很直截了當(dāng),按照前文定義的消息格式一路打包下來,最后更新一下首部的長度即可。
解碼算法有幾個(gè)要點(diǎn):
- protobuf::Message 是 new 出來的對(duì)象,它的生命期如何管理?muduo 采用 shared_ptr<Message> 來自動(dòng)管理對(duì)象生命期,這與其他地方的做法是一致的。
- 出錯(cuò)如何處理?比方說長度超出范圍、check sum 不正確、message type name 不能識(shí)別、message parse 出錯(cuò)等等。ProtobufCodec 定義了 ErrorCallback,用戶代碼可以注冊(cè)這個(gè)回調(diào)。如果不注冊(cè),默認(rèn)的處理是斷開連接,讓客戶重連重試。codec 的單元測(cè)試里模擬了各種出錯(cuò)情況。
- 如何處理一次收到半條消息、一條消息、一條半消息、兩條消息等等情況?這是每個(gè) non-blocking 網(wǎng)絡(luò)程序中的 codec 都要面對(duì)的問題。
ProtobufCodec 在實(shí)際使用中有明顯的不足:它只負(fù)責(zé)把 muduo::net::Buffer 轉(zhuǎn)換為具體類型的 protobuf::Message,應(yīng)用程序拿到 Message 之后還有再根據(jù)其具體類型做一次分發(fā)。我們可以考慮做一個(gè)簡(jiǎn)單通用的分發(fā)器 dispatcher,以簡(jiǎn)化客戶代碼。
此外,目前 ProtobufCodec 的實(shí)現(xiàn)非常初級(jí),它沒有充分利用 ZeroCopyInputStream 和 ZeroCopyOutputStream,而是把收到的數(shù)據(jù)作為 byte array 交給 protobuf Message 去解析,這給性能優(yōu)化留下了空間。protobuf Message 不要求數(shù)據(jù)連續(xù)(像 vector 那樣),只要求數(shù)據(jù)分段連續(xù)(像 deque 那樣),這給 buffer 管理帶來性能上的好處(避免重新分配內(nèi)存,減少內(nèi)存碎片),當(dāng)然也使得代碼變復(fù)雜。muduo::net::Buffer 非常簡(jiǎn)單,它內(nèi)部是 vector<char>,我目前不想讓 protobuf 影響 muduo 本身的設(shè)計(jì),畢竟 muduo 是個(gè)通用的網(wǎng)絡(luò)庫,不是為實(shí)現(xiàn) protobuf RPC 而特制的。
消息分發(fā)器 dispatcher 有什么用?
前面提到,在使用 TCP 長連接,且在一個(gè)連接上傳遞不止一種 protobuf 消息的情況下,客戶代碼需要對(duì)收到的消息按類型做分發(fā)。比方說,收到 Logon 消息就交給 QueryServer::onLogon() 去處理,收到 Query 消息就交給 QueryServer::onQuery() 去處理。這個(gè)消息分派機(jī)制可以做得稍微有點(diǎn)通用性,讓所有 muduo+protobuf 程序收益,而且不增加復(fù)雜性。
換句話說,又是一層間接性,ProtobufCodec 攔截了 TcpConnection 的數(shù)據(jù),把它轉(zhuǎn)換為 Message,ProtobufDispatcher 攔截了 ProtobufCodec 的 callback,按消息具體類型把它分派給多個(gè) callbacks。

ProtobufCodec 與 ProtobufDispatcher 的綜合運(yùn)用
我寫了兩個(gè)示例代碼,client 和 server,把 ProtobufCodec 和 ProtobufDispatcher 串聯(lián)起來使用。server 響應(yīng) Query 消息,發(fā)生回 Answer 消息,如果收到未知消息類型,則斷開連接。client 可以選擇發(fā)送 Query 或 Empty 消息,由命令行控制。這樣可以測(cè)試 unknown message callback。
為節(jié)省篇幅,這里就不列出代碼了,請(qǐng)移步閱讀
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/client.cc
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/server.cc
在構(gòu)造函數(shù)中,通過注冊(cè)回調(diào)函數(shù)把四方 (TcpConnection、codec、dispatcher、QueryServer) 結(jié)合起來。
ProtobufDispatcher 的兩種實(shí)現(xiàn)
要完成消息分發(fā),那么就是對(duì)消息做 type-switch,這似乎是一個(gè) bad smell,但是 protobuf Message 的 Descriptor 沒有留下定制點(diǎn)(比如暴露一個(gè) boost::any 成員),我們只好硬來了。
先定義
typedef boost::function<void (Message*)> ProtobufMessageCallback;
注意,本節(jié)出現(xiàn)的不是 muduo dispatcher 真實(shí)的代碼,僅為示意,突出重點(diǎn),便于畫圖。
ProtobufDispatcherLite 的結(jié)構(gòu)非常簡(jiǎn)單,它有一個(gè) map<Descriptor*, ProtobufMessageCallback> 成員,客戶代碼可以以 Descriptor* 為 key 注冊(cè)回調(diào)(recall: 每個(gè)具體消息類型都有一個(gè)全局的 Descriptor 對(duì)象,其地址是不變的,可以用來當(dāng) key)。在收到 protobuf Message 之后,在 map 中找到對(duì)應(yīng)的 ProtobufMessageCallback,然后調(diào)用之。如果找不到,就調(diào)用 defaultCallback。

當(dāng)然,它的設(shè)計(jì)也有小小的缺陷,那就是 ProtobufMessageCallback 限制了客戶代碼只能接受基類 Message,客戶代碼需要自己做向下轉(zhuǎn)型,比如:

如果我希望 QueryServer 這么設(shè)計(jì):不想每個(gè)消息處理函數(shù)自己做 down casting,而是交給 dispatcher 去處理,客戶代碼拿到的就已經(jīng)是想要的具體類型。如下:

那么該該如何實(shí)現(xiàn) ProtobufDispatcher 呢?它如何與多個(gè)未知的消息類型合作?做 down cast 需要知道目標(biāo)類型,難道我們要用一長串模板類型參數(shù)嗎?
有一個(gè)辦法,把多態(tài)與模板結(jié)合,利用 templated derived class 來提供類型上的靈活性。設(shè)計(jì)如下。

ProtobufDispatcher 有一個(gè)模板成員函數(shù),可以接受注冊(cè)任意消息類型 T 的回調(diào),然后它創(chuàng)建一個(gè)模板化的派生類 CallbackT<T>,這樣消息的類新信息就保存在了 CallbackT<T> 中,做 down casting 就簡(jiǎn)單了。
比方說,我們有兩個(gè)具體消息類型 Query 和 Answer。

然后我們這樣注冊(cè)回調(diào):
dispatcher_.registerMessageCallback<muduo::Query>(
boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
dispatcher_.registerMessageCallback<muduo::Answer>(
boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
這樣會(huì)具現(xiàn)化 (instantiation) 出兩個(gè) CallbackT 實(shí)體,如下:

以上設(shè)計(jì)參考了 shared_ptr 的 deleter,Scott Meyers 也談到過。
ProtobufCodec 和 ProtobufDispatcher 有何意義?
ProtobufCodec 和 ProtobufDispatcher 把每個(gè)直接收發(fā) protobuf Message 的網(wǎng)絡(luò)程序都會(huì)用到的功能提煉出來做成了公用的 utility,這樣以后新寫 protobuf 網(wǎng)絡(luò)程序就不必為打包分包和消息分發(fā)勞神了。它倆以庫的形式存在,是兩個(gè)可以拿來就當(dāng) data member 用的 class,它們沒有基類,也沒有用到虛函數(shù)或者別的什么面向?qū)ο筇卣鳎磺秩?muduo::net 或者你的代碼。
下一篇文章講《分布式程序的自動(dòng)回歸測(cè)試》會(huì)介紹利用 protobuf 的跨語言特性,采用 Java 為 C++ 服務(wù)程序編寫 test harness。