aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/common/alarm_cpp_test.cc18
-rw-r--r--test/cpp/end2end/async_end2end_test.cc22
-rw-r--r--test/cpp/end2end/end2end_test.cc23
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc2
-rw-r--r--test/cpp/grpclb/grpclb_test.cc29
-rw-r--r--test/cpp/interop/client.cc182
-rw-r--r--test/cpp/interop/client_helper.cc2
-rw-r--r--test/cpp/interop/interop_client.cc15
-rw-r--r--test/cpp/interop/interop_client.h1
-rw-r--r--test/cpp/interop/interop_server.cc14
-rw-r--r--test/cpp/interop/interop_server_bootstrap.cc1
-rw-r--r--test/cpp/interop/reconnect_interop_server.cc1
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack.cc250
-rw-r--r--test/cpp/performance/writes_per_rpc_test.cc268
-rw-r--r--test/cpp/qps/client_sync.cc7
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py8
-rw-r--r--test/cpp/qps/qps_openloop_test.cc2
-rw-r--r--test/cpp/qps/usage_timer.cc29
-rw-r--r--test/cpp/qps/worker.cc2
-rw-r--r--test/cpp/test/server_context_test_spouse_test.cc3
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc1
-rw-r--r--test/cpp/util/cli_call.cc175
-rw-r--r--test/cpp/util/cli_call.h51
-rw-r--r--test/cpp/util/grpc_cli.cc6
-rw-r--r--test/cpp/util/grpc_tool.cc266
-rw-r--r--test/cpp/util/grpc_tool_test.cc193
-rw-r--r--test/cpp/util/proto_file_parser.cc30
-rw-r--r--test/cpp/util/proto_file_parser.h3
28 files changed, 1305 insertions, 299 deletions
diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc
index a05ac30b1c..41085174a4 100644
--- a/test/cpp/common/alarm_cpp_test.cc
+++ b/test/cpp/common/alarm_cpp_test.cc
@@ -43,12 +43,12 @@ namespace {
TEST(AlarmTest, RegularExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), junk);
+ Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
- (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2));
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
@@ -65,7 +65,7 @@ TEST(AlarmTest, RegularExpiryChrono) {
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
- (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2));
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
@@ -75,12 +75,12 @@ TEST(AlarmTest, RegularExpiryChrono) {
TEST(AlarmTest, ZeroExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0), junk);
+ Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
- (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0));
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(0));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
@@ -90,12 +90,12 @@ TEST(AlarmTest, ZeroExpiry) {
TEST(AlarmTest, NegativeExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(-1), junk);
+ Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk);
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
- (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0));
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(0));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
@@ -105,13 +105,13 @@ TEST(AlarmTest, NegativeExpiry) {
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), junk);
+ Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk);
alarm.Cancel();
void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
- (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
+ (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(1));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_FALSE(ok);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2ce3f2f7bd..32e8a41795 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -42,6 +42,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/tls.h>
@@ -228,12 +229,7 @@ class TestScenario {
: disable_blocking(non_block),
credentials_type(creds_type),
message_content(content) {}
- void Log() const {
- gpr_log(
- GPR_INFO,
- "Scenario: disable_blocking %d, credentials %s, message size %" PRIuPTR,
- disable_blocking, credentials_type.c_str(), message_content.size());
- }
+ void Log() const;
bool disable_blocking;
// Although the below grpc::string's are logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
@@ -242,6 +238,20 @@ class TestScenario {
grpc::string message_content;
};
+static std::ostream& operator<<(std::ostream& out,
+ const TestScenario& scenario) {
+ return out << "TestScenario{disable_blocking="
+ << (scenario.disable_blocking ? "true" : "false")
+ << ", credentials='" << scenario.credentials_type
+ << "', message_size=" << scenario.message_content.size() << "}";
+}
+
+void TestScenario::Log() const {
+ std::ostringstream out;
+ out << *this;
+ gpr_log(GPR_DEBUG, "%s", out.str().c_str());
+}
+
class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
AsyncEnd2endTest() { GetParam().Log(); }
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 1a1a94e87c..47e5c5bd77 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -209,10 +209,7 @@ class TestScenario {
public:
TestScenario(bool proxy, const grpc::string& creds_type)
: use_proxy(proxy), credentials_type(creds_type) {}
- void Log() const {
- gpr_log(GPR_INFO, "Scenario: proxy %d, credentials %s", use_proxy,
- credentials_type.c_str());
- }
+ void Log() const;
bool use_proxy;
// Although the below grpc::string is logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
@@ -220,6 +217,19 @@ class TestScenario {
grpc::string credentials_type;
};
+static std::ostream& operator<<(std::ostream& out,
+ const TestScenario& scenario) {
+ return out << "TestScenario{use_proxy="
+ << (scenario.use_proxy ? "true" : "false") << ", credentials='"
+ << scenario.credentials_type << "'}";
+}
+
+void TestScenario::Log() const {
+ std::ostringstream out;
+ out << *this;
+ gpr_log(GPR_DEBUG, "%s", out.str().c_str());
+}
+
class End2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
End2endTest()
@@ -636,7 +646,7 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
}
-TEST_P(End2endTest, SimpleRpcWithCustomeUserAgentPrefix) {
+TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
user_agent_prefix_ = "custom_prefix";
ResetStub();
EchoRequest request;
@@ -891,6 +901,8 @@ TEST_P(End2endTest, RpcMaxMessageSize) {
EchoRequest request;
EchoResponse response;
request.set_message(string(kMaxMessageSize_ * 2, 'a'));
+ // cancelled is not guaranteed to appear before the end of the service handler
+ request.mutable_param()->set_skip_cancelled_check(true);
ClientContext context;
Status s = stub_->Echo(&context, request, &response);
@@ -1293,6 +1305,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
EXPECT_FALSE(s.ok());
EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
}
+
TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
ResetStub();
EchoRequest request;
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index 191d729a9e..82ccf436f8 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -63,7 +63,7 @@ grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) {
} else {
abort();
}
- GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL);
+ GPR_ASSERT(inet_ntop(af, (void*)pb_ip.bytes, ip_str, 46) != NULL);
return ip_str;
}
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index 3637190b6d..4b8a434c78 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -170,7 +170,7 @@ static grpc_slice build_response_payload_slice(
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
- ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
+ ev = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5),
NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
@@ -288,7 +288,8 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
- op->data.send_status_from_server.status_details = "xyz";
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = NULL;
op++;
@@ -335,7 +336,7 @@ static void start_backend_server(server_fixture *sf) {
GPR_ASSERT(GRPC_CALL_OK == error);
gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
ev = grpc_completion_queue_next(sf->cq,
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL);
+ grpc_timeout_seconds_to_deadline(60), NULL);
if (!ev.success) {
gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
cq_verifier_destroy(cqv);
@@ -379,7 +380,7 @@ static void start_backend_server(server_fixture *sf) {
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
ev = grpc_completion_queue_next(
- sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ sf->cq, grpc_timeout_seconds_to_deadline(3), NULL);
if (ev.type == GRPC_OP_COMPLETE && ev.success) {
GPR_ASSERT(ev.tag = tag(102));
if (request_payload_recv == NULL) {
@@ -409,7 +410,7 @@ static void start_backend_server(server_fixture *sf) {
grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
ev = grpc_completion_queue_next(
- sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ sf->cq, grpc_timeout_seconds_to_deadline(3), NULL);
if (ev.type == GRPC_OP_COMPLETE && ev.success) {
GPR_ASSERT(ev.tag = tag(103));
} else {
@@ -433,7 +434,9 @@ static void start_backend_server(server_fixture *sf) {
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
- op->data.send_status_from_server.status_details = "Backend server out a-ok";
+ grpc_slice status_details =
+ grpc_slice_from_static_string("Backend server out a-ok");
+ op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = NULL;
op++;
@@ -462,8 +465,7 @@ static void perform_request(client_fixture *cf) {
grpc_metadata_array trailing_metadata_recv;
grpc_status_code status;
grpc_call_error error;
- char *details = NULL;
- size_t details_capacity = 0;
+ grpc_slice details;
grpc_byte_buffer *request_payload;
grpc_byte_buffer *response_payload_recv;
int i;
@@ -472,9 +474,11 @@ static void perform_request(client_fixture *cf) {
grpc_slice request_payload_slice =
grpc_slice_from_copied_string("hello world");
+ grpc_slice host = grpc_slice_from_static_string("foo.test.google.fr:1234");
c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
- cf->cq, "/foo", "foo.test.google.fr:1234",
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL);
+ cf->cq, grpc_slice_from_static_string("/foo"),
+ &host, grpc_timeout_seconds_to_deadline(5),
+ NULL);
gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
GPR_ASSERT(c);
char *peer;
@@ -497,7 +501,6 @@ static void perform_request(client_fixture *cf) {
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
- op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
@@ -553,7 +556,7 @@ static void perform_request(client_fixture *cf) {
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
- gpr_free(details);
+ grpc_slice_unref(details);
gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer);
gpr_free(peer);
}
@@ -602,7 +605,7 @@ static void teardown_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
- sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(sf->server);
gpr_thd_join(sf->tid);
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 3265554444..8a00b61cef 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -32,8 +32,7 @@
*/
#include <memory>
-
-#include <unistd.h>
+#include <unordered_map>
#include <gflags/gflags.h>
#include <grpc++/channel.h>
@@ -108,119 +107,78 @@ int main(int argc, char** argv) {
grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
true,
FLAGS_do_not_abort_on_transient_failures);
- if (FLAGS_test_case == "empty_unary") {
- client.DoEmpty();
- } else if (FLAGS_test_case == "large_unary") {
- client.DoLargeUnary();
- } else if (FLAGS_test_case == "server_compressed_unary") {
- client.DoServerCompressedUnary();
- } else if (FLAGS_test_case == "client_compressed_unary") {
- client.DoClientCompressedUnary();
- } else if (FLAGS_test_case == "client_streaming") {
- client.DoRequestStreaming();
- } else if (FLAGS_test_case == "server_streaming") {
- client.DoResponseStreaming();
- } else if (FLAGS_test_case == "server_compressed_streaming") {
- client.DoServerCompressedStreaming();
- } else if (FLAGS_test_case == "client_compressed_streaming") {
- client.DoClientCompressedStreaming();
- } else if (FLAGS_test_case == "slow_consumer") {
- client.DoResponseStreamingWithSlowConsumer();
- } else if (FLAGS_test_case == "half_duplex") {
- client.DoHalfDuplex();
- } else if (FLAGS_test_case == "ping_pong") {
- client.DoPingPong();
- } else if (FLAGS_test_case == "cancel_after_begin") {
- client.DoCancelAfterBegin();
- } else if (FLAGS_test_case == "cancel_after_first_response") {
- client.DoCancelAfterFirstResponse();
- } else if (FLAGS_test_case == "timeout_on_sleeping_server") {
- client.DoTimeoutOnSleepingServer();
- } else if (FLAGS_test_case == "empty_stream") {
- client.DoEmptyStream();
- } else if (FLAGS_test_case == "compute_engine_creds") {
- client.DoComputeEngineCreds(FLAGS_default_service_account,
- FLAGS_oauth_scope);
- } else if (FLAGS_test_case == "jwt_token_creds") {
- grpc::string json_key = GetServiceAccountJsonKey();
- client.DoJwtTokenCreds(json_key);
- } else if (FLAGS_test_case == "oauth2_auth_token") {
- client.DoOauth2AuthToken(FLAGS_default_service_account, FLAGS_oauth_scope);
- } else if (FLAGS_test_case == "per_rpc_creds") {
- grpc::string json_key = GetServiceAccountJsonKey();
- client.DoPerRpcCreds(json_key);
- } else if (FLAGS_test_case == "status_code_and_message") {
- client.DoStatusWithMessage();
- } else if (FLAGS_test_case == "custom_metadata") {
- client.DoCustomMetadata();
- } else if (FLAGS_test_case == "unimplemented_method") {
- client.DoUnimplementedMethod();
- } else if (FLAGS_test_case == "unimplemented_service") {
- client.DoUnimplementedService();
- } else if (FLAGS_test_case == "cacheable_unary") {
- client.DoCacheableUnary();
- } else if (FLAGS_test_case == "all") {
- client.DoEmpty();
- client.DoLargeUnary();
- client.DoClientCompressedUnary();
- client.DoServerCompressedUnary();
- client.DoRequestStreaming();
- client.DoResponseStreaming();
- client.DoClientCompressedStreaming();
- client.DoServerCompressedStreaming();
- client.DoHalfDuplex();
- client.DoPingPong();
- client.DoCancelAfterBegin();
- client.DoCancelAfterFirstResponse();
- client.DoTimeoutOnSleepingServer();
- client.DoEmptyStream();
- client.DoStatusWithMessage();
- client.DoCustomMetadata();
- client.DoUnimplementedMethod();
- client.DoUnimplementedService();
- client.DoCacheableUnary();
- // service_account_creds and jwt_token_creds can only run with ssl.
- if (FLAGS_use_tls) {
- grpc::string json_key = GetServiceAccountJsonKey();
- client.DoJwtTokenCreds(json_key);
- client.DoOauth2AuthToken(FLAGS_default_service_account,
- FLAGS_oauth_scope);
- client.DoPerRpcCreds(json_key);
+
+ std::unordered_map<grpc::string, std::function<bool()>> actions;
+ actions["empty_unary"] =
+ std::bind(&grpc::testing::InteropClient::DoEmpty, &client);
+ actions["large_unary"] =
+ std::bind(&grpc::testing::InteropClient::DoLargeUnary, &client);
+ actions["server_compressed_unary"] = std::bind(
+ &grpc::testing::InteropClient::DoServerCompressedUnary, &client);
+ actions["client_compressed_unary"] = std::bind(
+ &grpc::testing::InteropClient::DoClientCompressedUnary, &client);
+ actions["client_streaming"] =
+ std::bind(&grpc::testing::InteropClient::DoRequestStreaming, &client);
+ actions["server_streaming"] =
+ std::bind(&grpc::testing::InteropClient::DoResponseStreaming, &client);
+ actions["server_compressed_streaming"] = std::bind(
+ &grpc::testing::InteropClient::DoServerCompressedStreaming, &client);
+ actions["client_compressed_streaming"] = std::bind(
+ &grpc::testing::InteropClient::DoClientCompressedStreaming, &client);
+ actions["slow_consumer"] = std::bind(
+ &grpc::testing::InteropClient::DoResponseStreamingWithSlowConsumer,
+ &client);
+ actions["half_duplex"] =
+ std::bind(&grpc::testing::InteropClient::DoHalfDuplex, &client);
+ actions["ping_pong"] =
+ std::bind(&grpc::testing::InteropClient::DoPingPong, &client);
+ actions["cancel_after_begin"] =
+ std::bind(&grpc::testing::InteropClient::DoCancelAfterBegin, &client);
+ actions["cancel_after_first_response"] = std::bind(
+ &grpc::testing::InteropClient::DoCancelAfterFirstResponse, &client);
+ actions["timeout_on_sleeping_server"] = std::bind(
+ &grpc::testing::InteropClient::DoTimeoutOnSleepingServer, &client);
+ actions["empty_stream"] =
+ std::bind(&grpc::testing::InteropClient::DoEmptyStream, &client);
+ if (FLAGS_use_tls) {
+ actions["compute_engine_creds"] =
+ std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client,
+ FLAGS_default_service_account, FLAGS_oauth_scope);
+ actions["jwt_token_creds"] =
+ std::bind(&grpc::testing::InteropClient::DoJwtTokenCreds, &client,
+ GetServiceAccountJsonKey());
+ actions["oauth2_auth_token"] =
+ std::bind(&grpc::testing::InteropClient::DoOauth2AuthToken, &client,
+ FLAGS_default_service_account, FLAGS_oauth_scope);
+ actions["per_rpc_creds"] =
+ std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client,
+ GetServiceAccountJsonKey());
+ }
+ actions["status_code_and_message"] =
+ std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client);
+ actions["custom_metadata"] =
+ std::bind(&grpc::testing::InteropClient::DoCustomMetadata, &client);
+ actions["unimplemented_method"] =
+ std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client);
+ actions["unimplemented_service"] =
+ std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
+ // actions["cacheable_unary"] =
+ // std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
+
+ if (FLAGS_test_case == "all") {
+ for (const auto& action : actions) {
+ action.second();
}
- // compute_engine_creds only runs in GCE.
+ } else if (actions.find(FLAGS_test_case) != actions.end()) {
+ actions.find(FLAGS_test_case)->second();
} else {
- const char* testcases[] = {"all",
- "cacheable_unary",
- "cancel_after_begin",
- "cancel_after_first_response",
- "client_compressed_streaming",
- "client_compressed_unary",
- "client_streaming",
- "compute_engine_creds",
- "custom_metadata",
- "empty_stream",
- "empty_unary",
- "half_duplex",
- "jwt_token_creds",
- "large_unary",
- "oauth2_auth_token",
- "oauth2_auth_token",
- "per_rpc_creds",
- "per_rpc_creds",
- "ping_pong",
- "server_compressed_streaming",
- "server_compressed_unary",
- "server_streaming",
- "status_code_and_message",
- "timeout_on_sleeping_server",
- "unimplemented_method",
- "unimplemented_service"};
- char* joined_testcases =
- gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL);
-
+ grpc::string test_cases;
+ for (const auto& action : actions) {
+ if (!test_cases.empty()) test_cases += "\n";
+ test_cases += action.first;
+ }
gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s",
- FLAGS_test_case.c_str(), joined_testcases);
- gpr_free(joined_testcases);
+ FLAGS_test_case.c_str(), test_cases.c_str());
ret = 1;
}
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index 91564e5dce..d3192ad0c9 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -33,8 +33,6 @@
#include "test/cpp/interop/client_helper.h"
-#include <unistd.h>
-
#include <fstream>
#include <memory>
#include <sstream>
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index d1242627ef..b7f2723c39 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -31,7 +31,6 @@
*
*/
-#include <unistd.h>
#include <cinttypes>
#include <fstream>
#include <memory>
@@ -43,6 +42,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/transport/byte_stream.h"
@@ -109,7 +109,10 @@ TestService::Stub* InteropClient::ServiceStub::Get() {
UnimplementedService::Stub*
InteropClient::ServiceStub::GetUnimplementedServiceStub() {
- return UnimplementedService::NewStub(channel_).get();
+ if (unimplemented_service_stub_ == nullptr) {
+ unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
+ }
+ return unimplemented_service_stub_.get();
}
void InteropClient::ServiceStub::Reset(std::shared_ptr<Channel> channel) {
@@ -615,7 +618,9 @@ bool InteropClient::DoResponseStreamingWithSlowConsumer() {
GPR_ASSERT(response.payload().body() ==
grpc::string(kResponseMessageSize, '\0'));
gpr_log(GPR_DEBUG, "received message %d", i);
- usleep(kReceiveDelayMilliSeconds * 1000);
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
++i;
}
@@ -943,7 +948,7 @@ bool InteropClient::DoCustomMetadata() {
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
- GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
+ GPR_ASSERT(iter->second == kInitialMetadataValue);
const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
GPR_ASSERT(iter != server_trailing_metadata.end());
@@ -994,7 +999,7 @@ bool InteropClient::DoCustomMetadata() {
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
- GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
+ GPR_ASSERT(iter->second == kInitialMetadataValue);
const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
GPR_ASSERT(iter != server_trailing_metadata.end());
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index 7ec7ebee20..74f4db6b78 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -107,6 +107,7 @@ class InteropClient {
private:
std::unique_ptr<TestService::Stub> stub_;
+ std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
std::shared_ptr<Channel> channel_;
bool new_stub_every_call_; // If true, a new stub is returned by every
// Get() call
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc
index 956840ba70..5a810b45ef 100644
--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -31,8 +31,6 @@
*
*/
-#include <unistd.h>
-
#include <fstream>
#include <memory>
#include <sstream>
@@ -45,6 +43,7 @@
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/support/string.h"
@@ -91,7 +90,9 @@ void MaybeEchoMetadata(ServerContext* context) {
auto iter = client_metadata.find(kEchoInitialMetadataKey);
if (iter != client_metadata.end()) {
- context->AddInitialMetadata(kEchoInitialMetadataKey, iter->second.data());
+ context->AddInitialMetadata(
+ kEchoInitialMetadataKey,
+ grpc::string(iter->second.begin(), iter->second.end()));
}
iter = client_metadata.find(kEchoTrailingBinMetadataKey);
if (iter != client_metadata.end()) {
@@ -105,7 +106,9 @@ void MaybeEchoMetadata(ServerContext* context) {
if (iter != client_metadata.end()) {
iter = client_metadata.find("user-agent");
if (iter != client_metadata.end()) {
- context->AddInitialMetadata(kEchoUserAgentKey, iter->second.data());
+ context->AddInitialMetadata(
+ kEchoUserAgentKey,
+ grpc::string(iter->second.begin(), iter->second.end()));
}
}
}
@@ -346,6 +349,7 @@ void grpc::testing::interop::RunServer(
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
- sleep(5);
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(5, GPR_TIMESPAN)));
}
}
diff --git a/test/cpp/interop/interop_server_bootstrap.cc b/test/cpp/interop/interop_server_bootstrap.cc
index 99518c6943..7cbf221a03 100644
--- a/test/cpp/interop/interop_server_bootstrap.cc
+++ b/test/cpp/interop/interop_server_bootstrap.cc
@@ -32,7 +32,6 @@
*/
#include <signal.h>
-#include <unistd.h>
#include "test/cpp/interop/server_helper.h"
#include "test/cpp/util/test_config.h"
diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc
index 53d51e80e7..634d0a90fc 100644
--- a/test/cpp/interop/reconnect_interop_server.cc
+++ b/test/cpp/interop/reconnect_interop_server.cc
@@ -34,7 +34,6 @@
// Test description at doc/connection-backoff-interop-test-description.md
#include <signal.h>
-#include <unistd.h>
#include <condition_variable>
#include <memory>
diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc
index 6c0bf80488..c3e96c572c 100644
--- a/test/cpp/microbenchmarks/bm_fullstack.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack.cc
@@ -84,6 +84,16 @@ static class InitializeStuff {
* FIXTURES
*/
+static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
+ b->SetMaxReceiveMessageSize(INT_MAX);
+ b->SetMaxSendMessageSize(INT_MAX);
+}
+
+static void ApplyCommonChannelArguments(ChannelArguments* c) {
+ c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
+ c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
+}
+
class FullstackFixture {
public:
FullstackFixture(Service* service, const grpc::string& address) {
@@ -91,8 +101,11 @@ class FullstackFixture {
b.AddListeningPort(address, InsecureServerCredentials());
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
+ ApplyCommonServerBuilderConfig(&b);
server_ = b.BuildAndStart();
- channel_ = CreateChannel(address, InsecureChannelCredentials());
+ ChannelArguments args;
+ ApplyCommonChannelArguments(&args);
+ channel_ = CreateCustomChannel(address, InsecureChannelCredentials(), args);
}
virtual ~FullstackFixture() {
@@ -117,6 +130,8 @@ class TCP : public FullstackFixture {
public:
TCP(Service* service) : FullstackFixture(service, MakeAddress()) {}
+ void Finish(benchmark::State& state) {}
+
private:
static grpc::string MakeAddress() {
int port = grpc_pick_unused_port_or_die();
@@ -130,6 +145,8 @@ class UDS : public FullstackFixture {
public:
UDS(Service* service) : FullstackFixture(service, MakeAddress()) {}
+ void Finish(benchmark::State& state) {}
+
private:
static grpc::string MakeAddress() {
int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a
@@ -146,6 +163,7 @@ class EndpointPairFixture {
ServerBuilder b;
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
+ ApplyCommonServerBuilderConfig(&b);
server_ = b.BuildAndStart();
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -174,6 +192,7 @@ class EndpointPairFixture {
{
ChannelArguments args;
args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
+ ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
grpc_transport* transport =
@@ -213,6 +232,8 @@ class SockPair : public EndpointPairFixture {
: EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair(
"test", initialize_stuff.rq(), 8192)) {
}
+
+ void Finish(benchmark::State& state) {}
};
class InProcessCHTTP2 : public EndpointPairFixture {
@@ -220,10 +241,20 @@ class InProcessCHTTP2 : public EndpointPairFixture {
InProcessCHTTP2(Service* service)
: EndpointPairFixture(service, MakeEndpoints()) {}
+ void Finish(benchmark::State& state) {
+ std::ostringstream out;
+ out << "writes/iteration:"
+ << ((double)stats_.num_writes / (double)state.iterations());
+ state.SetLabel(out.str());
+ }
+
private:
+ grpc_passthru_endpoint_stats stats_;
+
grpc_endpoint_pair MakeEndpoints() {
grpc_endpoint_pair p;
- grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq());
+ grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
+ &stats_);
return p;
}
};
@@ -343,6 +374,12 @@ static void BM_UnaryPingPong(benchmark::State& state) {
EchoRequest send_request;
EchoResponse send_response;
EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ if (state.range(1) > 0) {
+ send_response.set_message(std::string(state.range(1), 'a'));
+ }
Status recv_status;
struct ServerEnv {
ServerContext ctx;
@@ -365,6 +402,7 @@ static void BM_UnaryPingPong(benchmark::State& state) {
std::unique_ptr<EchoTestService::Stub> stub(
EchoTestService::NewStub(fixture->channel()));
while (state.KeepRunning()) {
+ recv_response.Clear();
ClientContext cli_ctx;
ClientContextMutator cli_ctx_mut(&cli_ctx);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
@@ -393,56 +431,216 @@ static void BM_UnaryPingPong(benchmark::State& state) {
service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
fixture->cq(), fixture->cq(), tag(slot));
}
+ fixture->Finish(state);
fixture.reset();
server_env[0]->~ServerEnv();
server_env[1]->~ServerEnv();
+ state.SetBytesProcessed(state.range(0) * state.iterations() +
+ state.range(1) * state.iterations());
+}
+
+template <class Fixture>
+static void BM_PumpStreamClientToServer(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ response_rw.Read(&recv_request, tag(0));
+ while (state.KeepRunning()) {
+ request_rw->Write(send_request, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ response_rw.Read(&recv_request, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ request_rw->WritesDone(tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+
+template <class Fixture>
+static void BM_PumpStreamServerToClient(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<Fixture> fixture(new Fixture(&service));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_response.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ request_rw->Read(&recv_response, tag(0));
+ while (state.KeepRunning()) {
+ response_rw.Write(send_response, tag(1));
+ while (true) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ if (t == tag(0)) {
+ request_rw->Read(&recv_response, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ response_rw.Finish(Status::OK, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
}
/*******************************************************************************
* CONFIGURATIONS
*/
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator);
-BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator);
+static void SweepSizesArgs(benchmark::internal::Benchmark* b) {
+ b->Args({0, 0});
+ for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
+ b->Args({i, 0});
+ b->Args({0, i});
+ b->Args({i, i});
+ }
+}
+
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator)
+ ->Apply(SweepSizesArgs);
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator)
+ ->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator)
+ ->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator)
+ ->Apply(SweepSizesArgs);
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomBinaryMetadata<10>, 1>,
- NoOpMutator);
+ Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomBinaryMetadata<31>, 1>,
- NoOpMutator);
+ Client_AddMetadata<RandomBinaryMetadata<31>, 1>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<100>, 1>,
- NoOpMutator);
+ NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomBinaryMetadata<10>, 2>,
- NoOpMutator);
+ Client_AddMetadata<RandomBinaryMetadata<10>, 2>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomBinaryMetadata<31>, 2>,
- NoOpMutator);
+ Client_AddMetadata<RandomBinaryMetadata<31>, 2>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<100>, 2>,
- NoOpMutator);
+ NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>);
+ Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>);
+ Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>);
+ Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator);
+ Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator);
+ Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
- Client_AddMetadata<RandomAsciiMetadata<100>, 1>,
- NoOpMutator);
+ Client_AddMetadata<RandomAsciiMetadata<100>, 1>, NoOpMutator)
+ ->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
+ Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>);
+ Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>);
+ Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
- Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>);
+ Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>)
+ ->Args({0, 0});
+
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
+ ->Range(0, 128 * 1024 * 1024);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
new file mode 100644
index 0000000000..7a914c1547
--- /dev/null
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -0,0 +1,268 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/channel.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/impl/grpc_library.h>
+#include <grpc++/security/credentials.h>
+#include <grpc++/security/server_credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc/support/log.h>
+#include <gtest/gtest.h>
+
+extern "C" {
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/endpoint_pair.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
+#include "test/core/util/passthru_endpoint.h"
+#include "test/core/util/port.h"
+}
+#include "src/cpp/client/create_channel_internal.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc {
+namespace testing {
+
+static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+
+static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
+ b->SetMaxReceiveMessageSize(INT_MAX);
+ b->SetMaxSendMessageSize(INT_MAX);
+}
+
+static void ApplyCommonChannelArguments(ChannelArguments* c) {
+ c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
+ c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
+}
+
+static class InitializeStuff {
+ public:
+ InitializeStuff() {
+ init_lib_.init();
+ rq_ = grpc_resource_quota_create("bm");
+ }
+
+ ~InitializeStuff() { init_lib_.shutdown(); }
+
+ grpc_resource_quota* rq() { return rq_; }
+
+ private:
+ internal::GrpcLibrary init_lib_;
+ grpc_resource_quota* rq_;
+} initialize_stuff;
+
+class EndpointPairFixture {
+ public:
+ EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) {
+ ServerBuilder b;
+ cq_ = b.AddCompletionQueue(true);
+ b.RegisterService(service);
+ ApplyCommonServerBuilderConfig(&b);
+ server_ = b.BuildAndStart();
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ /* add server endpoint to server_ */
+ {
+ const grpc_channel_args* server_args =
+ grpc_server_get_channel_args(server_->c_server());
+ grpc_transport* transport = grpc_create_chttp2_transport(
+ &exec_ctx, server_args, endpoints.server, 0 /* is_client */);
+
+ grpc_pollset** pollsets;
+ size_t num_pollsets = 0;
+ grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets);
+
+ for (size_t i = 0; i < num_pollsets; i++) {
+ grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]);
+ }
+
+ grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport,
+ NULL, server_args);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
+ }
+
+ /* create channel */
+ {
+ ChannelArguments args;
+ args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
+ ApplyCommonChannelArguments(&args);
+
+ grpc_channel_args c_args = args.c_channel_args();
+ grpc_transport* transport =
+ grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
+ GPR_ASSERT(transport);
+ grpc_channel* channel = grpc_channel_create(
+ &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
+
+ channel_ = CreateChannelInternal("", channel);
+ }
+
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ virtual ~EndpointPairFixture() {
+ server_->Shutdown();
+ cq_->Shutdown();
+ void* tag;
+ bool ok;
+ while (cq_->Next(&tag, &ok)) {
+ }
+ }
+
+ ServerCompletionQueue* cq() { return cq_.get(); }
+ std::shared_ptr<Channel> channel() { return channel_; }
+
+ private:
+ std::unique_ptr<Server> server_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ std::shared_ptr<Channel> channel_;
+};
+
+class InProcessCHTTP2 : public EndpointPairFixture {
+ public:
+ InProcessCHTTP2(Service* service)
+ : EndpointPairFixture(service, MakeEndpoints()) {}
+
+ int writes_performed() const { return stats_.num_writes; }
+
+ private:
+ grpc_passthru_endpoint_stats stats_;
+
+ grpc_endpoint_pair MakeEndpoints() {
+ grpc_endpoint_pair p;
+ grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
+ &stats_);
+ return p;
+ }
+};
+
+static double UnaryPingPong(int request_size, int response_size) {
+ const int kIterations = 10000;
+
+ EchoTestService::AsyncService service;
+ std::unique_ptr<InProcessCHTTP2> fixture(new InProcessCHTTP2(&service));
+ EchoRequest send_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (request_size > 0) {
+ send_request.set_message(std::string(request_size, 'a'));
+ }
+ if (response_size > 0) {
+ send_response.set_message(std::string(response_size, 'a'));
+ }
+ Status recv_status;
+ struct ServerEnv {
+ ServerContext ctx;
+ EchoRequest recv_request;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
+ ServerEnv() : response_writer(&ctx) {}
+ };
+ uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
+ ServerEnv* server_env[2] = {
+ reinterpret_cast<ServerEnv*>(server_env_buffer),
+ reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
+ new (server_env[0]) ServerEnv;
+ new (server_env[1]) ServerEnv;
+ service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
+ &server_env[0]->response_writer, fixture->cq(),
+ fixture->cq(), tag(0));
+ service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
+ &server_env[1]->response_writer, fixture->cq(),
+ fixture->cq(), tag(1));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ for (int iteration = 0; iteration < kIterations; iteration++) {
+ recv_response.Clear();
+ ClientContext cli_ctx;
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ void* t;
+ bool ok;
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ GPR_ASSERT(t == tag(0) || t == tag(1));
+ intptr_t slot = reinterpret_cast<intptr_t>(t);
+ ServerEnv* senv = server_env[slot];
+ senv->response_writer.Finish(send_response, Status::OK, tag(3));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ for (int i = (1 << 3) | (1 << 4); i != 0;) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ GPR_ASSERT(ok);
+ int tagnum = (int)reinterpret_cast<intptr_t>(t);
+ GPR_ASSERT(i & (1 << tagnum));
+ i -= 1 << tagnum;
+ }
+ GPR_ASSERT(recv_status.ok());
+
+ senv->~ServerEnv();
+ senv = new (senv) ServerEnv();
+ service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
+ fixture->cq(), fixture->cq(), tag(slot));
+ }
+
+ double writes_per_iteration =
+ (double)fixture->writes_performed() / (double)kIterations;
+
+ fixture.reset();
+ server_env[0]->~ServerEnv();
+ server_env[1]->~ServerEnv();
+
+ return writes_per_iteration;
+}
+
+TEST(WritesPerRpcTest, UnaryPingPong) {
+ EXPECT_LT(UnaryPingPong(0, 0), 2.05);
+ EXPECT_LT(UnaryPingPong(1, 0), 2.05);
+ EXPECT_LT(UnaryPingPong(0, 1), 2.05);
+ EXPECT_LT(UnaryPingPong(4096, 0), 2.2);
+ EXPECT_LT(UnaryPingPong(0, 4096), 2.2);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index b1e61865e7..498416c64a 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -153,7 +153,6 @@ class SynchronousStreamingClient final : public SynchronousClient {
if (*stream) {
(*stream)->WritesDone();
Status s = (*stream)->Finish();
- EXPECT_TRUE(s.ok());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
s.error_message().c_str());
@@ -173,7 +172,11 @@ class SynchronousStreamingClient final : public SynchronousClient {
entry->set_value((UsageTimer::Now() - start) * 1e9);
return true;
}
- return false;
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ context_[thread_idx].~ClientContext();
+ new (&context_[thread_idx]) ClientContext();
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ return true;
}
private:
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 188d6196e5..2f035abedd 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -92,7 +92,8 @@ print yaml.dump({
'defaults': 'boringssl',
'cpu_cost': guess_cpu(scenario_json, False),
'exclude_configs': ['tsan', 'asan'],
- 'timeout_seconds': 6*60
+ 'timeout_seconds': 6*60,
+ 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', [])
}
for scenario_json in scenario_config.CXXLanguage().scenarios()
if 'scalable' in scenario_json.get('CATEGORIES', [])
@@ -109,8 +110,9 @@ print yaml.dump({
'defaults': 'boringssl',
'cpu_cost': guess_cpu(scenario_json, True),
'exclude_configs': sorted(c for c in configs_from_yaml if c not in ('tsan', 'asan')),
- 'timeout_seconds': 6*60
- }
+ 'timeout_seconds': 6*60,
+ 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', [])
+ }
for scenario_json in scenario_config.CXXLanguage().scenarios()
if 'scalable' in scenario_json.get('CATEGORIES', [])
]
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 8dc50ac6d8..70e2709ac0 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -56,7 +56,7 @@ static void RunQPS() {
client_config.set_async_client_threads(8);
client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
- 1000.0 / GRPC_TEST_SLOWDOWN_FACTOR);
+ 1000.0 / grpc_test_slowdown_factor());
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
diff --git a/test/cpp/qps/usage_timer.cc b/test/cpp/qps/usage_timer.cc
index c6697fbdfd..70ef26e82a 100644
--- a/test/cpp/qps/usage_timer.cc
+++ b/test/cpp/qps/usage_timer.cc
@@ -39,8 +39,15 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#ifdef __linux__
#include <sys/resource.h>
#include <sys/time.h>
+
+static double time_double(struct timeval* tv) {
+ return tv->tv_sec + 1e-6 * tv->tv_usec;
+}
+#endif
+
UsageTimer::UsageTimer() : start_(Sample()) {}
double UsageTimer::Now() {
@@ -48,8 +55,16 @@ double UsageTimer::Now() {
return ts.tv_sec + 1e-9 * ts.tv_nsec;
}
-static double time_double(struct timeval* tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
+static void get_resource_usage(double* utime, double* stime) {
+#ifdef __linux__
+ struct rusage usage;
+ getrusage(RUSAGE_SELF, &usage);
+ *utime = time_double(&usage.ru_utime);
+ *stime = time_double(&usage.ru_stime);
+#else
+ *utime = 0;
+ *stime = 0;
+#endif
}
static void get_cpu_usage(unsigned long long* total_cpu_time,
@@ -74,15 +89,9 @@ static void get_cpu_usage(unsigned long long* total_cpu_time,
}
UsageTimer::Result UsageTimer::Sample() {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
-
Result r;
- r.wall = time_double(&tv);
- r.user = time_double(&usage.ru_utime);
- r.system = time_double(&usage.ru_stime);
+ r.wall = Now();
+ get_resource_usage(&r.user, &r.system);
r.total_cpu_time = 0;
r.idle_cpu_time = 0;
get_cpu_usage(&r.total_cpu_time, &r.idle_cpu_time);
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 2068b7c213..e88d0647dd 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -31,7 +31,7 @@
*
*/
-#include <sys/signal.h>
+#include <signal.h>
#include <chrono>
#include <thread>
diff --git a/test/cpp/test/server_context_test_spouse_test.cc b/test/cpp/test/server_context_test_spouse_test.cc
index e0d6a2ff67..eb279e2401 100644
--- a/test/cpp/test/server_context_test_spouse_test.cc
+++ b/test/cpp/test/server_context_test_spouse_test.cc
@@ -36,11 +36,14 @@
#include <cstring>
#include <vector>
+#include <grpc++/impl/grpc_library.h>
#include <gtest/gtest.h>
namespace grpc {
namespace testing {
+static internal::GrpcLibraryInitializer g_initializer;
+
const char key1[] = "metadata-key1";
const char key2[] = "metadata-key2";
const char val1[] = "metadata-val1";
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 284761c53a..35c8d5d088 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -31,6 +31,7 @@
*is % allowed in string
*/
+#include <ctime>
#include <memory>
#include <string>
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index a02a8b2ee2..041cc0e4c3 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -37,8 +37,6 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@@ -56,55 +54,172 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata) {
- std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel));
- grpc::ClientContext ctx;
+ CliCall call(channel, method, metadata);
+ call.Write(request);
+ call.WritesDone();
+ if (!call.Read(response, server_initial_metadata)) {
+ fprintf(stderr, "Failed to read response.\n");
+ }
+ return call.Finish(server_trailing_metadata);
+}
+
+CliCall::CliCall(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method,
+ const OutgoingMetadataContainer& metadata)
+ : stub_(new grpc::GenericStub(channel)) {
+ gpr_mu_init(&write_mu_);
+ gpr_cv_init(&write_cv_);
if (!metadata.empty()) {
for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
iter != metadata.end(); ++iter) {
- ctx.AddMetadata(iter->first, iter->second);
+ ctx_.AddMetadata(iter->first, iter->second);
}
}
- grpc::CompletionQueue cq;
- std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call(
- stub->Call(&ctx, method, &cq, tag(1)));
+ call_ = stub_->Call(&ctx_, method, &cq_, tag(1));
void* got_tag;
bool ok;
- cq.Next(&got_tag, &ok);
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+CliCall::~CliCall() {
+ gpr_cv_destroy(&write_cv_);
+ gpr_mu_destroy(&write_mu_);
+}
+
+void CliCall::Write(const grpc::string& request) {
+ void* got_tag;
+ bool ok;
grpc_slice s = grpc_slice_from_copied_string(request.c_str());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
- call->Write(send_buffer, tag(2));
- cq.Next(&got_tag, &ok);
- GPR_ASSERT(ok);
- call->WritesDone(tag(3));
- cq.Next(&got_tag, &ok);
+ call_->Write(send_buffer, tag(2));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
+
+bool CliCall::Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+
grpc::ByteBuffer recv_buffer;
- call->Read(&recv_buffer, tag(4));
- cq.Next(&got_tag, &ok);
- if (!ok) {
- std::cout << "Failed to read response." << std::endl;
+ call_->Read(&recv_buffer, tag(3));
+
+ if (!cq_.Next(&got_tag, &ok) || !ok) {
+ return false;
}
- grpc::Status status;
- call->Finish(&status, tag(5));
- cq.Next(&got_tag, &ok);
+ std::vector<grpc::Slice> slices;
+ GPR_ASSERT(recv_buffer.Dump(&slices).ok());
+
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+void CliCall::WritesDone() {
+ void* got_tag;
+ bool ok;
+
+ call_->WritesDone(tag(4));
+ cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
+}
- if (status.ok()) {
- std::vector<grpc::Slice> slices;
- (void)recv_buffer.Dump(&slices);
+void CliCall::WriteAndWait(const grpc::string& request) {
+ grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+ grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
+ grpc::ByteBuffer send_buffer(&req_slice, 1);
+
+ gpr_mu_lock(&write_mu_);
+ call_->Write(send_buffer, tag(2));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
+
+void CliCall::WritesDoneAndWait() {
+ gpr_mu_lock(&write_mu_);
+ call_->WritesDone(tag(4));
+ write_done_ = false;
+ while (!write_done_) {
+ gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&write_mu_);
+}
- response->clear();
- for (size_t i = 0; i < slices.size(); i++) {
- response->append(reinterpret_cast<const char*>(slices[i].begin()),
- slices[i].size());
+bool CliCall::ReadAndMaybeNotifyWrite(
+ grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::ByteBuffer recv_buffer;
+
+ call_->Read(&recv_buffer, tag(3));
+ bool cq_result = cq_.Next(&got_tag, &ok);
+
+ while (got_tag != tag(3)) {
+ gpr_mu_lock(&write_mu_);
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ gpr_mu_unlock(&write_mu_);
+
+ cq_result = cq_.Next(&got_tag, &ok);
+ if (got_tag == tag(2)) {
+ GPR_ASSERT(ok);
}
}
- *server_initial_metadata = ctx.GetServerInitialMetadata();
- *server_trailing_metadata = ctx.GetServerTrailingMetadata();
+ if (!cq_result || !ok) {
+ // If the RPC is ended on the server side, we should still wait for the
+ // pending write on the client side to be done.
+ if (!ok) {
+ gpr_mu_lock(&write_mu_);
+ if (!write_done_) {
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(got_tag != tag(2));
+ write_done_ = true;
+ gpr_cv_signal(&write_cv_);
+ }
+ gpr_mu_unlock(&write_mu_);
+ }
+ return false;
+ }
+
+ std::vector<grpc::Slice> slices;
+ GPR_ASSERT(recv_buffer.Dump(&slices).ok());
+ response->clear();
+ for (size_t i = 0; i < slices.size(); i++) {
+ response->append(reinterpret_cast<const char*>(slices[i].begin()),
+ slices[i].size());
+ }
+ if (server_initial_metadata) {
+ *server_initial_metadata = ctx_.GetServerInitialMetadata();
+ }
+ return true;
+}
+
+Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
+ void* got_tag;
+ bool ok;
+ grpc::Status status;
+
+ call_->Finish(&status, tag(5));
+ cq_.Next(&got_tag, &ok);
+ GPR_ASSERT(ok);
+ if (server_trailing_metadata) {
+ *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
+ }
+
return status;
}
diff --git a/test/cpp/util/cli_call.h b/test/cpp/util/cli_call.h
index 65da86bd4e..91f0dbc9ed 100644
--- a/test/cpp/util/cli_call.h
+++ b/test/cpp/util/cli_call.h
@@ -37,23 +37,74 @@
#include <map>
#include <grpc++/channel.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/generic/generic_stub.h>
#include <grpc++/support/status.h>
#include <grpc++/support/string_ref.h>
namespace grpc {
+
+class ClientContext;
+
namespace testing {
+// CliCall handles the sending and receiving of generic messages given the name
+// of the remote method. This class is only used by GrpcTool. Its thread-safe
+// and thread-unsafe methods should not be used together.
class CliCall final {
public:
typedef std::multimap<grpc::string, grpc::string> OutgoingMetadataContainer;
typedef std::multimap<grpc::string_ref, grpc::string_ref>
IncomingMetadataContainer;
+
+ CliCall(std::shared_ptr<grpc::Channel> channel, const grpc::string& method,
+ const OutgoingMetadataContainer& metadata);
+ ~CliCall();
+
+ // Perform an unary generic RPC.
static Status Call(std::shared_ptr<grpc::Channel> channel,
const grpc::string& method, const grpc::string& request,
grpc::string* response,
const OutgoingMetadataContainer& metadata,
IncomingMetadataContainer* server_initial_metadata,
IncomingMetadataContainer* server_trailing_metadata);
+
+ // Send a generic request message in a synchronous manner. NOT thread-safe.
+ void Write(const grpc::string& request);
+
+ // Send a generic request message in a synchronous manner. NOT thread-safe.
+ void WritesDone();
+
+ // Receive a generic response message in a synchronous manner.NOT thread-safe.
+ bool Read(grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata);
+
+ // Thread-safe write. Must be used with ReadAndMaybeNotifyWrite. Send out a
+ // generic request message and wait for ReadAndMaybeNotifyWrite to finish it.
+ void WriteAndWait(const grpc::string& request);
+
+ // Thread-safe WritesDone. Must be used with ReadAndMaybeNotifyWrite. Send out
+ // WritesDone for gereneric request messages and wait for
+ // ReadAndMaybeNotifyWrite to finish it.
+ void WritesDoneAndWait();
+
+ // Thread-safe Read. Blockingly receive a generic response message. Notify
+ // writes if they are finished when this read is waiting for a resposne.
+ bool ReadAndMaybeNotifyWrite(
+ grpc::string* response,
+ IncomingMetadataContainer* server_initial_metadata);
+
+ // Finish the RPC.
+ Status Finish(IncomingMetadataContainer* server_trailing_metadata);
+
+ private:
+ std::unique_ptr<grpc::GenericStub> stub_;
+ grpc::ClientContext ctx_;
+ std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_;
+ grpc::CompletionQueue cq_;
+ gpr_mu write_mu_;
+ gpr_cv write_cv_; // Protected by write_mu_;
+ bool write_done_; // Portected by write_mu_;
};
} // namespace testing
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index fe248601ee..a78bed4b90 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -83,10 +83,10 @@ DEFINE_string(outfile, "", "Output file (default is stdout)");
static bool SimplePrint(const grpc::string& outfile,
const grpc::string& output) {
if (outfile.empty()) {
- std::cout << output;
+ std::cout << output << std::endl;
} else {
- std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary);
- output_file << output;
+ std::ofstream output_file(outfile, std::ios::app | std::ios::binary);
+ output_file << output << std::endl;
output_file.close();
}
return true;
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc
index b9900ca1b7..856cd32c3c 100644
--- a/test/cpp/util/grpc_tool.cc
+++ b/test/cpp/util/grpc_tool.cc
@@ -33,12 +33,13 @@
#include "test/cpp/util/grpc_tool.h"
-#include <unistd.h>
+#include <cstdio>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
+#include <thread>
#include <gflags/gflags.h>
#include <grpc++/channel.h>
@@ -47,12 +48,19 @@
#include <grpc++/security/credentials.h>
#include <grpc++/support/string_ref.h>
#include <grpc/grpc.h>
+#include <grpc/support/port_platform.h>
#include "test/cpp/util/cli_call.h"
#include "test/cpp/util/proto_file_parser.h"
#include "test/cpp/util/proto_reflection_descriptor_database.h"
#include "test/cpp/util/service_describer.h"
+#if GPR_WINDOWS
+#include <io.h>
+#else
+#include <unistd.h>
+#endif
+
namespace grpc {
namespace testing {
@@ -159,6 +167,36 @@ void PrintMetadata(const T& m, const grpc::string& message) {
}
}
+void ReadResponse(CliCall* call, const grpc::string& method_name,
+ GrpcToolOutputCallback callback, ProtoFileParser* parser,
+ gpr_mu* parser_mu, bool print_mode) {
+ grpc::string serialized_response_proto;
+ std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata;
+
+ for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite(
+ &serialized_response_proto,
+ receive_initial_metadata ? &server_initial_metadata : nullptr);
+ receive_initial_metadata = false) {
+ fprintf(stderr, "got response.\n");
+ if (!FLAGS_binary_output) {
+ gpr_mu_lock(parser_mu);
+ serialized_response_proto = parser->GetTextFormatFromMethod(
+ method_name, serialized_response_proto, false /* is_request */);
+ if (parser->HasError() && print_mode) {
+ fprintf(stderr, "Failed to parse response.\n");
+ }
+ gpr_mu_unlock(parser_mu);
+ }
+ if (receive_initial_metadata) {
+ PrintMetadata(server_initial_metadata,
+ "Received initial metadata from server:");
+ }
+ if (!callback(serialized_response_proto) && print_mode) {
+ fprintf(stderr, "Failed to output response.\n");
+ }
+ }
+}
+
struct Command {
const char* command;
std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
@@ -416,85 +454,191 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string server_address(argv[0]);
grpc::string method_name(argv[1]);
grpc::string formatted_method_name;
- std::unique_ptr<grpc::testing::ProtoFileParser> parser;
+ std::unique_ptr<ProtoFileParser> parser;
grpc::string serialized_request_proto;
+ bool print_mode = false;
- if (argc == 3) {
- request_text = argv[2];
- if (!FLAGS_infile.empty()) {
- fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
- }
+ std::shared_ptr<grpc::Channel> channel =
+ FLAGS_remotedb
+ ? grpc::CreateChannel(server_address, cred.GetCredentials())
+ : nullptr;
+
+ parser.reset(new grpc::testing::ProtoFileParser(channel, FLAGS_proto_path,
+ FLAGS_protofiles));
+
+ if (FLAGS_binary_input) {
+ formatted_method_name = method_name;
} else {
- std::stringstream input_stream;
+ formatted_method_name = parser->GetFormattedMethodName(method_name);
+ }
+
+ if (parser->HasError()) {
+ return false;
+ }
+
+ if (parser->IsStreaming(method_name, true /* is_request */)) {
+ std::istream* input_stream;
+ std::ifstream input_file;
+
+ if (argc == 3) {
+ request_text = argv[2];
+ }
+
+ std::multimap<grpc::string, grpc::string> client_metadata;
+ ParseMetadataFlag(&client_metadata);
+ PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+ CliCall call(channel, formatted_method_name, client_metadata);
+
if (FLAGS_infile.empty()) {
- if (isatty(STDIN_FILENO)) {
- fprintf(stderr, "reading request message from stdin...\n");
+ if (isatty(fileno(stdin))) {
+ print_mode = true;
+ fprintf(stderr, "reading streaming request message from stdin...\n");
}
- input_stream << std::cin.rdbuf();
+ input_stream = &std::cin;
} else {
- std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
- input_stream << input_file.rdbuf();
+ input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
+ input_stream = &input_file;
+ }
+
+ gpr_mu parser_mu;
+ gpr_mu_init(&parser_mu);
+ std::thread read_thread(ReadResponse, &call, method_name, callback,
+ parser.get(), &parser_mu, print_mode);
+
+ std::stringstream request_ss;
+ grpc::string line;
+ while (!request_text.empty() ||
+ (!input_stream->eof() && getline(*input_stream, line))) {
+ if (!request_text.empty()) {
+ if (FLAGS_binary_input) {
+ serialized_request_proto = request_text;
+ request_text.clear();
+ } else {
+ gpr_mu_lock(&parser_mu);
+ serialized_request_proto = parser->GetSerializedProtoFromMethod(
+ method_name, request_text, true /* is_request */);
+ request_text.clear();
+ if (parser->HasError()) {
+ if (print_mode) {
+ fprintf(stderr, "Failed to parse request.\n");
+ }
+ gpr_mu_unlock(&parser_mu);
+ continue;
+ }
+ gpr_mu_unlock(&parser_mu);
+ }
+
+ call.WriteAndWait(serialized_request_proto);
+ if (print_mode) {
+ fprintf(stderr, "Request sent.\n");
+ }
+ } else {
+ if (line.length() == 0) {
+ request_text = request_ss.str();
+ request_ss.str(grpc::string());
+ request_ss.clear();
+ } else {
+ request_ss << line << ' ';
+ }
+ }
+ }
+ if (input_file.is_open()) {
input_file.close();
}
- request_text = input_stream.str();
- }
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(server_address, cred.GetCredentials());
- if (!FLAGS_binary_input || !FLAGS_binary_output) {
- parser.reset(
- new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr,
- FLAGS_proto_path, FLAGS_protofiles));
- if (parser->HasError()) {
+ call.WritesDoneAndWait();
+ read_thread.join();
+
+ std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata;
+ Status status = call.Finish(&server_trailing_metadata);
+ PrintMetadata(server_trailing_metadata,
+ "Received trailing metadata from server:");
+
+ if (status.ok()) {
+ fprintf(stderr, "Stream RPC succeeded with OK status\n");
+ return true;
+ } else {
+ fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+ status.error_code(), status.error_message().c_str());
return false;
}
- }
- if (FLAGS_binary_input) {
- serialized_request_proto = request_text;
- formatted_method_name = method_name;
- } else {
- formatted_method_name = parser->GetFormattedMethodName(method_name);
- serialized_request_proto = parser->GetSerializedProtoFromMethod(
- method_name, request_text, true /* is_request */);
- if (parser->HasError()) {
- return false;
+ } else { // parser->IsStreaming(method_name, true /* is_request */)
+ if (argc == 3) {
+ request_text = argv[2];
+ if (!FLAGS_infile.empty()) {
+ fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
+ }
+ } else {
+ std::stringstream input_stream;
+ if (FLAGS_infile.empty()) {
+ if (isatty(fileno(stdin))) {
+ fprintf(stderr, "reading request message from stdin...\n");
+ }
+ input_stream << std::cin.rdbuf();
+ } else {
+ std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary);
+ input_stream << input_file.rdbuf();
+ input_file.close();
+ }
+ request_text = input_stream.str();
}
- }
- fprintf(stderr, "connecting to %s\n", server_address.c_str());
- grpc::string serialized_response_proto;
- std::multimap<grpc::string, grpc::string> client_metadata;
- std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
- server_trailing_metadata;
- ParseMetadataFlag(&client_metadata);
- PrintMetadata(client_metadata, "Sending client initial metadata:");
- grpc::Status status = grpc::testing::CliCall::Call(
- channel, formatted_method_name, serialized_request_proto,
- &serialized_response_proto, client_metadata, &server_initial_metadata,
- &server_trailing_metadata);
- PrintMetadata(server_initial_metadata,
- "Received initial metadata from server:");
- PrintMetadata(server_trailing_metadata,
- "Received trailing metadata from server:");
- if (status.ok()) {
- fprintf(stderr, "Rpc succeeded with OK status\n");
- if (FLAGS_binary_output) {
- output_ss << serialized_response_proto;
+ if (FLAGS_binary_input) {
+ serialized_request_proto = request_text;
+ // formatted_method_name = method_name;
} else {
- grpc::string response_text = parser->GetTextFormatFromMethod(
- method_name, serialized_response_proto, false /* is_request */);
+ // formatted_method_name = parser->GetFormattedMethodName(method_name);
+ serialized_request_proto = parser->GetSerializedProtoFromMethod(
+ method_name, request_text, true /* is_request */);
if (parser->HasError()) {
return false;
}
- output_ss << "Response: \n " << response_text << std::endl;
}
- } else {
- fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
- status.error_code(), status.error_message().c_str());
+ fprintf(stderr, "connecting to %s\n", server_address.c_str());
+
+ grpc::string serialized_response_proto;
+ std::multimap<grpc::string, grpc::string> client_metadata;
+ std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
+ server_trailing_metadata;
+ ParseMetadataFlag(&client_metadata);
+ PrintMetadata(client_metadata, "Sending client initial metadata:");
+
+ CliCall call(channel, formatted_method_name, client_metadata);
+ call.Write(serialized_request_proto);
+ call.WritesDone();
+
+ for (bool receive_initial_metadata = true; call.Read(
+ &serialized_response_proto,
+ receive_initial_metadata ? &server_initial_metadata : nullptr);
+ receive_initial_metadata = false) {
+ if (!FLAGS_binary_output) {
+ serialized_response_proto = parser->GetTextFormatFromMethod(
+ method_name, serialized_response_proto, false /* is_request */);
+ if (parser->HasError()) {
+ return false;
+ }
+ }
+ if (receive_initial_metadata) {
+ PrintMetadata(server_initial_metadata,
+ "Received initial metadata from server:");
+ }
+ if (!callback(serialized_response_proto)) {
+ return false;
+ }
+ }
+ Status status = call.Finish(&server_trailing_metadata);
+ if (status.ok()) {
+ fprintf(stderr, "Rpc succeeded with OK status\n");
+ return true;
+ } else {
+ fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
+ status.error_code(), status.error_message().c_str());
+ return false;
+ }
}
-
- return callback(output_ss.str());
+ GPR_UNREACHABLE_CODE(return false);
}
bool GrpcTool::ParseMessage(int argc, const char** argv,
@@ -531,7 +675,7 @@ bool GrpcTool::ParseMessage(int argc, const char** argv,
} else {
std::stringstream input_stream;
if (FLAGS_infile.empty()) {
- if (isatty(STDIN_FILENO)) {
+ if (isatty(fileno(stdin))) {
fprintf(stderr, "reading request message from stdin...\n");
}
input_stream << std::cin.rdbuf();
diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc
index 33ce611a60..26e2b1f502 100644
--- a/test/cpp/util/grpc_tool_test.cc
+++ b/test/cpp/util/grpc_tool_test.cc
@@ -102,6 +102,8 @@ DECLARE_bool(l);
namespace {
+const int kNumResponseStreamsMsgs = 3;
+
class TestCliCredentials final : public grpc::testing::CliCredentials {
public:
std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override {
@@ -137,6 +139,71 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
response->set_message(request->message());
return Status::OK;
}
+
+ Status RequestStream(ServerContext* context,
+ ServerReader<EchoRequest>* reader,
+ EchoResponse* response) override {
+ EchoRequest request;
+ response->set_message("");
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+ while (reader->Read(&request)) {
+ response->mutable_message()->append(request.message());
+ }
+
+ return Status::OK;
+ }
+
+ Status ResponseStream(ServerContext* context, const EchoRequest* request,
+ ServerWriter<EchoResponse>* writer) override {
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+ EchoResponse response;
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ response.set_message(request->message() + grpc::to_string(i));
+ writer->Write(response);
+ }
+
+ return Status::OK;
+ }
+
+ Status BidiStream(
+ ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
+ EchoRequest request;
+ EchoResponse response;
+ if (!context->client_metadata().empty()) {
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
+ iter = context->client_metadata().begin();
+ iter != context->client_metadata().end(); ++iter) {
+ context->AddInitialMetadata(ToString(iter->first),
+ ToString(iter->second));
+ }
+ }
+ context->AddTrailingMetadata("trailing_key", "trailing_value");
+
+ while (stream->Read(&request)) {
+ response.set_message(request.message());
+ stream->Write(response);
+ }
+
+ return Status::OK;
+ }
};
} // namespace
@@ -347,6 +414,132 @@ TEST_F(GrpcToolTest, CallCommand) {
ShutdownServer();
}
+TEST_F(GrpcToolTest, CallCommandRequestStream) {
+ // Test input: grpc_cli call localhost:<port> RequestStream "message:
+ // 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "RequestStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0Hello1Hello2\""
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0Hello1Hello2\""));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) {
+ // Test input: grpc_cli call localhost:<port> RequestStream "message:
+ // 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "RequestStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "bad_field: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("bad_field: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0Hello2\""
+ EXPECT_TRUE(NULL !=
+ strstr(output_stream.str().c_str(), "message: \"Hello0Hello2\""));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandResponseStream) {
+ // Test input: grpc_cli call localhost:<port> ResponseStream "message:
+ // 'Hello'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "ResponseStream", "message: 'Hello'"};
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello{n}\""
+ for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
+ grpc::string expected_response_text =
+ "message: \"Hello" + grpc::to_string(i) + "\"\n";
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ expected_response_text.c_str()));
+ }
+
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStream) {
+ // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "BidiStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage:
+ // \"Hello2\"\n\n"
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0\"\nmessage: "
+ "\"Hello1\"\nmessage: \"Hello2\"\n"));
+ std::cin.rdbuf(orig);
+ ShutdownServer();
+}
+
+TEST_F(GrpcToolTest, CallCommandBidiStreamWithBadRequest) {
+ // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'"
+ std::stringstream output_stream;
+
+ const grpc::string server_address = SetUpServer();
+ const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
+ "BidiStream", "message: 'Hello0'"};
+
+ // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
+ std::streambuf* orig = std::cin.rdbuf();
+ std::istringstream ss("message: 1.0\n\n message: 'Hello2'\n\n");
+ std::cin.rdbuf(ss.rdbuf());
+
+ EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
+ std::bind(PrintStream, &output_stream,
+ std::placeholders::_1)));
+
+ // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage:
+ // \"Hello2\"\n\n"
+ EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
+ "message: \"Hello0\"\nmessage: \"Hello2\"\n"));
+ std::cin.rdbuf(orig);
+
+ ShutdownServer();
+}
+
TEST_F(GrpcToolTest, ParseCommand) {
// Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse
// ECHO_RESPONSE_MESSAGE"
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
index bc8a6083f4..5dd1b00e8a 100644
--- a/test/cpp/util/proto_file_parser.cc
+++ b/test/cpp/util/proto_file_parser.cc
@@ -81,7 +81,8 @@ class ErrorPrinter : public protobuf::compiler::MultiFileErrorCollector {
ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
const grpc::string& proto_path,
const grpc::string& protofiles)
- : has_error_(false) {
+ : has_error_(false),
+ dynamic_factory_(new protobuf::DynamicMessageFactory()) {
std::vector<grpc::string> service_list;
if (channel) {
reflection_db_.reset(new grpc::ProtoReflectionDescriptorDatabase(channel));
@@ -127,7 +128,6 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
}
desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get()));
- dynamic_factory_.reset(new protobuf::DynamicMessageFactory(desc_pool_.get()));
for (auto it = service_list.begin(); it != service_list.end(); it++) {
if (known_services.find(*it) == known_services.end()) {
@@ -144,6 +144,11 @@ ProtoFileParser::~ProtoFileParser() {}
grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
has_error_ = false;
+
+ if (known_methods_.find(method) != known_methods_.end()) {
+ return known_methods_[method];
+ }
+
const protobuf::MethodDescriptor* method_descriptor = nullptr;
for (auto it = service_desc_list_.begin(); it != service_desc_list_.end();
it++) {
@@ -169,6 +174,8 @@ grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
return "";
}
+ known_methods_[method] = method_descriptor->full_name();
+
return method_descriptor->full_name();
}
@@ -205,6 +212,25 @@ grpc::string ProtoFileParser::GetMessageTypeFromMethod(
: method_desc->output_type()->full_name();
}
+bool ProtoFileParser::IsStreaming(const grpc::string& method, bool is_request) {
+ has_error_ = false;
+
+ grpc::string full_method_name = GetFullMethodName(method);
+ if (has_error_) {
+ return false;
+ }
+
+ const protobuf::MethodDescriptor* method_desc =
+ desc_pool_->FindMethodByName(full_method_name);
+ if (!method_desc) {
+ LogError("Method not found");
+ return false;
+ }
+
+ return is_request ? method_desc->client_streaming()
+ : method_desc->server_streaming();
+}
+
grpc::string ProtoFileParser::GetSerializedProtoFromMethod(
const grpc::string& method, const grpc::string& text_format_proto,
bool is_request) {
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
index c1070a37b5..23d311ef8f 100644
--- a/test/cpp/util/proto_file_parser.h
+++ b/test/cpp/util/proto_file_parser.h
@@ -84,6 +84,8 @@ class ProtoFileParser {
const grpc::string& message_type_name,
const grpc::string& serialized_proto);
+ bool IsStreaming(const grpc::string& method, bool is_request);
+
bool HasError() const { return has_error_; }
void LogError(const grpc::string& error_msg);
@@ -104,6 +106,7 @@ class ProtoFileParser {
std::unique_ptr<protobuf::DynamicMessageFactory> dynamic_factory_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
+ std::unordered_map<grpc::string, grpc::string> known_methods_;
std::vector<const protobuf::ServiceDescriptor*> service_desc_list_;
};