diff options
author | 2016-09-09 14:27:12 -0700 | |
---|---|---|
committer | 2016-12-15 15:54:39 -0800 | |
commit | f9329217b1ce334a19b8720f70a17ce8f5d5db23 (patch) | |
tree | 3d62b8efd0a785a9523123da73a36c0603b4346a /test/cpp/util/cli_call.cc | |
parent | 47f1f9e1ead610ccfc464bf3f6d2918867a0a212 (diff) |
Support client streaming
Diffstat (limited to 'test/cpp/util/cli_call.cc')
-rw-r--r-- | test/cpp/util/cli_call.cc | 85 |
1 files changed, 59 insertions, 26 deletions
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc index a02a8b2ee2..d9232ec4b6 100644 --- a/test/cpp/util/cli_call.cc +++ b/test/cpp/util/cli_call.cc @@ -37,8 +37,6 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/generic/generic_stub.h> #include <grpc++/support/byte_buffer.h> #include <grpc/grpc.h> #include <grpc/slice.h> @@ -50,49 +48,61 @@ namespace { void* tag(int i) { return (void*)(intptr_t)i; } } // namespace +enum CliCall::CallStatus : intptr_t { CREATE, PROCESS, FINISH }; + Status CliCall::Call(std::shared_ptr<grpc::Channel> channel, const grpc::string& method, const grpc::string& request, grpc::string* response, const OutgoingMetadataContainer& metadata, IncomingMetadataContainer* server_initial_metadata, IncomingMetadataContainer* server_trailing_metadata) { - std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel)); - grpc::ClientContext ctx; + CliCall call(channel, method, metadata); + call.Write(request); + call.WritesDone(); + call.Read(response, server_initial_metadata); + return call.Finish(server_trailing_metadata); +} + +CliCall::CliCall(std::shared_ptr<grpc::Channel> channel, + const grpc::string& method, + const OutgoingMetadataContainer& metadata) + : stub_(new grpc::GenericStub(channel)) { if (!metadata.empty()) { for (OutgoingMetadataContainer::const_iterator iter = metadata.begin(); iter != metadata.end(); ++iter) { - ctx.AddMetadata(iter->first, iter->second); + ctx_.AddMetadata(iter->first, iter->second); } } - grpc::CompletionQueue cq; - std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call( - stub->Call(&ctx, method, &cq, tag(1))); + call_ = stub_->Call(&ctx_, method, &cq_, tag(1)); void* got_tag; bool ok; - cq.Next(&got_tag, &ok); + cq_.Next(&got_tag, &ok); GPR_ASSERT(ok); +} + +void CliCall::Write(const grpc::string& request) { + void* got_tag; + bool ok; grpc_slice s = grpc_slice_from_copied_string(request.c_str()); grpc::Slice req_slice(s, grpc::Slice::STEAL_REF); grpc::ByteBuffer send_buffer(&req_slice, 1); - call->Write(send_buffer, tag(2)); - cq.Next(&got_tag, &ok); - GPR_ASSERT(ok); - call->WritesDone(tag(3)); - cq.Next(&got_tag, &ok); + call_->Write(send_buffer, tag(2)); + cq_.Next(&got_tag, &ok); GPR_ASSERT(ok); +} + +void CliCall::Read(grpc::string* response, + IncomingMetadataContainer* server_initial_metadata) { + void* got_tag; + bool ok; + grpc::ByteBuffer recv_buffer; - call->Read(&recv_buffer, tag(4)); - cq.Next(&got_tag, &ok); + call_->Read(&recv_buffer, tag(4)); + cq_.Next(&got_tag, &ok); if (!ok) { - std::cout << "Failed to read response." << std::endl; - } - grpc::Status status; - call->Finish(&status, tag(5)); - cq.Next(&got_tag, &ok); - GPR_ASSERT(ok); - - if (status.ok()) { + fprintf(stderr, "Failed to read response."); + } else { std::vector<grpc::Slice> slices; (void)recv_buffer.Dump(&slices); @@ -101,10 +111,33 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel, response->append(reinterpret_cast<const char*>(slices[i].begin()), slices[i].size()); } + if (server_initial_metadata) { + *server_initial_metadata = ctx_.GetServerInitialMetadata(); + } + } +} + +void CliCall::WritesDone() { + void* got_tag; + bool ok; + + call_->WritesDone(tag(3)); + cq_.Next(&got_tag, &ok); + GPR_ASSERT(ok); +} + +Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) { + void* got_tag; + bool ok; + grpc::Status status; + + call_->Finish(&status, tag(5)); + cq_.Next(&got_tag, &ok); + GPR_ASSERT(ok); + if (server_trailing_metadata) { + *server_trailing_metadata = ctx_.GetServerTrailingMetadata(); } - *server_initial_metadata = ctx.GetServerInitialMetadata(); - *server_trailing_metadata = ctx.GetServerTrailingMetadata(); return status; } |