diff options
author | 2016-09-09 20:05:37 -0700 | |
---|---|---|
committer | 2016-12-15 16:10:37 -0800 | |
commit | d37f642f359cb7fd7405831e675abb93fd4704e2 (patch) | |
tree | c91596b19971b347cd039c99051818ddf4dd7eda /test/cpp/util/grpc_tool.cc | |
parent | f9329217b1ce334a19b8720f70a17ce8f5d5db23 (diff) |
Support server streaming
Skip unparsable input
Add tests for uni-directional stream calls
Simplify client stream handling
Diffstat (limited to 'test/cpp/util/grpc_tool.cc')
-rw-r--r-- | test/cpp/util/grpc_tool.cc | 105 |
1 files changed, 59 insertions, 46 deletions
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc index 8082d6027b..762f8e8c23 100644 --- a/test/cpp/util/grpc_tool.cc +++ b/test/cpp/util/grpc_tool.cc @@ -418,6 +418,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, grpc::string formatted_method_name; std::unique_ptr<grpc::testing::ProtoFileParser> parser; grpc::string serialized_request_proto; + bool print_mode = false; std::shared_ptr<grpc::Channel> channel = FLAGS_remotedb @@ -435,17 +436,19 @@ bool GrpcTool::CallMethod(int argc, const char** argv, } if (parser->IsStreaming(method_name, true /* is_request */)) { - fprintf(stderr, "streaming request\n"); + // TODO(zyc): Support BidiStream + if (parser->IsStreaming(method_name, false /* is_request */)) { + fprintf(stderr, + "Bidirectional-streaming method is not supported."); + return false; + } + std::istream* input_stream; std::ifstream input_file; if (argc == 3) { request_text = argv[2]; - if (!FLAGS_infile.empty()) { - fprintf(stderr, "warning: request given in argv, ignoring --infile\n"); - } } - // std::stringstream input_stream; std::multimap<grpc::string, grpc::string> client_metadata; ParseMetadataFlag(&client_metadata); @@ -455,47 +458,47 @@ bool GrpcTool::CallMethod(int argc, const char** argv, if (FLAGS_infile.empty()) { if (isatty(STDIN_FILENO)) { - fprintf(stderr, "reading request message from stdin...\n"); + print_mode = true; + fprintf(stderr, "reading streaming request message from stdin...\n"); } input_stream = &std::cin; - // rdbuf = std::cin.rdbuf(); - // input_stream.rdbuf(std::cin.rdbuf()); - // input_stream << std::cin.rdbuf(); - } else { input_file.open(FLAGS_infile, std::ios::in | std::ios::binary); - // rdbuf = input_file.rdbuf(); - // input_stream.rdbuf(input_file.rdbuf()); input_stream = &input_file; - // input_file.close(); } - // request_text = input_stream.str(); std::stringstream request_ss; grpc::string line; - while (!input_stream->eof() && getline(*input_stream, line)) { - if (line.length() == 0) { - // request_text = request_ss.str(); + while (!request_text.empty() || + (!input_stream->eof() && getline(*input_stream, line))) { + if (!request_text.empty()) { if (FLAGS_binary_input) { - serialized_request_proto = request_ss.str(); + serialized_request_proto = request_text; + request_text.clear(); } else { serialized_request_proto = parser->GetSerializedProtoFromMethod( - method_name, request_ss.str(), true /* is_request */); + method_name, request_text, true /* is_request */); + request_text.clear(); if (parser->HasError()) { - return false; + if (print_mode) { + fprintf(stderr, "Failed to parse request.\n"); + } + continue; } } - request_ss.str(grpc::string()); - request_ss.clear(); - - grpc::string response_text = parser->GetTextFormatFromMethod( - method_name, serialized_request_proto, true /* is_request */); call.Write(serialized_request_proto); - - fprintf(stderr, "%s", response_text.c_str()); + if (print_mode) { + fprintf(stderr, "Request sent.\n"); + } } else { - request_ss << line << ' '; + if (line.length() == 0) { + request_text = request_ss.str(); + request_ss.str(grpc::string()); + request_ss.clear(); + } else { + request_ss << line << ' '; + } } } if (input_file.is_open()) { @@ -507,7 +510,9 @@ bool GrpcTool::CallMethod(int argc, const char** argv, grpc::string serialized_response_proto; std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata, server_trailing_metadata; - call.Read(&serialized_response_proto, &server_initial_metadata); + if (!call.Read(&serialized_response_proto, &server_trailing_metadata)) { + fprintf(stderr, "Failed to read response.\n"); + } Status status = call.Finish(&server_trailing_metadata); PrintMetadata(server_initial_metadata, @@ -524,7 +529,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, if (parser->HasError()) { return false; } - output_ss << "Response: \n " << response_text << std::endl; + output_ss << response_text; } } else { fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", @@ -569,32 +574,40 @@ bool GrpcTool::CallMethod(int argc, const char** argv, server_trailing_metadata; ParseMetadataFlag(&client_metadata); PrintMetadata(client_metadata, "Sending client initial metadata:"); - grpc::Status status = grpc::testing::CliCall::Call( - channel, formated_method_name, serialized_request_proto, - &serialized_response_proto, client_metadata, &server_initial_metadata, - &server_trailing_metadata); - PrintMetadata(server_initial_metadata, - "Received initial metadata from server:"); - PrintMetadata(server_trailing_metadata, - "Received trailing metadata from server:"); - if (status.ok()) { - fprintf(stderr, "Rpc succeeded with OK status\n"); - if (FLAGS_binary_output) { - output_ss << serialized_response_proto; - } else { - grpc::string response_text = parser->GetTextFormatFromMethod( + + CliCall call(channel, formated_method_name, client_metadata); + call.Write(serialized_request_proto); + call.WritesDone(); + + for (bool receive_initial_metadata = true; call.Read( + &serialized_response_proto, + receive_initial_metadata ? &server_initial_metadata : nullptr); + receive_initial_metadata = false) { + if (!FLAGS_binary_output) { + serialized_response_proto = parser->GetTextFormatFromMethod( method_name, serialized_response_proto, false /* is_request */); if (parser->HasError()) { return false; } - output_ss << "Response: \n " << response_text << std::endl; } + if (receive_initial_metadata) { + PrintMetadata(server_initial_metadata, + "Received initial metadata from server:"); + } + if (!callback(serialized_response_proto)) { + return false; + } + } + Status status = call.Finish(&server_trailing_metadata); + if (status.ok()) { + fprintf(stderr, "Rpc succeeded with OK status\n"); + return true; } else { fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", status.error_code(), status.error_message().c_str()); + return false; } } - return callback(output_ss.str()); } |