Diffstat (limited to 'test/cpp/util/cli_call.cc')
-rw-r--r--  test/cpp/util/cli_call.cc | 175
1 file changed, 145 insertions(+), 30 deletions(-)
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index a02a8b2ee2..041cc0e4c3 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -37,8 +37,6 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@@ -56,55 +54,172 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata) {
- std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
- grpc::ClientContext ctx;
+ CliCall call(channel, method, metadata);
+ call.Write(request);
+ call.WritesDone();
+ if (!call.Read(response, server_initial_metadata)) {
+ fprintf(stderr, "Failed to read response.\n");
+ }
+ return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method,
+ const OutgoingMetadataContainer& metadata)
+ : stub_(new grpc::GenericStub(channel)) {
+ gpr_mu_init(&write_mu_);
+ gpr_cv_init(&write_cv_);
if (!metadata.empty()) {
for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
iter != metadata.end(); ++iter) {
- ctx.AddMetadata(iter->first, iter->second);
+ ctx_.AddMetadata(iter->first, iter->second);
}
}
- grpc::CompletionQueue cq;
- std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
- stub->Call(&ctx, method, &cq, tag(1)));
+ call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
void* got_tag;
bool ok;
- cq.Next(&got_tag, &ok);
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+CliCall::~CliCall() {
+ gpr_cv_destroy(&write_cv_);
+ gpr_mu_destroy(&write_mu_);
+}
+
+void CliCall::Write(const grpc::string& request) {
+ void* got_tag;
+ bool ok;
grpc_slice s = grpc_slice_from_copied_string(request.c_str());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
- call->Write(send_buffer, tag(2));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
- call->WritesDone(tag(3));
- cq.Next(&got_tag, &ok);
+ call_->Write(send_buffer, tag(2));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+bool CliCall::Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+
grpc::ByteBuffer recv_buffer;
- call->Read(&recv_buffer, tag(4));
- cq.Next(&got_tag, &ok);
- if (!ok) {
- std::cout << "Failed to read response." << std::endl;
+ call_->Read(&recv_buffer, tag(3));
+
+ if (!cq_.Next(&got_tag, &ok) || !ok) {
+ return false;
}
- grpc::Status status;
- call->Finish(&status, tag(5));
- cq.Next(&got_tag, &ok);
+ std::vector<grpc::Slice> slices;
+ GPR_ASSERT(recv_buffer.Dump(&slices).ok());
+
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+void CliCall::WritesDone() {
+ void* got_tag;
+ bool ok;
+
+ call_->WritesDone(tag(4));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
- if (status.ok()) {
- std::vector<grpc::Slice> slices;
- (void)recv_buffer.Dump(&slices);
+void CliCall::WriteAndWait(const grpc::string& request) {
+ grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+ grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
+ grpc::ByteBuffer send_buffer(&req_slice, 1);
+
+ gpr_mu_lock(&write_mu_);
+ call_->Write(send_buffer, tag(2));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
+
+void CliCall::WritesDoneAndWait() {
+ gpr_mu_lock(&write_mu_);
+ call_->WritesDone(tag(4));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
- response->clear();
- for (size_t i = 0; i < slices.size(); i++) {
- response->append(reinterpret_cast<const char*>(slices[i].begin()),
- slices[i].size());
+bool CliCall::ReadAndMaybeNotifyWrite(
+ grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::ByteBuffer recv_buffer;
+
+ call_->Read(&recv_buffer, tag(3));
+ bool cq_result = cq_.Next(&got_tag, &ok);
+
+ while (got_tag != tag(3)) {
+ gpr_mu_lock(&write_mu_);
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ gpr_mu_unlock(&write_mu_);
+
+ cq_result = cq_.Next(&got_tag, &ok);
+ if (got_tag == tag(2)) {
+ GPR_ASSERT(ok);
}
}
- *server_initial_metadata = ctx.GetServerInitialMetadata();
- *server_trailing_metadata = ctx.GetServerTrailingMetadata();
+ if (!cq_result || !ok) {
+ // If the RPC is ended on the server side, we should still wait for the
+ // pending write on the client side to be done.
+ if (!ok) {
+ gpr_mu_lock(&write_mu_);
+ if (!write_done_) {
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(got_tag != tag(2));
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ }
+ gpr_mu_unlock(&write_mu_);
+ }
+ return false;
+ }
+
+ std::vector<grpc::Slice> slices;
+ GPR_ASSERT(recv_buffer.Dump(&slices).ok());
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::Status status;
+
+ call_->Finish(&status, tag(5));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+ if (server_trailing_metadata) {
+ *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
+ }
+
return status;
}
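
For reference, a minimal caller-side sketch (not part of this commit) of the streaming
surface added above. It assumes CliCall and its nested OutgoingMetadataContainer /
IncomingMetadataContainer typedefs are declared in namespace grpc::testing by
test/cpp/util/cli_call.h, that the metadata container is default-constructible, and
uses the purely illustrative method path "/grpc.testing.EchoTestService/BidiStream".
One thread drains responses with ReadAndMaybeNotifyWrite(), which also signals the
write condition variable so that a concurrent WriteAndWait() or WritesDoneAndWait()
on the main thread wakes up once its own operation completes on the shared queue:

  #include <cstdio>
  #include <memory>
  #include <thread>

  #include <grpc++/channel.h>
  #include "test/cpp/util/cli_call.h"

  void BidiSketch(std::shared_ptr<grpc::Channel> channel) {
    grpc::testing::CliCall::OutgoingMetadataContainer metadata;
    grpc::testing::CliCall call(
        channel, "/grpc.testing.EchoTestService/BidiStream", metadata);

    // Reader thread: loops until the server half-closes; each pass also wakes
    // any writer that is blocked waiting for its write completion.
    std::thread reader([&call]() {
      grpc::string response;
      while (call.ReadAndMaybeNotifyWrite(&response, nullptr)) {
        fprintf(stdout, "got: %s\n", response.c_str());
      }
    });

    call.WriteAndWait("first request");   // blocks until this write completes
    call.WriteAndWait("second request");
    call.WritesDoneAndWait();             // half-close from the client side
    reader.join();

    grpc::testing::CliCall::IncomingMetadataContainer trailing;
    grpc::Status status = call.Finish(&trailing);
    fprintf(stdout, "finished with status code %d\n",
            static_cast<int>(status.error_code()));
  }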