aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/util/grpc_tool.cc
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-09-09 20:05:37 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-12-15 16:10:37 -0800
commitd37f642f359cb7fd7405831e675abb93fd4704e2 (patch)
treec91596b19971b347cd039c99051818ddf4dd7eda /test/cpp/util/grpc_tool.cc
parentf9329217b1ce334a19b8720f70a17ce8f5d5db23 (diff)
Support server streaming
Skip unparsable input Add tests for uni-directional stream calls Simplify client stream handling
Diffstat (limited to 'test/cpp/util/grpc_tool.cc')
-rw-r--r--test/cpp/util/grpc_tool.cc105
1 files changed, 59 insertions, 46 deletions
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc
index 8082d6027b..762f8e8c23 100644
--- a/test/cpp/util/grpc_tool.cc
+++ b/test/cpp/util/grpc_tool.cc
@@ -418,6 +418,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string formatted_method_name;
std::unique_ptr<grpc::testing::ProtoFileParser> parser;
grpc::string serialized_request_proto;
+ bool print_mode = false;
std::shared_ptr<grpc::Channel> channel =
FLAGS_remotedb
@@ -435,17 +436,19 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
}
if (parser->IsStreaming(method_name, true /* is_request */)) {
- fprintf(stderr, "streaming request\n");
+ // TODO(zyc): Support BidiStream
+ if (parser->IsStreaming(method_name, false /* is_request */)) {
+ fprintf(stderr,
+ "Bidirectional-streaming method is not supported.");
+ return false;
+ }
+
std::istream* input_stream;
std::ifstream input_file;
if (argc == 3) {
request_text = argv[2];
- if (!FLAGS_infile.empty()) {
- fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
- }
}
- // std::stringstream input_stream;
std::multimap<grpc::string, grpc::string> client_metadata;
ParseMetadataFlag(&client_metadata);
@@ -455,47 +458,47 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
if (FLAGS_infile.empty()) {
if (isatty(STDIN_FILENO)) {
- fprintf(stderr, "reading request message from stdin...\n");
+ print_mode = true;
+ fprintf(stderr, "reading streaming request message from stdin...\n");
}
input_stream = &std::cin;
- // rdbuf = std::cin.rdbuf();
- // input_stream.rdbuf(std::cin.rdbuf());
- // input_stream << std::cin.rdbuf();
-
} else {
input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
- // rdbuf = input_file.rdbuf();
- // input_stream.rdbuf(input_file.rdbuf());
input_stream = &input_file;
- // input_file.close();
}
- // request_text = input_stream.str();
std::stringstream request_ss;
grpc::string line;
- while (!input_stream->eof() && getline(*input_stream, line)) {
- if (line.length() == 0) {
- // request_text = request_ss.str();
+ while (!request_text.empty() ||
+ (!input_stream->eof() && getline(*input_stream, line))) {
+ if (!request_text.empty()) {
if (FLAGS_binary_input) {
- serialized_request_proto = request_ss.str();
+ serialized_request_proto = request_text;
+ request_text.clear();
} else {
serialized_request_proto = parser->GetSerializedProtoFromMethod(
- method_name, request_ss.str(), true /* is_request */);
+ method_name, request_text, true /* is_request */);
+ request_text.clear();
if (parser->HasError()) {
- return false;
+ if (print_mode) {
+ fprintf(stderr, "Failed to parse request.\n");
+ }
+ continue;
}
}
- request_ss.str(grpc::string());
- request_ss.clear();
-
- grpc::string response_text = parser->GetTextFormatFromMethod(
- method_name, serialized_request_proto, true /* is_request */);
call.Write(serialized_request_proto);
-
- fprintf(stderr, "%s", response_text.c_str());
+ if (print_mode) {
+ fprintf(stderr, "Request sent.\n");
+ }
} else {
- request_ss << line << ' ';
+ if (line.length() == 0) {
+ request_text = request_ss.str();
+ request_ss.str(grpc::string());
+ request_ss.clear();
+ } else {
+ request_ss << line << ' ';
+ }
}
}
if (input_file.is_open()) {
@@ -507,7 +510,9 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string serialized_response_proto;
std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
server_trailing_metadata;
- call.Read(&serialized_response_proto, &server_initial_metadata);
+ if (!call.Read(&serialized_response_proto, &server_trailing_metadata)) {
+ fprintf(stderr, "Failed to read response.\n");
+ }
Status status = call.Finish(&server_trailing_metadata);
PrintMetadata(server_initial_metadata,
@@ -524,7 +529,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
if (parser->HasError()) {
return false;
}
- output_ss << "Response: \n " << response_text << std::endl;
+ output_ss << response_text;
}
} else {
fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
@@ -569,32 +574,40 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
server_trailing_metadata;
ParseMetadataFlag(&client_metadata);
PrintMetadata(client_metadata, "Sending client initial metadata:");
- grpc::Status status = grpc::testing::CliCall::Call(
- channel, formated_method_name, serialized_request_proto,
- &serialized_response_proto, client_metadata, &server_initial_metadata,
- &server_trailing_metadata);
- PrintMetadata(server_initial_metadata,
- "Received initial metadata from server:");
- PrintMetadata(server_trailing_metadata,
- "Received trailing metadata from server:");
- if (status.ok()) {
- fprintf(stderr, "Rpc succeeded with OK status\n");
- if (FLAGS_binary_output) {
- output_ss << serialized_response_proto;
- } else {
- grpc::string response_text = parser->GetTextFormatFromMethod(
+
+ CliCall call(channel, formated_method_name, client_metadata);
+ call.Write(serialized_request_proto);
+ call.WritesDone();
+
+ for (bool receive_initial_metadata = true; call.Read(
+ &serialized_response_proto,
+ receive_initial_metadata ? &server_initial_metadata : nullptr);
+ receive_initial_metadata = false) {
+ if (!FLAGS_binary_output) {
+ serialized_response_proto = parser->GetTextFormatFromMethod(
method_name, serialized_response_proto, false /* is_request */);
if (parser->HasError()) {
return false;
}
- output_ss << "Response: \n " << response_text << std::endl;
}
+ if (receive_initial_metadata) {
+ PrintMetadata(server_initial_metadata,
+ "Received initial metadata from server:");
+ }
+ if (!callback(serialized_response_proto)) {
+ return false;
+ }
+ }
+ Status status = call.Finish(&server_trailing_metadata);
+ if (status.ok()) {
+ fprintf(stderr, "Rpc succeeded with OK status\n");
+ return true;
} else {
fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
status.error_code(), status.error_message().c_str());
+ return false;
}
}
-
return callback(output_ss.str());
}