gRPC異步處理應答
(金慶的專欄)
gRPC的示例 greeter_async_client.cc 不算是異步客戶端,
它使用了異步請求,但是阻塞式等待應答,結果成為一個同步調用。
std::string SayHello(const std::string& user) {
...
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));
rpc->Finish(&reply, &status, (void*)1);
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
cq.Next(&got_tag, &ok);
...
return reply.message();
}
為了實現真正的異步RPC請求,發出請求后立即返回,然后在一個線程中處理所有應答。
以下代碼經測試表明可以使用。
// Grpc異步應答處理,線程中運行.
void HandleGrpcResponses()
{
...
grpc::CompletionQueue & rCq = rMgr.GetCq();
for (;;)
{
void * pTag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
rCq.Next(&pTag, &ok);
// Act upon the status of the actual RPC.
std::unique_ptr<IGrpcCb> pCb(static_cast<IGrpcCb*>(pTag));
const grpc::Status & rStatus = pCb->GetStatus();
if (rStatus.ok())
(*pCb)(); // run callback
}
}
IGrpcCb是回調類,定義如下:
class IGrpcCb
{
public:
explicit IGrpcCb(...) {};
virtual ~IGrpcCb(void) {};
grpc::ClientContext & GetContext() { return m_context; }
grpc::Status & GetStatus() { return m_status; }
public:
virtual void operator()() {};
protected:
grpc::ClientContext m_context;
grpc::Status m_status;
...
};
// R is response class like rpc::CreateRoomResponse.
template <class R>
class GrpcCb final : public IGrpcCb
{
public:
explicit GrpcCb(...)
: IGrpcCb(...)
{};
virtual ~GrpcCb(void) override {};
public:
typedef std::unique_ptr<grpc::ClientAsyncResponseReader<R> > RpcPtr;
public:
R & GetResp() { return m_resp; }
void SetRpcPtrAndFinish(RpcPtr pRpc)
{
m_pRpc.swap(pRpc);
m_pRpc->Finish(&m_resp, &m_status, (void*)this);
}
public:
virtual void operator()() override
{
// Deal m_resp...
}
private:
RpcPtr m_pRpc;
R m_resp;
};
異步請求代碼示例如下:
grpc::CompletionQueue & cq = GetCq();
rpc::CreateRoomRequest req;
// pGcb will be deleted in HandleGrpcResponses().
auto pGcb = new GrpcCb<rpc::CreateRoomResponse>(...);
pGcb->SetRpcPtrAndFinish(
m_pStub->AsyncCreateRoom(&pGcb->GetContext(), req, &cq));
(金慶的專欄)
gRPC的示例 greeter_async_client.cc 不算是異步客戶端,
它使用了異步請求,但是阻塞式等待應答,結果成為一個同步調用。
std::string SayHello(const std::string& user) {
...
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));
rpc->Finish(&reply, &status, (void*)1);
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
cq.Next(&got_tag, &ok);
...
return reply.message();
}
為了實現真正的異步RPC請求,發出請求后立即返回,然后在一個線程中處理所有應答。
以下代碼經測試表明可以使用。
// Grpc異步應答處理,線程中運行.
void HandleGrpcResponses()
{
...
grpc::CompletionQueue & rCq = rMgr.GetCq();
for (;;)
{
void * pTag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
rCq.Next(&pTag, &ok);
// Act upon the status of the actual RPC.
std::unique_ptr<IGrpcCb> pCb(static_cast<IGrpcCb*>(pTag));
const grpc::Status & rStatus = pCb->GetStatus();
if (rStatus.ok())
(*pCb)(); // run callback
}
}
IGrpcCb是回調類,定義如下:
class IGrpcCb
{
public:
explicit IGrpcCb(...) {};
virtual ~IGrpcCb(void) {};
grpc::ClientContext & GetContext() { return m_context; }
grpc::Status & GetStatus() { return m_status; }
public:
virtual void operator()() {};
protected:
grpc::ClientContext m_context;
grpc::Status m_status;
...
};
// R is response class like rpc::CreateRoomResponse.
template <class R>
class GrpcCb final : public IGrpcCb
{
public:
explicit GrpcCb(...)
: IGrpcCb(...)
{};
virtual ~GrpcCb(void) override {};
public:
typedef std::unique_ptr<grpc::ClientAsyncResponseReader<R> > RpcPtr;
public:
R & GetResp() { return m_resp; }
void SetRpcPtrAndFinish(RpcPtr pRpc)
{
m_pRpc.swap(pRpc);
m_pRpc->Finish(&m_resp, &m_status, (void*)this);
}
public:
virtual void operator()() override
{
// Deal m_resp...
}
private:
RpcPtr m_pRpc;
R m_resp;
};
異步請求代碼示例如下:
grpc::CompletionQueue & cq = GetCq();
rpc::CreateRoomRequest req;
// pGcb will be deleted in HandleGrpcResponses().
auto pGcb = new GrpcCb<rpc::CreateRoomResponse>(...);
pGcb->SetRpcPtrAndFinish(
m_pStub->AsyncCreateRoom(&pGcb->GetContext(), req, &cq));