陳碩 (giantchen_AT_gmail)
Blog.csdn.net/Solstice t.sina.com.cn/giantchen
Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx
本文是《一種自動反射消息類型的 Google Protobuf 網絡傳輸方案》的延續,介紹如何將前文介紹的打包方案與 muduo::net::Buffer 結合,實現了 protobuf codec 和 dispatcher。
Muduo 的下載地址: http://muduo.googlecode.com/files/muduo-0.1.9-alpha.tar.gz ,SHA1 dc0bb5f7becdfc0277fb35f6dfaafee8209213bc ,本文的完整代碼可在線閱讀 http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/ 。
考慮到不是每個人都安裝了 Google Protobuf,muduo 中的 protobuf 相關示例默認是不 build 的,如果你的機器上安裝了 protobuf 2.3.0 或 2.4.0a,那么可以用 ./build.sh protobuf_all 來構建 protobuf 相關的 examples。
在介紹 codec 和 dispatcher 之前,先講講前文的一個未決問題。
為什么 Protobuf 的默認序列化格式沒有包含消息的長度與類型?
Protobuf 是經過深思熟慮的消息打包方案,它的默認序列化格式沒有包含消息的長度與類型,自然有其道理。哪些情況下不需要在 protobuf 序列化得到的字節流中包含消息的長度和(或)類型?我能想到的答案有:
- 如果把消息寫入文件,一個文件存一個消息,那么序列化結果中不需要包含長度和類型,因為從文件名和文件長度中可以得知消息的類型與長度。
- 如果把消息寫入文件,一個文件存多個消息,那么序列化結果中不需要包含類型,因為文件名就代表了消息的類型。
- 如果把消息存入數據庫(或者 NoSQL),以 VARBINARY 字段保存,那么序列化結果中不需要包含長度和類型,因為從字段名和字段長度中可以得知消息的類型與長度。
- 如果把消息以 UDP 方式發生給對方,而且對方一個 UDP port 只接收一種消息類型,那么序列化結果中不需要包含長度和類型,因為從 port 和 UDP packet 長度中可以得知消息的類型與長度。
- 如果把消息以 TCP 短連接方式發給對方,而且對方一個 TCP port 只接收一種消息類型,那么序列化結果中不需要包含長度和類型,因為從 port 和 TCP 字節流長度中可以得知消息的類型與長度。
- 如果把消息以 TCP 長連接方式發給對方,但是對方一個 TCP port 只接收一種消息類型,那么序列化結果中不需要包含類型,因為 port 代表了消息的類型。
- 如果采用 RPC 方式通信,那么只需要告訴對方 method name,對方自然能推斷出 Request 和 Response 的消息類型,這些可以由 protoc 生成的 RPC stubs 自動搞定。
對于最后一點,比方說 sudoku.proto 定義為:
service SudokuService {
rpc Solve (SudokuRequest) returns (SudokuResponse);
}
那么 RPC method Sudoku.Solve 對應的請求和響應分別是 SudokuRequest 和 SudokuResponse。在發送 RPC 請求的時候,不需要包含 SudokuRequest 的類型,只需要發送 method name Sudoku.Solve,對方自知道應該按照 SudokuRequest 來解析(parse)請求。這個例子來自我的半成品項目 evproto,見 http://blog.csdn.net/Solstice/archive/2010/04/17/5497699.aspx 。
對于上述這些情況,如果 protobuf 無條件地把長度和類型放到序列化的字節串中,只會浪費網絡帶寬和存儲??梢?protobuf 默認不發送長度和類型是正確的決定。Protobuf 為消息格式的設計樹立了典范,哪些該自己搞定,哪些留給外部系統去解決,這些都考慮得很清楚。
只有在使用 TCP 長連接,且在一個連接上傳遞不止一種消息的情況下(比方同時發 Heartbeat 和 Request/Response),才需要我前文提到的那種打包方案。(為什么要在一個連接上同時發 Heartbeat 和業務消息?請見陳碩《分布式系統的工程化開發方法》 p.51 心跳協議的設計。)這時候我們需要一個分發器 dispatcher,把不同類型的消息分給各個消息處理函數,這正是本文的主題之一。
以下均只考慮 TCP 長連接這一應用場景。
先談談編解碼器。
什么是編解碼器 codec?
Codec 是 encoder 和 decoder 的縮寫,這是一個到軟硬件都在使用的術語,這里我借指“把網絡數據和業務消息之間互相轉換”的代碼。
在最簡單的網絡編程中,沒有消息 message 只有字節流數據,這時候是用不到 codec 的。比如我們前面講過的 echo server,它只需要把收到的數據原封不動地發送回去,它不必關心消息的邊界(也沒有“消息”的概念),收多少就發多少,這種情況下它干脆直接使用 muduo::net::Buffer,取到數據再交給 TcpConnection 發送回去,見下圖。

non-trivial 的網絡服務程序通常會以消息為單位來通信,每條消息有明確的長度與界限。程序每次收到一個完整的消息的時候才開始處理,發送的時候也是把一個完整的消息交給網絡庫。比如我們前面講過的 asio chat 服務,它的一條聊天記錄就是一條消息,我們設計了一個簡單的消息格式,即在聊天記錄前面加上 4 字節的 length header,LengthHeaderCodec 代碼及解說見《Muduo 網絡編程示例之二:Boost.Asio 的聊天服務器》一文。
codec 的基本功能之一是做 TCP 分包:確定每條消息的長度,為消息劃分界限。在 non-blocking 網絡編程中,codec 幾乎是必不可少的。如果只收到了半條消息,那么不會觸發消息回調,數據會停留在 Buffer 里(數據已經讀到 Buffer 中了),等待收到一個完整的消息再通知處理函數。既然這個任務太常見,我們干脆做一個 utility class,避免服務端和客戶端程序都要自己處理分包,這就有了 LengthHeaderCodec。這個 codec 的使用有點奇怪,不需要繼承,它也沒有基類,只要把它當成普通 data member 來用,把 TcpConnection 的數據喂給它,然后向它注冊 onXXXMessage() 回調,代碼見 asio chat 示例。muduo 里的 codec 都是這樣的風格,通過 boost::function 粘合到一起。
codec 是一層間接性,它位于 TcpConnection 和 ChatServer 之間,攔截處理收到的數據,在收到完整的消息之后再調用 CharServer 對應的處理函數,注意 CharServer::onStringMessage() 的參數是 std::string,不再是 muduo::net::Buffer,也就是說 LengthHeaderCodec 把 Buffer 解碼成了 string。另外,在發送消息的時候,ChatServer 通過 LengthHeaderCodec::send() 來發送 string,LengthHeaderCodec 負責把它編碼成 Buffer。這正是“編解碼器”名字的由來。

Protobuf codec 與此非常類似,只不過消息類型從 std::string 變成了 protobuf::Message。對于只接收處理 Query 消息的 QueryServer 來說,用 ProtobufCodec 非常方便,收到 protobuf::Message 之后 down cast 成 Query 來用就行。如果要接收處理不止一種消息,ProtobufCodec 恐怕還不能單獨完成工作,請繼續閱讀下文。

實現 ProtobufCodec
Protobuf 的打包方案我已經在《一種自動反射消息類型的 Google Protobuf 網絡傳輸方案》中講過,并以 string 為載體演示了 encode 和 decode 操作。在 muduo 里,我們有專門的 Buffer class,編碼更輕松。
編碼算法很直截了當,按照前文定義的消息格式一路打包下來,最后更新一下首部的長度即可。
解碼算法有幾個要點:
- protobuf::Message 是 new 出來的對象,它的生命期如何管理?muduo 采用 shared_ptr<Message> 來自動管理對象生命期,這與其他地方的做法是一致的。
- 出錯如何處理?比方說長度超出范圍、check sum 不正確、message type name 不能識別、message parse 出錯等等。ProtobufCodec 定義了 ErrorCallback,用戶代碼可以注冊這個回調。如果不注冊,默認的處理是斷開連接,讓客戶重連重試。codec 的單元測試里模擬了各種出錯情況。
- 如何處理一次收到半條消息、一條消息、一條半消息、兩條消息等等情況?這是每個 non-blocking 網絡程序中的 codec 都要面對的問題。
ProtobufCodec 在實際使用中有明顯的不足:它只負責把 muduo::net::Buffer 轉換為具體類型的 protobuf::Message,應用程序拿到 Message 之后還有再根據其具體類型做一次分發。我們可以考慮做一個簡單通用的分發器 dispatcher,以簡化客戶代碼。
此外,目前 ProtobufCodec 的實現非常初級,它沒有充分利用 ZeroCopyInputStream 和 ZeroCopyOutputStream,而是把收到的數據作為 byte array 交給 protobuf Message 去解析,這給性能優化留下了空間。protobuf Message 不要求數據連續(像 vector 那樣),只要求數據分段連續(像 deque 那樣),這給 buffer 管理帶來性能上的好處(避免重新分配內存,減少內存碎片),當然也使得代碼變復雜。muduo::net::Buffer 非常簡單,它內部是 vector<char>,我目前不想讓 protobuf 影響 muduo 本身的設計,畢竟 muduo 是個通用的網絡庫,不是為實現 protobuf RPC 而特制的。
消息分發器 dispatcher 有什么用?
前面提到,在使用 TCP 長連接,且在一個連接上傳遞不止一種 protobuf 消息的情況下,客戶代碼需要對收到的消息按類型做分發。比方說,收到 Logon 消息就交給 QueryServer::onLogon() 去處理,收到 Query 消息就交給 QueryServer::onQuery() 去處理。這個消息分派機制可以做得稍微有點通用性,讓所有 muduo+protobuf 程序收益,而且不增加復雜性。
換句話說,又是一層間接性,ProtobufCodec 攔截了 TcpConnection 的數據,把它轉換為 Message,ProtobufDispatcher 攔截了 ProtobufCodec 的 callback,按消息具體類型把它分派給多個 callbacks。

ProtobufCodec 與 ProtobufDispatcher 的綜合運用
我寫了兩個示例代碼,client 和 server,把 ProtobufCodec 和 ProtobufDispatcher 串聯起來使用。server 響應 Query 消息,發生回 Answer 消息,如果收到未知消息類型,則斷開連接。client 可以選擇發送 Query 或 Empty 消息,由命令行控制。這樣可以測試 unknown message callback。
為節省篇幅,這里就不列出代碼了,請移步閱讀
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/client.cc
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/server.cc
在構造函數中,通過注冊回調函數把四方 (TcpConnection、codec、dispatcher、QueryServer) 結合起來。
ProtobufDispatcher 的兩種實現
要完成消息分發,那么就是對消息做 type-switch,這似乎是一個 bad smell,但是 protobuf Message 的 Descriptor 沒有留下定制點(比如暴露一個 boost::any 成員),我們只好硬來了。
先定義
typedef boost::function<void (Message*)> ProtobufMessageCallback;
注意,本節出現的不是 muduo dispatcher 真實的代碼,僅為示意,突出重點,便于畫圖。
ProtobufDispatcherLite 的結構非常簡單,它有一個 map<Descriptor*, ProtobufMessageCallback> 成員,客戶代碼可以以 Descriptor* 為 key 注冊回調(recall: 每個具體消息類型都有一個全局的 Descriptor 對象,其地址是不變的,可以用來當 key)。在收到 protobuf Message 之后,在 map 中找到對應的 ProtobufMessageCallback,然后調用之。如果找不到,就調用 defaultCallback。

當然,它的設計也有小小的缺陷,那就是 ProtobufMessageCallback 限制了客戶代碼只能接受基類 Message,客戶代碼需要自己做向下轉型,比如:

如果我希望 QueryServer 這么設計:不想每個消息處理函數自己做 down casting,而是交給 dispatcher 去處理,客戶代碼拿到的就已經是想要的具體類型。如下:

那么該該如何實現 ProtobufDispatcher 呢?它如何與多個未知的消息類型合作?做 down cast 需要知道目標類型,難道我們要用一長串模板類型參數嗎?
有一個辦法,把多態與模板結合,利用 templated derived class 來提供類型上的靈活性。設計如下。

ProtobufDispatcher 有一個模板成員函數,可以接受注冊任意消息類型 T 的回調,然后它創建一個模板化的派生類 CallbackT<T>,這樣消息的類新信息就保存在了 CallbackT<T> 中,做 down casting 就簡單了。
比方說,我們有兩個具體消息類型 Query 和 Answer。

然后我們這樣注冊回調:
dispatcher_.registerMessageCallback<muduo::Query>(
boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
dispatcher_.registerMessageCallback<muduo::Answer>(
boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
這樣會具現化 (instantiation) 出兩個 CallbackT 實體,如下:

以上設計參考了 shared_ptr 的 deleter,Scott Meyers 也談到過。
ProtobufCodec 和 ProtobufDispatcher 有何意義?
ProtobufCodec 和 ProtobufDispatcher 把每個直接收發 protobuf Message 的網絡程序都會用到的功能提煉出來做成了公用的 utility,這樣以后新寫 protobuf 網絡程序就不必為打包分包和消息分發勞神了。它倆以庫的形式存在,是兩個可以拿來就當 data member 用的 class,它們沒有基類,也沒有用到虛函數或者別的什么面向對象特征,不侵入 muduo::net 或者你的代碼。
下一篇文章講《分布式程序的自動回歸測試》會介紹利用 protobuf 的跨語言特性,采用 Java 為 C++ 服務程序編寫 test harness。