可以對(duì)照使用google protobuf RPC實(shí)現(xiàn)echo service一文看,細(xì)節(jié)本文不再描述。
google protobuf只負(fù)責(zé)消息的打包和解包,并不包含RPC的實(shí)現(xiàn),但其包含了RPC的定義。假設(shè)有下面的RPC定義:
service MyService {
rpc Echo(EchoReqMsg) returns(EchoRespMsg)
}
那么要實(shí)現(xiàn)這個(gè)RPC需要最少做哪些事?總結(jié)起來(lái)需要完成以下幾步:
客戶(hù)端
RPC客戶(hù)端需要實(shí)現(xiàn)google::protobuf::RpcChannel
。主要實(shí)現(xiàn)RpcChannel::CallMethod
接口。客戶(hù)端調(diào)用任何一個(gè)RPC接口,最終都是調(diào)用到CallMethod
。這個(gè)函數(shù)的典型實(shí)現(xiàn)就是將RPC調(diào)用參數(shù)序列化,然后投遞給網(wǎng)絡(luò)模塊進(jìn)行發(fā)送。
void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
...
DataBufferOutputStream outputStream(...) // 取決于你使用的網(wǎng)絡(luò)實(shí)現(xiàn)
request->SerializeToZeroCopyStream(&outputStream);
_connection->postData(outputStream.getData(), ...
...
}
服務(wù)端
服務(wù)端首先需要實(shí)現(xiàn)RPC接口,直接實(shí)現(xiàn)MyService
中定義的接口:
class MyServiceImpl : public MyService {
virtual void Echo(::google::protobuf::RpcController* controller,
const EchoReqMsg* request,
EchoRespMsg* response,
::google::protobuf::Closure* done) {
...
done->Run();
}
}
標(biāo)示service&method
基于以上,可以看出服務(wù)端根本不知道客戶(hù)端想要調(diào)用哪一個(gè)RPC接口。從服務(wù)器接收到網(wǎng)絡(luò)消息,到調(diào)用到MyServiceImpl::Echo
還有很大一段距離。
解決方法就是在網(wǎng)絡(luò)消息中帶上RPC接口標(biāo)識(shí)。這個(gè)標(biāo)識(shí)可以直接帶上service name和method name,但這種實(shí)現(xiàn)導(dǎo)致網(wǎng)絡(luò)消息太大。另一種實(shí)現(xiàn)是基于service name和method name生成一個(gè)哈希值,因?yàn)榻涌诓粫?huì)太多,所以較容易找到基本不沖突的字符串哈希算法。
無(wú)論哪種方法,服務(wù)器是肯定需要建立RPC接口標(biāo)識(shí)到protobuf service對(duì)象的映射的。
這里提供第三種方法:基于option的方法。
protobuf中option機(jī)制類(lèi)似于這樣一種機(jī)制:service&method被視為一個(gè)對(duì)象,其有很多屬性,屬性包含內(nèi)置的,以及用戶(hù)擴(kuò)展的。用戶(hù)擴(kuò)展的就是option。每一個(gè)屬性有一個(gè)值。protobuf提供訪問(wèn)service&method這些屬性的接口。
首先擴(kuò)展service&method的屬性,以下定義這些屬性的key:
extend google.protobuf.ServiceOptions {
required uint32 global_service_id = 1000;
}
extend google.protobuf.MethodOptions {
required uint32 local_method_id = 1000;
}
應(yīng)用層定義service&method時(shí)可以指定以上key的值:
service MyService
{
option (arpc.global_service_id) = 2302;
rpc Echo(EchoReqMsg) returns(EchoRespMsg)
{
option (arpc.local_method_id) = 1;
}
rpc Echo_2(EchoReqMsg) returns(EchoRespMsg)
{
option (arpc.local_method_id) = 2;
}
...
}
以上相當(dāng)于在整個(gè)應(yīng)用中,每個(gè)service都被賦予了唯一的id,單個(gè)service中的method也有唯一的id。
然后可以通過(guò)protobuf取出以上屬性值:
void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
...
google::protobuf::ServiceDescriptor *service = method->service();
uint32_t serviceId = (uint32_t)(service->options().GetExtension(global_service_id));
uint32_t methodId = (uint32_t)(method->options().GetExtension(local_method_id));
...
}
考慮到serviceId
methodId
的范圍,可以直接打包到一個(gè)32位整數(shù)里:
uint32_t ret = (serviceId << 16) | methodId;
然后就可以把這個(gè)值作為網(wǎng)絡(luò)消息頭的一部分發(fā)送。
當(dāng)然服務(wù)器端是需要建立這個(gè)標(biāo)識(shí)值到service的映射的:
bool MyRPCServer::registerService(google::protobuf::Service *rpcService) {
const google::protobuf::ServiceDescriptor = rpcService->GetDescriptor();
int methodCnt = pSerDes->method_count();
for (int i = 0; i < methodCnt; i++) {
google::protobuf::MethodDescriptor *pMethodDes = pSerDes->method(i);
uint32_t rpcCode = PacketCodeBuilder()(pMethodDes); // 計(jì)算出映射值
_rpcCallMap[rpcCode] = make_pair(rpcService, pMethodDes); // 建立映射
}
return true;
}
服務(wù)端收到RPC調(diào)用后,取出這個(gè)標(biāo)識(shí)值,然后再?gòu)?code>_rpcCallMap中取出對(duì)應(yīng)的service和method,最后進(jìn)行調(diào)用:
google::protobuf::Message* response = _pService->GetResponsePrototype(_pMethodDes).New();
// 用于回應(yīng)的closure
RPCServerClosure *pClosure = new (nothrow) RPCServerClosure(
_channelId, _pConnection, _pReqMsg, pResMsg, _messageCodec, _version);
RPCController *pController = pClosure->GetRpcController();
...
// protobuf 生成的CallMethod,會(huì)自動(dòng)調(diào)用到Echo接口
_pService->CallMethod(_pMethodDes, pController, _pReqMsg, pResMsg, pClosure);
參考