aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/util/cli_call.cc
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-09-09 14:27:12 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-12-15 15:54:39 -0800
commitf9329217b1ce334a19b8720f70a17ce8f5d5db23 (patch)
tree3d62b8efd0a785a9523123da73a36c0603b4346a /test/cpp/util/cli_call.cc
parent47f1f9e1ead610ccfc464bf3f6d2918867a0a212 (diff)
Support client streaming
Diffstat (limited to 'test/cpp/util/cli_call.cc')
-rw-r--r--test/cpp/util/cli_call.cc85
1 files changed, 59 insertions, 26 deletions
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index a02a8b2ee2..d9232ec4b6 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -37,8 +37,6 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@@ -50,49 +48,61 @@ namespace {
void* tag(int i) { return (void*)(intptr_t)i; }
} // namespace
+enum CliCall::CallStatus : intptr_t { CREATE, PROCESS, FINISH };
+
Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const grpc::string& method, const grpc::string& request,
grpc::string* response,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata) {
- std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
- grpc::ClientContext ctx;
+ CliCall call(channel, method, metadata);
+ call.Write(request);
+ call.WritesDone();
+ call.Read(response, server_initial_metadata);
+ return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method,
+ const OutgoingMetadataContainer& metadata)
+ : stub_(new grpc::GenericStub(channel)) {
if (!metadata.empty()) {
for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
iter != metadata.end(); ++iter) {
- ctx.AddMetadata(iter->first, iter->second);
+ ctx_.AddMetadata(iter->first, iter->second);
}
}
- grpc::CompletionQueue cq;
- std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
- stub->Call(&ctx, method, &cq, tag(1)));
+ call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
void* got_tag;
bool ok;
- cq.Next(&got_tag, &ok);
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+void CliCall::Write(const grpc::string& request) {
+ void* got_tag;
+ bool ok;
grpc_slice s = grpc_slice_from_copied_string(request.c_str());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
- call->Write(send_buffer, tag(2));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
- call->WritesDone(tag(3));
- cq.Next(&got_tag, &ok);
+ call_->Write(send_buffer, tag(2));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+void CliCall::Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+
grpc::ByteBuffer recv_buffer;
- call->Read(&recv_buffer, tag(4));
- cq.Next(&got_tag, &ok);
+ call_->Read(&recv_buffer, tag(4));
+ cq_.Next(&got_tag, &ok);
if (!ok) {
- std::cout << "Failed to read response." << std::endl;
- }
- grpc::Status status;
- call->Finish(&status, tag(5));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
-
- if (status.ok()) {
+ fprintf(stderr, "Failed to read response.");
+ } else {
std::vector<grpc::Slice> slices;
(void)recv_buffer.Dump(&slices);
@@ -101,10 +111,33 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
response->append(reinterpret_cast<const char*>(slices[i].begin()),
slices[i].size());
}
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ }
+}
+
+void CliCall::WritesDone() {
+ void* got_tag;
+ bool ok;
+
+ call_->WritesDone(tag(3));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::Status status;
+
+ call_->Finish(&status, tag(5));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+ if (server_trailing_metadata) {
+ *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
}
- *server_initial_metadata = ctx.GetServerInitialMetadata();
- *server_trailing_metadata = ctx.GetServerTrailingMetadata();
return status;
}