aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <y-zeng@users.noreply.github.com>2017-01-31 10:34:04 -0800
committerGravatar GitHub <noreply@github.com>2017-01-31 10:34:04 -0800
commit08a215f72c23525cad6a096e65175d9e11e5e8e9 (patch)
tree7b0a98f65e1f7f30a8307b64d3c8d3c63180c51c
parentc88834ba996084db5f530706a09892bdd900e7dc (diff)
parent8d2d70ca99451623ec48175671a81f9927000983 (diff)
Merge pull request #8066 from y-zeng/cli_main_fork
gRPC CLI stream RPC support
-rw-r--r--test/cpp/util/cli_call.cc175
-rw-r--r--test/cpp/util/cli_call.h51
-rw-r--r--test/cpp/util/grpc_cli.cc6
-rw-r--r--test/cpp/util/grpc_tool.cc253
-rw-r--r--test/cpp/util/grpc_tool_test.cc193
-rw-r--r--test/cpp/util/proto_file_parser.cc32
-rw-r--r--test/cpp/util/proto_file_parser.h3
7 files changed, 619 insertions, 94 deletions
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index a02a8b2ee2..4d045da098 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -37,8 +37,6 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@@ -56,55 +54,172 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata) {
- std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
- grpc::ClientContext ctx;
+ CliCall call(channel, method, metadata);
+ call.Write(request);
+ call.WritesDone();
+ if (!call.Read(response, server_initial_metadata)) {
+ fprintf(stderr, "Failed to read response.\n");
+ }
+ return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method,
+ const OutgoingMetadataContainer& metadata)
+ : stub_(new grpc::GenericStub(channel)) {
+ gpr_mu_init(&write_mu_);
+ gpr_cv_init(&write_cv_);
if (!metadata.empty()) {
for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
iter != metadata.end(); ++iter) {
- ctx.AddMetadata(iter->first, iter->second);
+ ctx_.AddMetadata(iter->first, iter->second);
}
}
- grpc::CompletionQueue cq;
- std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
- stub->Call(&ctx, method, &cq, tag(1)));
+ call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
void* got_tag;
bool ok;
- cq.Next(&got_tag, &ok);
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+CliCall::~CliCall() {
+ gpr_cv_destroy(&write_cv_);
+ gpr_mu_destroy(&write_mu_);
+}
+
+void CliCall::Write(const grpc::string& request) {
+ void* got_tag;
+ bool ok;
grpc_slice s = grpc_slice_from_copied_string(request.c_str());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
- call->Write(send_buffer, tag(2));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
- call->WritesDone(tag(3));
- cq.Next(&got_tag, &ok);
+ call_->Write(send_buffer, tag(2));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+bool CliCall::Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+
grpc::ByteBuffer recv_buffer;
- call->Read(&recv_buffer, tag(4));
- cq.Next(&got_tag, &ok);
- if (!ok) {
- std::cout << "Failed to read response." << std::endl;
+ call_->Read(&recv_buffer, tag(3));
+
+ if (!cq_.Next(&got_tag, &ok) || !ok) {
+ return false;
}
- grpc::Status status;
- call->Finish(&status, tag(5));
- cq.Next(&got_tag, &ok);
+ std::vector<grpc::Slice> slices;
+ recv_buffer.Dump(&slices);
+
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+void CliCall::WritesDone() {
+ void* got_tag;
+ bool ok;
+
+ call_->WritesDone(tag(4));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
- if (status.ok()) {
- std::vector<grpc::Slice> slices;
- (void)recv_buffer.Dump(&slices);
+void CliCall::WriteAndWait(const grpc::string& request) {
+ grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+ grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
+ grpc::ByteBuffer send_buffer(&req_slice, 1);
+
+ gpr_mu_lock(&write_mu_);
+ call_->Write(send_buffer, tag(2));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
+
+void CliCall::WritesDoneAndWait() {
+ gpr_mu_lock(&write_mu_);
+ call_->WritesDone(tag(4));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
- response->clear();
- for (size_t i = 0; i < slices.size(); i++) {
- response->append(reinterpret_cast<const char*>(slices[i].begin()),
- slices[i].size());
+bool CliCall::ReadAndMaybeNotifyWrite(
+ grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::ByteBuffer recv_buffer;
+
+ call_->Read(&recv_buffer, tag(3));
+ bool cq_result = cq_.Next(&got_tag, &ok);
+
+ while (got_tag != tag(3)) {
+ gpr_mu_lock(&write_mu_);
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ gpr_mu_unlock(&write_mu_);
+
+ cq_result = cq_.Next(&got_tag, &ok);
+ if (got_tag == tag(2)) {
+ GPR_ASSERT(ok);
}
}
- *server_initial_metadata = ctx.GetServerInitialMetadata();
- *server_trailing_metadata = ctx.GetServerTrailingMetadata();
+ if (!cq_result || !ok) {
+ // If the RPC is ended on the server side, we should still wait for the
+ // pending write on the client side to be done.
+ if (!ok) {
+ gpr_mu_lock(&write_mu_);
+ if (!write_done_) {
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(got_tag != tag(2));
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ }
+ gpr_mu_unlock(&write_mu_);
+ }
+ return false;
+ }
+
+ std::vector<grpc::Slice> slices;
+ recv_buffer.Dump(&slices);
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::Status status;
+
+ call_->Finish(&status, tag(5));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+ if (server_trailing_metadata) {
+ *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
+ }
+
return status;
}
diff --git a/test/cpp/util/cli_call.h b/test/cpp/util/cli_call.h
index 65da86bd4e..91f0dbc9ed 100644
--- a/test/cpp/util/cli_call.h
+++ b/test/cpp/util/cli_call.h
@@ -37,23 +37,74 @@
#include <map>
#include <grpc++/channel.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/status.h>
#include <grpc++/support/string_ref.h>
namespace grpc {
+
+class ClientContext;
+
namespace testing {
+// CliCall handles the sending and receiving of generic messages given the name
+// of the remote method. This class is only used by GrpcTool. Its thread-safe
+// and thread-unsafe methods should not be used together.
class CliCall final {
public:
typedef std::multimap<grpc::string, grpc::string> OutgoingMetadataContainer;
typedef std::multimap<grpc::string_ref, grpc::string_ref>
IncomingMetadataContainer;
+
+ CliCall(std::shared_ptr<grpc::Channel> channel, const grpc::string& method,
+ const OutgoingMetadataContainer& metadata);
+ ~CliCall();
+
+ // Perform an unary generic RPC.
static Status Call(std::shared_ptr<grpc::Channel> channel,
const grpc::string& method, const grpc::string& request,
grpc::string* response,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata);
+
+ // Send a generic request message in a synchronous manner. NOT thread-safe.
+ void Write(const grpc::string& request);
+
+ // Send a generic request message in a synchronous manner. NOT thread-safe.
+ void WritesDone();
+
+ // Receive a generic response message in a synchronous manner.NOT thread-safe.
+ bool Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata);
+
+ // Thread-safe write. Must be used with ReadAndMaybeNotifyWrite. Send out a
+ // generic request message and wait for ReadAndMaybeNotifyWrite to finish it.
+ void WriteAndWait(const grpc::string& request);
+
+ // Thread-safe WritesDone. Must be used with ReadAndMaybeNotifyWrite. Send out
+ // WritesDone for gereneric request messages and wait for
+ // ReadAndMaybeNotifyWrite to finish it.
+ void WritesDoneAndWait();
+
+ // Thread-safe Read. Blockingly receive a generic response message. Notify
+ // writes if they are finished when this read is waiting for a resposne.
+ bool ReadAndMaybeNotifyWrite(
+ grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata);
+
+ // Finish the RPC.
+ Status Finish(IncomingMetadataContainer* server_trailing_metadata);
+
+ private:
+ std::unique_ptr<grpc::GenericStub> stub_;
+ grpc::ClientContext ctx_;
+ std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_;
+ grpc::CompletionQueue cq_;
+ gpr_mu write_mu_;
+ gpr_cv write_cv_; // Protected by write_mu_;
+ bool write_done_; // Portected by write_mu_;
};
} // namespace testing
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index fe248601ee..a78bed4b90 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -83,10 +83,10 @@ DEFINE_string(outfile, "", "Output file (default is stdout)");
static bool SimplePrint(const grpc::string& outfile,
const grpc::string& output) {
if (outfile.empty()) {
- std::cout << output;
+ std::cout << output << std::endl;
} else {
- std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary);
- output_file << output;
+ std::ofstream output_file(outfile, std::ios::app | std::ios::binary);
+ output_file << output << std::endl;
output_file.close();
}
return true;
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc
index b9900ca1b7..39acd8eb4b 100644
--- a/test/cpp/util/grpc_tool.cc
+++ b/test/cpp/util/grpc_tool.cc
@@ -39,6 +39,7 @@
#include <memory>
#include <sstream>
#include <string>
+#include <thread>
#include <gflags/gflags.h>
#include <grpc++/channel.h>
@@ -159,6 +160,36 @@ void PrintMetadata(const T& m, const grpc::string& message) {
}
}
+void ReadResponse(CliCall* call, const grpc::string& method_name,
+ GrpcToolOutputCallback callback, ProtoFileParser* parser,
+ gpr_mu* parser_mu, bool print_mode) {
+ grpc::string serialized_response_proto;
+ std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata;
+
+ for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite(
+ &serialized_response_proto,
+ receive_initial_metadata ? &server_initial_metadata : nullptr);
+ receive_initial_metadata = false) {
+ fprintf(stderr, "got response.\n");
+ if (!FLAGS_binary_output) {
+ gpr_mu_lock(parser_mu);
+ serialized_response_proto = parser->GetTextFormatFromMethod(
+ method_name, serialized_response_proto, false /* is_request */);
+ if (parser->HasError() && print_mode) {
+ fprintf(stderr, "Failed to parse response.\n");
+ }
+ gpr_mu_unlock(parser_mu);
+ }
+ if (receive_initial_metadata) {
+ PrintMetadata(server_initial_metadata,
+ "Received initial metadata from server:");
+ }
+ if (!callback(serialized_response_proto) && print_mode) {
+ fprintf(stderr, "Failed to output response.\n");
+ }
+ }
+}
+
struct Command {
const char* command;
std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
@@ -416,85 +447,191 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string server_address(argv[0]);
grpc::string method_name(argv[1]);
grpc::string formatted_method_name;
- std::unique_ptr<grpc::testing::ProtoFileParser> parser;
+ std::unique_ptr<ProtoFileParser> parser;
grpc::string serialized_request_proto;
+ bool print_mode = false;
- if (argc == 3) {
- request_text = argv[2];
- if (!FLAGS_infile.empty()) {
- fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
- }
+ std::shared_ptr<grpc::Channel> channel =
+ FLAGS_remotedb
+ ? grpc::CreateChannel(server_address, cred.GetCredentials())
+ : nullptr;
+
+ parser.reset(new grpc::testing::ProtoFileParser(channel, FLAGS_proto_path,
+ FLAGS_protofiles));
+
+ if (FLAGS_binary_input) {
+ formatted_method_name = method_name;
} else {
- std::stringstream input_stream;
+ formatted_method_name = parser->GetFormattedMethodName(method_name);
+ }
+
+ if (parser->HasError()) {
+ return false;
+ }
+
+ if (parser->IsStreaming(method_name, true /* is_request */)) {
+ std::istream* input_stream;
+ std::ifstream input_file;
+
+ if (argc == 3) {
+ request_text = argv[2];
+ }
+
+ std::multimap<grpc::string, grpc::string> client_metadata;
+ ParseMetadataFlag(&client_metadata);
+ PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+ CliCall call(channel, formatted_method_name, client_metadata);
+
if (FLAGS_infile.empty()) {
if (isatty(STDIN_FILENO)) {
- fprintf(stderr, "reading request message from stdin...\n");
+ print_mode = true;
+ fprintf(stderr, "reading streaming request message from stdin...\n");
}
- input_stream << std::cin.rdbuf();
+ input_stream = &std::cin;
} else {
- std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
- input_stream << input_file.rdbuf();
+ input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
+ input_stream = &input_file;
+ }
+
+ gpr_mu parser_mu;
+ gpr_mu_init(&parser_mu);
+ std::thread read_thread(ReadResponse, &call, method_name, callback,
+ parser.get(), &parser_mu, print_mode);
+
+ std::stringstream request_ss;
+ grpc::string line;
+ while (!request_text.empty() ||
+ (!input_stream->eof() && getline(*input_stream, line))) {
+ if (!request_text.empty()) {
+ if (FLAGS_binary_input) {
+ serialized_request_proto = request_text;
+ request_text.clear();
+ } else {
+ gpr_mu_lock(&parser_mu);
+ serialized_request_proto = parser->GetSerializedProtoFromMethod(
+ method_name, request_text, true /* is_request */);
+ request_text.clear();
+ if (parser->HasError()) {
+ if (print_mode) {
+ fprintf(stderr, "Failed to parse request.\n");
+ }
+ gpr_mu_unlock(&parser_mu);
+ continue;
+ }
+ gpr_mu_unlock(&parser_mu);
+ }
+
+ call.WriteAndWait(serialized_request_proto);
+ if (print_mode) {
+ fprintf(stderr, "Request sent.\n");
+ }
+ } else {
+ if (line.length() == 0) {
+ request_text = request_ss.str();
+ request_ss.str(grpc::string());
+ request_ss.clear();
+ } else {
+ request_ss << line << ' ';
+ }
+ }
+ }
+ if (input_file.is_open()) {
input_file.close();
}
- request_text = input_stream.str();
- }
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(server_address, cred.GetCredentials());
- if (!FLAGS_binary_input || !FLAGS_binary_output) {
- parser.reset(
- new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
- FLAGS_proto_path, FLAGS_protofiles));
- if (parser->HasError()) {
+ call.WritesDoneAndWait();
+ read_thread.join();
+
+ std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata;
+ Status status = call.Finish(&server_trailing_metadata);
+ PrintMetadata(server_trailing_metadata,
+ "Received trailing metadata from server:");
+
+ if (status.ok()) {
+ fprintf(stderr, "Stream RPC succeeded with OK status\n");
+ return true;
+ } else {
+ fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+ status.error_code(), status.error_message().c_str());
return false;
}
- }
- if (FLAGS_binary_input) {
- serialized_request_proto = request_text;
- formatted_method_name = method_name;
- } else {
- formatted_method_name = parser->GetFormattedMethodName(method_name);
- serialized_request_proto = parser->GetSerializedProtoFromMethod(
- method_name, request_text, true /* is_request */);
- if (parser->HasError()) {
- return false;
+ } else { // parser->IsStreaming(method_name, true /* is_request */)
+ if (argc == 3) {
+ request_text = argv[2];
+ if (!FLAGS_infile.empty()) {
+ fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
+ }
+ } else {
+ std::stringstream input_stream;
+ if (FLAGS_infile.empty()) {
+ if (isatty(STDIN_FILENO)) {
+ fprintf(stderr, "reading request message from stdin...\n");
+ }
+ input_stream << std::cin.rdbuf();
+ } else {
+ std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
+ input_stream << input_file.rdbuf();
+ input_file.close();
+ }
+ request_text = input_stream.str();
}
- }
- fprintf(stderr, "connecting to %s\n", server_address.c_str());
- grpc::string serialized_response_proto;
- std::multimap<grpc::string, grpc::string> client_metadata;
- std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
- server_trailing_metadata;
- ParseMetadataFlag(&client_metadata);
- PrintMetadata(client_metadata, "Sending client initial metadata:");
- grpc::Status status = grpc::testing::CliCall::Call(
- channel, formatted_method_name, serialized_request_proto,
- &serialized_response_proto, client_metadata, &server_initial_metadata,
- &server_trailing_metadata);
- PrintMetadata(server_initial_metadata,
- "Received initial metadata from server:");
- PrintMetadata(server_trailing_metadata,
- "Received trailing metadata from server:");
- if (status.ok()) {
- fprintf(stderr, "Rpc succeeded with OK status\n");
- if (FLAGS_binary_output) {
- output_ss << serialized_response_proto;
+ if (FLAGS_binary_input) {
+ serialized_request_proto = request_text;
+ // formatted_method_name = method_name;
} else {
- grpc::string response_text = parser->GetTextFormatFromMethod(
- method_name, serialized_response_proto, false /* is_request */);
+ // formatted_method_name = parser->GetFormattedMethodName(method_name);
+ serialized_request_proto = parser->GetSerializedProtoFromMethod(
+ method_name, request_text, true /* is_request */);
if (parser->HasError()) {
return false;
}
- output_ss << "Response: \n " << response_text << std::endl;
}
- } else {
- fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
- status.error_code(), status.error_message().c_str());
+ fprintf(stderr, "connecting to %s\n", server_address.c_str());
+
+ grpc::string serialized_response_proto;
+ std::multimap<grpc::string, grpc::string> client_metadata;
+ std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
+ server_trailing_metadata;
+ ParseMetadataFlag(&client_metadata);
+ PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+ CliCall call(channel, formatted_method_name, client_metadata);
+ call.Write(serialized_request_proto);
+ call.WritesDone();
+
+ for (bool receive_initial_metadata = true; call.Read(
+ &serialized_response_proto,
+ receive_initial_metadata ? &server_initial_metadata : nullptr);
+ receive_initial_metadata = false) {
+ if (!FLAGS_binary_output) {
+ serialized_response_proto = parser->GetTextFormatFromMethod(
+ method_name, serialized_response_proto, false /* is_request */);
+ if (parser->HasError()) {
+ return false;
+ }
+ }
+ if (receive_initial_metadata) {
+ PrintMetadata(server_initial_metadata,
+ "Received initial metadata from server:");
+ }
+ if (!callback(serialized_response_proto)) {
+ return false;
+ }
+ }
+ Status status = call.Finish(&server_trailing_metadata);
+ if (status.ok()) {
+ fprintf(stderr, "Rpc succeeded with OK status\n");
+ return true;
+ } else {
+ fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+ status.error_code(), status.error_message().c_str());
+ return false;
+ }
}
-
- return callback(output_ss.str());
+ GPR_UNREACHABLE_CODE(return false);
}
bool GrpcTool::ParseMessage(int argc, const char** argv,
diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc
index 33ce611a60..26e2b1f502 100644
--- a/test/cpp/util/grpc_tool_test.cc
+++ b/test/cpp/util/grpc_tool_test.cc
@@ -102,6 +102,8 @@ DECLARE_bool(l);
namespace {
+const int kNumResponseStreamsMsgs = 3;
+
class TestCliCredentials final : public grpc::testing::CliCredentials {
public:
std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override {
@@ -137,6 +139,71 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
response->set_message(request->message());
return Status::OK;
}
+
+ Status RequestStream(ServerContext* context,
+ ServerReader<EchoRequest>* reader,
+ EchoResponse* response) override {
+ EchoRequest request;
+ response->set_message("");
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+ while (reader->Read(&request)) {
+ response->mutable_message()->append(request.message());
+ }
+
+ return Status::OK;
+ }
+
+ Status ResponseStream(ServerContext* context, const EchoRequest* request,
+ ServerWriter<EchoResponse>* writer) override {
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+ EchoResponse response;
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ response.set_message(request->message() + grpc::to_string(i));
+ writer->Write(response);
+ }
+
+ return Status::OK;
+ }
+
+ Status BidiStream(
+ ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
+ EchoRequest request;
+ EchoResponse response;
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+ while (stream->Read(&request)) {
+ response.set_message(request.message());
+ stream->Write(response);
+ }
+
+ return Status::OK;
+ }
};
} // namespace
@@ -347,6 +414,132 @@ TEST_F(GrpcToolTest, CallCommand) {
ShutdownServer();
}
+TEST_F(GrpcToolTest, CallCommandRequestStream) {
+ // Test input: grpc_cli call localhost:<port> RequestStream "message:
+ // 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "RequestStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0Hello1Hello2\""
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0Hello1Hello2\""));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) {
+ // Test input: grpc_cli call localhost:<port> RequestStream "message:
+ // 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "RequestStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "bad_field: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("bad_field: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0Hello2\""
+ EXPECT_TRUE(NULL !=
+ strstr(output_stream.str().c_str(), "message: \"Hello0Hello2\""));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandResponseStream) {
+ // Test input: grpc_cli call localhost:<port> ResponseStream "message:
+ // 'Hello'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "ResponseStream", "message: 'Hello'"};
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello{n}\""
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ grpc::string expected_response_text =
+ "message: \"Hello" + grpc::to_string(i) + "\"\n";
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ expected_response_text.c_str()));
+ }
+
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStream) {
+ // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "BidiStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage:
+ // \"Hello2\"\n\n"
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0\"\nmessage: "
+ "\"Hello1\"\nmessage: \"Hello2\"\n"));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStreamWithBadRequest) {
+ // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "BidiStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 1.0\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage:
+ // \"Hello2\"\n\n"
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0\"\nmessage: \"Hello2\"\n"));
+ std::cin.rdbuf(orig);
+
+ ShutdownServer();
+}
+
TEST_F(GrpcToolTest, ParseCommand) {
// Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse
// ECHO_RESPONSE_MESSAGE"
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
index bc8a6083f4..d501c3697b 100644
--- a/test/cpp/util/proto_file_parser.cc
+++ b/test/cpp/util/proto_file_parser.cc
@@ -81,8 +81,9 @@ class ErrorPrinter : public protobuf::compiler::MultiFileErrorCollector {
ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
const grpc::string& proto_path,
const grpc::string& protofiles)
- : has_error_(false) {
- std::vector<grpc::string> service_list;
+ : has_error_(false),
+ dynamic_factory_(new protobuf::DynamicMessageFactory()) {
+ std::vector<std::string> service_list;
if (channel) {
reflection_db_.reset(new grpc::ProtoReflectionDescriptorDatabase(channel));
reflection_db_->GetServices(&service_list);
@@ -127,7 +128,6 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
}
desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get()));
- dynamic_factory_.reset(new protobuf::DynamicMessageFactory(desc_pool_.get()));
for (auto it = service_list.begin(); it != service_list.end(); it++) {
if (known_services.find(*it) == known_services.end()) {
@@ -144,6 +144,11 @@ ProtoFileParser::~ProtoFileParser() {}
grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
has_error_ = false;
+
+ if (known_methods_.find(method) != known_methods_.end()) {
+ return known_methods_[method];
+ }
+
const protobuf::MethodDescriptor* method_descriptor = nullptr;
for (auto it = service_desc_list_.begin(); it != service_desc_list_.end();
it++) {
@@ -169,6 +174,8 @@ grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
return "";
}
+ known_methods_[method] = method_descriptor->full_name();
+
return method_descriptor->full_name();
}
@@ -205,6 +212,25 @@ grpc::string ProtoFileParser::GetMessageTypeFromMethod(
: method_desc->output_type()->full_name();
}
+bool ProtoFileParser::IsStreaming(const grpc::string& method, bool is_request) {
+ has_error_ = false;
+
+ grpc::string full_method_name = GetFullMethodName(method);
+ if (has_error_) {
+ return false;
+ }
+
+ const protobuf::MethodDescriptor* method_desc =
+ desc_pool_->FindMethodByName(full_method_name);
+ if (!method_desc) {
+ LogError("Method not found");
+ return false;
+ }
+
+ return is_request ? method_desc->client_streaming()
+ : method_desc->server_streaming();
+}
+
grpc::string ProtoFileParser::GetSerializedProtoFromMethod(
const grpc::string& method, const grpc::string& text_format_proto,
bool is_request) {
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
index c1070a37b5..23d311ef8f 100644
--- a/test/cpp/util/proto_file_parser.h
+++ b/test/cpp/util/proto_file_parser.h
@@ -84,6 +84,8 @@ class ProtoFileParser {
const grpc::string& message_type_name,
const grpc::string& serialized_proto);
+ bool IsStreaming(const grpc::string& method, bool is_request);
+
bool HasError() const { return has_error_; }
void LogError(const grpc::string& error_msg);
@@ -104,6 +106,7 @@ class ProtoFileParser {
std::unique_ptr<protobuf::DynamicMessageFactory> dynamic_factory_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
+ std::unordered_map<grpc::string, grpc::string> known_methods_;
std::vector<const protobuf::ServiceDescriptor*> service_desc_list_;
};