diff options
Diffstat (limited to 'test/cpp')
28 files changed, 1305 insertions, 299 deletions
diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc index a05ac30b1c..41085174a4 100644 --- a/test/cpp/common/alarm_cpp_test.cc +++ b/test/cpp/common/alarm_cpp_test.cc @@ -43,12 +43,12 @@ namespace { TEST(AlarmTest, RegularExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), junk); + Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk); void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( - (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2)); + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_TRUE(ok); @@ -65,7 +65,7 @@ TEST(AlarmTest, RegularExpiryChrono) { void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( - (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2)); + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_TRUE(ok); @@ -75,12 +75,12 @@ TEST(AlarmTest, RegularExpiryChrono) { TEST(AlarmTest, ZeroExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0), junk); + Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk); void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( - (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0)); + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(0)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_TRUE(ok); @@ -90,12 +90,12 @@ TEST(AlarmTest, ZeroExpiry) { TEST(AlarmTest, NegativeExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(-1), junk); + Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk); void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( - (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0)); + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(0)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_TRUE(ok); @@ -105,13 +105,13 @@ TEST(AlarmTest, NegativeExpiry) { TEST(AlarmTest, Cancellation) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), junk); + Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk); alarm.Cancel(); void* output_tag; bool ok; const CompletionQueue::NextStatus status = cq.AsyncNext( - (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(1)); EXPECT_EQ(status, CompletionQueue::GOT_EVENT); EXPECT_FALSE(ok); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2ce3f2f7bd..32e8a41795 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -42,6 +42,7 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/tls.h> @@ -228,12 +229,7 @@ class TestScenario { : disable_blocking(non_block), credentials_type(creds_type), message_content(content) {} - void Log() const { - gpr_log( - GPR_INFO, - "Scenario: disable_blocking %d, credentials %s, message size %" PRIuPTR, - disable_blocking, credentials_type.c_str(), message_content.size()); - } + void Log() const; bool disable_blocking; // Although the below grpc::string's are logically const, we can't declare // them const because of a limitation in the way old compilers (e.g., gcc-4.4) @@ -242,6 +238,20 @@ class TestScenario { grpc::string message_content; }; +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{disable_blocking=" + << (scenario.disable_blocking ? "true" : "false") + << ", credentials='" << scenario.credentials_type + << "', message_size=" << scenario.message_content.size() << "}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_DEBUG, "%s", out.str().c_str()); +} + class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { protected: AsyncEnd2endTest() { GetParam().Log(); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1a1a94e87c..47e5c5bd77 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -209,10 +209,7 @@ class TestScenario { public: TestScenario(bool proxy, const grpc::string& creds_type) : use_proxy(proxy), credentials_type(creds_type) {} - void Log() const { - gpr_log(GPR_INFO, "Scenario: proxy %d, credentials %s", use_proxy, - credentials_type.c_str()); - } + void Log() const; bool use_proxy; // Although the below grpc::string is logically const, we can't declare // them const because of a limitation in the way old compilers (e.g., gcc-4.4) @@ -220,6 +217,19 @@ class TestScenario { grpc::string credentials_type; }; +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{use_proxy=" + << (scenario.use_proxy ? "true" : "false") << ", credentials='" + << scenario.credentials_type << "'}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_DEBUG, "%s", out.str().c_str()); +} + class End2endTest : public ::testing::TestWithParam<TestScenario> { protected: End2endTest() @@ -636,7 +646,7 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); } -TEST_P(End2endTest, SimpleRpcWithCustomeUserAgentPrefix) { +TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { user_agent_prefix_ = "custom_prefix"; ResetStub(); EchoRequest request; @@ -891,6 +901,8 @@ TEST_P(End2endTest, RpcMaxMessageSize) { EchoRequest request; EchoResponse response; request.set_message(string(kMaxMessageSize_ * 2, 'a')); + // cancelled is not guaranteed to appear before the end of the service handler + request.mutable_param()->set_skip_cancelled_check(true); ClientContext context; Status s = stub_->Echo(&context, request, &response); @@ -1293,6 +1305,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { EXPECT_FALSE(s.ok()); EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); } + TEST_P(SecureEnd2endTest, SetPerCallCredentials) { ResetStub(); EchoRequest request; diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index 191d729a9e..82ccf436f8 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -63,7 +63,7 @@ grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) { } else { abort(); } - GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL); + GPR_ASSERT(inet_ntop(af, (void*)pb_ip.bytes, ip_str, 46) != NULL); return ip_str; } diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 3637190b6d..4b8a434c78 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -170,7 +170,7 @@ static grpc_slice build_response_payload_slice( static void drain_cq(grpc_completion_queue *cq) { grpc_event ev; do { - ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + ev = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), NULL); } while (ev.type != GRPC_QUEUE_SHUTDOWN); } @@ -288,7 +288,8 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = "xyz"; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; op->flags = 0; op->reserved = NULL; op++; @@ -335,7 +336,7 @@ static void start_backend_server(server_fixture *sf) { GPR_ASSERT(GRPC_CALL_OK == error); gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport); ev = grpc_completion_queue_next(sf->cq, - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL); + grpc_timeout_seconds_to_deadline(60), NULL); if (!ev.success) { gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport); cq_verifier_destroy(cqv); @@ -379,7 +380,7 @@ static void start_backend_server(server_fixture *sf) { error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); GPR_ASSERT(GRPC_CALL_OK == error); ev = grpc_completion_queue_next( - sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + sf->cq, grpc_timeout_seconds_to_deadline(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(102)); if (request_payload_recv == NULL) { @@ -409,7 +410,7 @@ static void start_backend_server(server_fixture *sf) { grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); ev = grpc_completion_queue_next( - sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + sf->cq, grpc_timeout_seconds_to_deadline(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(103)); } else { @@ -433,7 +434,9 @@ static void start_backend_server(server_fixture *sf) { op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = "Backend server out a-ok"; + grpc_slice status_details = + grpc_slice_from_static_string("Backend server out a-ok"); + op->data.send_status_from_server.status_details = &status_details; op->flags = 0; op->reserved = NULL; op++; @@ -462,8 +465,7 @@ static void perform_request(client_fixture *cf) { grpc_metadata_array trailing_metadata_recv; grpc_status_code status; grpc_call_error error; - char *details = NULL; - size_t details_capacity = 0; + grpc_slice details; grpc_byte_buffer *request_payload; grpc_byte_buffer *response_payload_recv; int i; @@ -472,9 +474,11 @@ static void perform_request(client_fixture *cf) { grpc_slice request_payload_slice = grpc_slice_from_copied_string("hello world"); + grpc_slice host = grpc_slice_from_static_string("foo.test.google.fr:1234"); c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS, - cf->cq, "/foo", "foo.test.google.fr:1234", - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL); + cf->cq, grpc_slice_from_static_string("/foo"), + &host, grpc_timeout_seconds_to_deadline(5), + NULL); gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c); GPR_ASSERT(c); char *peer; @@ -497,7 +501,6 @@ static void perform_request(client_fixture *cf) { op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; - op->data.recv_status_on_client.status_details_capacity = &details_capacity; op->flags = 0; op->reserved = NULL; op++; @@ -553,7 +556,7 @@ static void perform_request(client_fixture *cf) { grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv); - gpr_free(details); + grpc_slice_unref(details); gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer); gpr_free(peer); } @@ -602,7 +605,7 @@ static void teardown_server(server_fixture *sf) { gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); GPR_ASSERT(grpc_completion_queue_pluck( - sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(sf->server); gpr_thd_join(sf->tid); diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 3265554444..8a00b61cef 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -32,8 +32,7 @@ */ #include <memory> - -#include <unistd.h> +#include <unordered_map> #include <gflags/gflags.h> #include <grpc++/channel.h> @@ -108,119 +107,78 @@ int main(int argc, char** argv) { grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case), true, FLAGS_do_not_abort_on_transient_failures); - if (FLAGS_test_case == "empty_unary") { - client.DoEmpty(); - } else if (FLAGS_test_case == "large_unary") { - client.DoLargeUnary(); - } else if (FLAGS_test_case == "server_compressed_unary") { - client.DoServerCompressedUnary(); - } else if (FLAGS_test_case == "client_compressed_unary") { - client.DoClientCompressedUnary(); - } else if (FLAGS_test_case == "client_streaming") { - client.DoRequestStreaming(); - } else if (FLAGS_test_case == "server_streaming") { - client.DoResponseStreaming(); - } else if (FLAGS_test_case == "server_compressed_streaming") { - client.DoServerCompressedStreaming(); - } else if (FLAGS_test_case == "client_compressed_streaming") { - client.DoClientCompressedStreaming(); - } else if (FLAGS_test_case == "slow_consumer") { - client.DoResponseStreamingWithSlowConsumer(); - } else if (FLAGS_test_case == "half_duplex") { - client.DoHalfDuplex(); - } else if (FLAGS_test_case == "ping_pong") { - client.DoPingPong(); - } else if (FLAGS_test_case == "cancel_after_begin") { - client.DoCancelAfterBegin(); - } else if (FLAGS_test_case == "cancel_after_first_response") { - client.DoCancelAfterFirstResponse(); - } else if (FLAGS_test_case == "timeout_on_sleeping_server") { - client.DoTimeoutOnSleepingServer(); - } else if (FLAGS_test_case == "empty_stream") { - client.DoEmptyStream(); - } else if (FLAGS_test_case == "compute_engine_creds") { - client.DoComputeEngineCreds(FLAGS_default_service_account, - FLAGS_oauth_scope); - } else if (FLAGS_test_case == "jwt_token_creds") { - grpc::string json_key = GetServiceAccountJsonKey(); - client.DoJwtTokenCreds(json_key); - } else if (FLAGS_test_case == "oauth2_auth_token") { - client.DoOauth2AuthToken(FLAGS_default_service_account, FLAGS_oauth_scope); - } else if (FLAGS_test_case == "per_rpc_creds") { - grpc::string json_key = GetServiceAccountJsonKey(); - client.DoPerRpcCreds(json_key); - } else if (FLAGS_test_case == "status_code_and_message") { - client.DoStatusWithMessage(); - } else if (FLAGS_test_case == "custom_metadata") { - client.DoCustomMetadata(); - } else if (FLAGS_test_case == "unimplemented_method") { - client.DoUnimplementedMethod(); - } else if (FLAGS_test_case == "unimplemented_service") { - client.DoUnimplementedService(); - } else if (FLAGS_test_case == "cacheable_unary") { - client.DoCacheableUnary(); - } else if (FLAGS_test_case == "all") { - client.DoEmpty(); - client.DoLargeUnary(); - client.DoClientCompressedUnary(); - client.DoServerCompressedUnary(); - client.DoRequestStreaming(); - client.DoResponseStreaming(); - client.DoClientCompressedStreaming(); - client.DoServerCompressedStreaming(); - client.DoHalfDuplex(); - client.DoPingPong(); - client.DoCancelAfterBegin(); - client.DoCancelAfterFirstResponse(); - client.DoTimeoutOnSleepingServer(); - client.DoEmptyStream(); - client.DoStatusWithMessage(); - client.DoCustomMetadata(); - client.DoUnimplementedMethod(); - client.DoUnimplementedService(); - client.DoCacheableUnary(); - // service_account_creds and jwt_token_creds can only run with ssl. - if (FLAGS_use_tls) { - grpc::string json_key = GetServiceAccountJsonKey(); - client.DoJwtTokenCreds(json_key); - client.DoOauth2AuthToken(FLAGS_default_service_account, - FLAGS_oauth_scope); - client.DoPerRpcCreds(json_key); + + std::unordered_map<grpc::string, std::function<bool()>> actions; + actions["empty_unary"] = + std::bind(&grpc::testing::InteropClient::DoEmpty, &client); + actions["large_unary"] = + std::bind(&grpc::testing::InteropClient::DoLargeUnary, &client); + actions["server_compressed_unary"] = std::bind( + &grpc::testing::InteropClient::DoServerCompressedUnary, &client); + actions["client_compressed_unary"] = std::bind( + &grpc::testing::InteropClient::DoClientCompressedUnary, &client); + actions["client_streaming"] = + std::bind(&grpc::testing::InteropClient::DoRequestStreaming, &client); + actions["server_streaming"] = + std::bind(&grpc::testing::InteropClient::DoResponseStreaming, &client); + actions["server_compressed_streaming"] = std::bind( + &grpc::testing::InteropClient::DoServerCompressedStreaming, &client); + actions["client_compressed_streaming"] = std::bind( + &grpc::testing::InteropClient::DoClientCompressedStreaming, &client); + actions["slow_consumer"] = std::bind( + &grpc::testing::InteropClient::DoResponseStreamingWithSlowConsumer, + &client); + actions["half_duplex"] = + std::bind(&grpc::testing::InteropClient::DoHalfDuplex, &client); + actions["ping_pong"] = + std::bind(&grpc::testing::InteropClient::DoPingPong, &client); + actions["cancel_after_begin"] = + std::bind(&grpc::testing::InteropClient::DoCancelAfterBegin, &client); + actions["cancel_after_first_response"] = std::bind( + &grpc::testing::InteropClient::DoCancelAfterFirstResponse, &client); + actions["timeout_on_sleeping_server"] = std::bind( + &grpc::testing::InteropClient::DoTimeoutOnSleepingServer, &client); + actions["empty_stream"] = + std::bind(&grpc::testing::InteropClient::DoEmptyStream, &client); + if (FLAGS_use_tls) { + actions["compute_engine_creds"] = + std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client, + FLAGS_default_service_account, FLAGS_oauth_scope); + actions["jwt_token_creds"] = + std::bind(&grpc::testing::InteropClient::DoJwtTokenCreds, &client, + GetServiceAccountJsonKey()); + actions["oauth2_auth_token"] = + std::bind(&grpc::testing::InteropClient::DoOauth2AuthToken, &client, + FLAGS_default_service_account, FLAGS_oauth_scope); + actions["per_rpc_creds"] = + std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client, + GetServiceAccountJsonKey()); + } + actions["status_code_and_message"] = + std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client); + actions["custom_metadata"] = + std::bind(&grpc::testing::InteropClient::DoCustomMetadata, &client); + actions["unimplemented_method"] = + std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client); + actions["unimplemented_service"] = + std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); + // actions["cacheable_unary"] = + // std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client); + + if (FLAGS_test_case == "all") { + for (const auto& action : actions) { + action.second(); } - // compute_engine_creds only runs in GCE. + } else if (actions.find(FLAGS_test_case) != actions.end()) { + actions.find(FLAGS_test_case)->second(); } else { - const char* testcases[] = {"all", - "cacheable_unary", - "cancel_after_begin", - "cancel_after_first_response", - "client_compressed_streaming", - "client_compressed_unary", - "client_streaming", - "compute_engine_creds", - "custom_metadata", - "empty_stream", - "empty_unary", - "half_duplex", - "jwt_token_creds", - "large_unary", - "oauth2_auth_token", - "oauth2_auth_token", - "per_rpc_creds", - "per_rpc_creds", - "ping_pong", - "server_compressed_streaming", - "server_compressed_unary", - "server_streaming", - "status_code_and_message", - "timeout_on_sleeping_server", - "unimplemented_method", - "unimplemented_service"}; - char* joined_testcases = - gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); - + grpc::string test_cases; + for (const auto& action : actions) { + if (!test_cases.empty()) test_cases += "\n"; + test_cases += action.first; + } gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s", - FLAGS_test_case.c_str(), joined_testcases); - gpr_free(joined_testcases); + FLAGS_test_case.c_str(), test_cases.c_str()); ret = 1; } diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc index 91564e5dce..d3192ad0c9 100644 --- a/test/cpp/interop/client_helper.cc +++ b/test/cpp/interop/client_helper.cc @@ -33,8 +33,6 @@ #include "test/cpp/interop/client_helper.h" -#include <unistd.h> - #include <fstream> #include <memory> #include <sstream> diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index d1242627ef..b7f2723c39 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -31,7 +31,6 @@ * */ -#include <unistd.h> #include <cinttypes> #include <fstream> #include <memory> @@ -43,6 +42,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/lib/transport/byte_stream.h" @@ -109,7 +109,10 @@ TestService::Stub* InteropClient::ServiceStub::Get() { UnimplementedService::Stub* InteropClient::ServiceStub::GetUnimplementedServiceStub() { - return UnimplementedService::NewStub(channel_).get(); + if (unimplemented_service_stub_ == nullptr) { + unimplemented_service_stub_ = UnimplementedService::NewStub(channel_); + } + return unimplemented_service_stub_.get(); } void InteropClient::ServiceStub::Reset(std::shared_ptr<Channel> channel) { @@ -615,7 +618,9 @@ bool InteropClient::DoResponseStreamingWithSlowConsumer() { GPR_ASSERT(response.payload().body() == grpc::string(kResponseMessageSize, '\0')); gpr_log(GPR_DEBUG, "received message %d", i); - usleep(kReceiveDelayMilliSeconds * 1000); + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN))); ++i; } @@ -943,7 +948,7 @@ bool InteropClient::DoCustomMetadata() { const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); - GPR_ASSERT(iter->second.data() == kInitialMetadataValue); + GPR_ASSERT(iter->second == kInitialMetadataValue); const auto& server_trailing_metadata = context.GetServerTrailingMetadata(); iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey); GPR_ASSERT(iter != server_trailing_metadata.end()); @@ -994,7 +999,7 @@ bool InteropClient::DoCustomMetadata() { const auto& server_initial_metadata = context.GetServerInitialMetadata(); auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); GPR_ASSERT(iter != server_initial_metadata.end()); - GPR_ASSERT(iter->second.data() == kInitialMetadataValue); + GPR_ASSERT(iter->second == kInitialMetadataValue); const auto& server_trailing_metadata = context.GetServerTrailingMetadata(); iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey); GPR_ASSERT(iter != server_trailing_metadata.end()); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 7ec7ebee20..74f4db6b78 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -107,6 +107,7 @@ class InteropClient { private: std::unique_ptr<TestService::Stub> stub_; + std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_; std::shared_ptr<Channel> channel_; bool new_stub_every_call_; // If true, a new stub is returned by every // Get() call diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 956840ba70..5a810b45ef 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -31,8 +31,6 @@ * */ -#include <unistd.h> - #include <fstream> #include <memory> #include <sstream> @@ -45,6 +43,7 @@ #include <grpc++/server_context.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/lib/support/string.h" @@ -91,7 +90,9 @@ void MaybeEchoMetadata(ServerContext* context) { auto iter = client_metadata.find(kEchoInitialMetadataKey); if (iter != client_metadata.end()) { - context->AddInitialMetadata(kEchoInitialMetadataKey, iter->second.data()); + context->AddInitialMetadata( + kEchoInitialMetadataKey, + grpc::string(iter->second.begin(), iter->second.end())); } iter = client_metadata.find(kEchoTrailingBinMetadataKey); if (iter != client_metadata.end()) { @@ -105,7 +106,9 @@ void MaybeEchoMetadata(ServerContext* context) { if (iter != client_metadata.end()) { iter = client_metadata.find("user-agent"); if (iter != client_metadata.end()) { - context->AddInitialMetadata(kEchoUserAgentKey, iter->second.data()); + context->AddInitialMetadata( + kEchoUserAgentKey, + grpc::string(iter->second.begin(), iter->second.end())); } } } @@ -346,6 +349,7 @@ void grpc::testing::interop::RunServer( std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); while (!gpr_atm_no_barrier_load(&g_got_sigint)) { - sleep(5); + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(5, GPR_TIMESPAN))); } } diff --git a/test/cpp/interop/interop_server_bootstrap.cc b/test/cpp/interop/interop_server_bootstrap.cc index 99518c6943..7cbf221a03 100644 --- a/test/cpp/interop/interop_server_bootstrap.cc +++ b/test/cpp/interop/interop_server_bootstrap.cc @@ -32,7 +32,6 @@ */ #include <signal.h> -#include <unistd.h> #include "test/cpp/interop/server_helper.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 53d51e80e7..634d0a90fc 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -34,7 +34,6 @@ // Test description at doc/connection-backoff-interop-test-description.md #include <signal.h> -#include <unistd.h> #include <condition_variable> #include <memory> diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index 6c0bf80488..c3e96c572c 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -84,6 +84,16 @@ static class InitializeStuff { * FIXTURES */ +static void ApplyCommonServerBuilderConfig(ServerBuilder* b) { + b->SetMaxReceiveMessageSize(INT_MAX); + b->SetMaxSendMessageSize(INT_MAX); +} + +static void ApplyCommonChannelArguments(ChannelArguments* c) { + c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); + c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); +} + class FullstackFixture { public: FullstackFixture(Service* service, const grpc::string& address) { @@ -91,8 +101,11 @@ class FullstackFixture { b.AddListeningPort(address, InsecureServerCredentials()); cq_ = b.AddCompletionQueue(true); b.RegisterService(service); + ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); - channel_ = CreateChannel(address, InsecureChannelCredentials()); + ChannelArguments args; + ApplyCommonChannelArguments(&args); + channel_ = CreateCustomChannel(address, InsecureChannelCredentials(), args); } virtual ~FullstackFixture() { @@ -117,6 +130,8 @@ class TCP : public FullstackFixture { public: TCP(Service* service) : FullstackFixture(service, MakeAddress()) {} + void Finish(benchmark::State& state) {} + private: static grpc::string MakeAddress() { int port = grpc_pick_unused_port_or_die(); @@ -130,6 +145,8 @@ class UDS : public FullstackFixture { public: UDS(Service* service) : FullstackFixture(service, MakeAddress()) {} + void Finish(benchmark::State& state) {} + private: static grpc::string MakeAddress() { int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a @@ -146,6 +163,7 @@ class EndpointPairFixture { ServerBuilder b; cq_ = b.AddCompletionQueue(true); b.RegisterService(service); + ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -174,6 +192,7 @@ class EndpointPairFixture { { ChannelArguments args; args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); grpc_transport* transport = @@ -213,6 +232,8 @@ class SockPair : public EndpointPairFixture { : EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair( "test", initialize_stuff.rq(), 8192)) { } + + void Finish(benchmark::State& state) {} }; class InProcessCHTTP2 : public EndpointPairFixture { @@ -220,10 +241,20 @@ class InProcessCHTTP2 : public EndpointPairFixture { InProcessCHTTP2(Service* service) : EndpointPairFixture(service, MakeEndpoints()) {} + void Finish(benchmark::State& state) { + std::ostringstream out; + out << "writes/iteration:" + << ((double)stats_.num_writes / (double)state.iterations()); + state.SetLabel(out.str()); + } + private: + grpc_passthru_endpoint_stats stats_; + grpc_endpoint_pair MakeEndpoints() { grpc_endpoint_pair p; - grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq()); + grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), + &stats_); return p; } }; @@ -343,6 +374,12 @@ static void BM_UnaryPingPong(benchmark::State& state) { EchoRequest send_request; EchoResponse send_response; EchoResponse recv_response; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + if (state.range(1) > 0) { + send_response.set_message(std::string(state.range(1), 'a')); + } Status recv_status; struct ServerEnv { ServerContext ctx; @@ -365,6 +402,7 @@ static void BM_UnaryPingPong(benchmark::State& state) { std::unique_ptr<EchoTestService::Stub> stub( EchoTestService::NewStub(fixture->channel())); while (state.KeepRunning()) { + recv_response.Clear(); ClientContext cli_ctx; ClientContextMutator cli_ctx_mut(&cli_ctx); std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( @@ -393,56 +431,216 @@ static void BM_UnaryPingPong(benchmark::State& state) { service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, fixture->cq(), fixture->cq(), tag(slot)); } + fixture->Finish(state); fixture.reset(); server_env[0]->~ServerEnv(); server_env[1]->~ServerEnv(); + state.SetBytesProcessed(state.range(0) * state.iterations() + + state.range(1) * state.iterations()); +} + +template <class Fixture> +static void BM_PumpStreamClientToServer(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoRequest send_request; + EchoRequest recv_request; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { + request_rw->Write(send_request, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + response_rw.Read(&recv_request, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + request_rw->WritesDone(tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); +} + +template <class Fixture> +static void BM_PumpStreamServerToClient(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_response.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { + response_rw.Write(send_response, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + request_rw->Read(&recv_response, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + response_rw.Finish(Status::OK, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); } /******************************************************************************* * CONFIGURATIONS */ -BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator); -BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator); -BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator); -BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator); +static void SweepSizesArgs(benchmark::internal::Benchmark* b) { + b->Args({0, 0}); + for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { + b->Args({i, 0}); + b->Args({0, i}); + b->Args({i, i}); + } +} + +BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator) + ->Apply(SweepSizesArgs); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator) + ->Args({0, 0}); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator) + ->Args({0, 0}); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator) + ->Apply(SweepSizesArgs); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomBinaryMetadata<10>, 1>, - NoOpMutator); + Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomBinaryMetadata<31>, 1>, - NoOpMutator); + Client_AddMetadata<RandomBinaryMetadata<31>, 1>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, Client_AddMetadata<RandomBinaryMetadata<100>, 1>, - NoOpMutator); + NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomBinaryMetadata<10>, 2>, - NoOpMutator); + Client_AddMetadata<RandomBinaryMetadata<10>, 2>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomBinaryMetadata<31>, 2>, - NoOpMutator); + Client_AddMetadata<RandomBinaryMetadata<31>, 2>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, Client_AddMetadata<RandomBinaryMetadata<100>, 2>, - NoOpMutator); + NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>); + Server_AddInitialMetadata<RandomBinaryMetadata<10>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>); + Server_AddInitialMetadata<RandomBinaryMetadata<31>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>); + Server_AddInitialMetadata<RandomBinaryMetadata<100>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator); + Client_AddMetadata<RandomAsciiMetadata<10>, 1>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator); + Client_AddMetadata<RandomAsciiMetadata<31>, 1>, NoOpMutator) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, - Client_AddMetadata<RandomAsciiMetadata<100>, 1>, - NoOpMutator); + Client_AddMetadata<RandomAsciiMetadata<100>, 1>, NoOpMutator) + ->Args({0, 0}); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, + Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomAsciiMetadata<10>, 1>); + Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomAsciiMetadata<31>, 1>); + Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>) + ->Args({0, 0}); BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, - Server_AddInitialMetadata<RandomAsciiMetadata<100>, 1>); + Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>) + ->Args({0, 0}); + +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair) + ->Range(0, 128 * 1024 * 1024); +BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) + ->Range(0, 128 * 1024 * 1024); } // namespace testing } // namespace grpc diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc new file mode 100644 index 0000000000..7a914c1547 --- /dev/null +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -0,0 +1,268 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel.h> +#include <grpc++/create_channel.h> +#include <grpc++/impl/grpc_library.h> +#include <grpc++/security/credentials.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc/support/log.h> +#include <gtest/gtest.h> + +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/endpoint_pair.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" +#include "test/core/util/passthru_endpoint.h" +#include "test/core/util/port.h" +} +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +static void ApplyCommonServerBuilderConfig(ServerBuilder* b) { + b->SetMaxReceiveMessageSize(INT_MAX); + b->SetMaxSendMessageSize(INT_MAX); +} + +static void ApplyCommonChannelArguments(ChannelArguments* c) { + c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); + c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); +} + +static class InitializeStuff { + public: + InitializeStuff() { + init_lib_.init(); + rq_ = grpc_resource_quota_create("bm"); + } + + ~InitializeStuff() { init_lib_.shutdown(); } + + grpc_resource_quota* rq() { return rq_; } + + private: + internal::GrpcLibrary init_lib_; + grpc_resource_quota* rq_; +} initialize_stuff; + +class EndpointPairFixture { + public: + EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) { + ServerBuilder b; + cq_ = b.AddCompletionQueue(true); + b.RegisterService(service); + ApplyCommonServerBuilderConfig(&b); + server_ = b.BuildAndStart(); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + /* add server endpoint to server_ */ + { + const grpc_channel_args* server_args = + grpc_server_get_channel_args(server_->c_server()); + grpc_transport* transport = grpc_create_chttp2_transport( + &exec_ctx, server_args, endpoints.server, 0 /* is_client */); + + grpc_pollset** pollsets; + size_t num_pollsets = 0; + grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); + + for (size_t i = 0; i < num_pollsets; i++) { + grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]); + } + + grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, + NULL, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + } + + /* create channel */ + { + ChannelArguments args; + args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + ApplyCommonChannelArguments(&args); + + grpc_channel_args c_args = args.c_channel_args(); + grpc_transport* transport = + grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1); + GPR_ASSERT(transport); + grpc_channel* channel = grpc_channel_create( + &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + + channel_ = CreateChannelInternal("", channel); + } + + grpc_exec_ctx_finish(&exec_ctx); + } + + virtual ~EndpointPairFixture() { + server_->Shutdown(); + cq_->Shutdown(); + void* tag; + bool ok; + while (cq_->Next(&tag, &ok)) { + } + } + + ServerCompletionQueue* cq() { return cq_.get(); } + std::shared_ptr<Channel> channel() { return channel_; } + + private: + std::unique_ptr<Server> server_; + std::unique_ptr<ServerCompletionQueue> cq_; + std::shared_ptr<Channel> channel_; +}; + +class InProcessCHTTP2 : public EndpointPairFixture { + public: + InProcessCHTTP2(Service* service) + : EndpointPairFixture(service, MakeEndpoints()) {} + + int writes_performed() const { return stats_.num_writes; } + + private: + grpc_passthru_endpoint_stats stats_; + + grpc_endpoint_pair MakeEndpoints() { + grpc_endpoint_pair p; + grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), + &stats_); + return p; + } +}; + +static double UnaryPingPong(int request_size, int response_size) { + const int kIterations = 10000; + + EchoTestService::AsyncService service; + std::unique_ptr<InProcessCHTTP2> fixture(new InProcessCHTTP2(&service)); + EchoRequest send_request; + EchoResponse send_response; + EchoResponse recv_response; + if (request_size > 0) { + send_request.set_message(std::string(request_size, 'a')); + } + if (response_size > 0) { + send_response.set_message(std::string(response_size, 'a')); + } + Status recv_status; + struct ServerEnv { + ServerContext ctx; + EchoRequest recv_request; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; + ServerEnv() : response_writer(&ctx) {} + }; + uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; + ServerEnv* server_env[2] = { + reinterpret_cast<ServerEnv*>(server_env_buffer), + reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; + new (server_env[0]) ServerEnv; + new (server_env[1]) ServerEnv; + service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, + &server_env[0]->response_writer, fixture->cq(), + fixture->cq(), tag(0)); + service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, + &server_env[1]->response_writer, fixture->cq(), + fixture->cq(), tag(1)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + for (int iteration = 0; iteration < kIterations; iteration++) { + recv_response.Clear(); + ClientContext cli_ctx; + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + void* t; + bool ok; + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + GPR_ASSERT(t == tag(0) || t == tag(1)); + intptr_t slot = reinterpret_cast<intptr_t>(t); + ServerEnv* senv = server_env[slot]; + senv->response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + for (int i = (1 << 3) | (1 << 4); i != 0;) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int tagnum = (int)reinterpret_cast<intptr_t>(t); + GPR_ASSERT(i & (1 << tagnum)); + i -= 1 << tagnum; + } + GPR_ASSERT(recv_status.ok()); + + senv->~ServerEnv(); + senv = new (senv) ServerEnv(); + service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, + fixture->cq(), fixture->cq(), tag(slot)); + } + + double writes_per_iteration = + (double)fixture->writes_performed() / (double)kIterations; + + fixture.reset(); + server_env[0]->~ServerEnv(); + server_env[1]->~ServerEnv(); + + return writes_per_iteration; +} + +TEST(WritesPerRpcTest, UnaryPingPong) { + EXPECT_LT(UnaryPingPong(0, 0), 2.05); + EXPECT_LT(UnaryPingPong(1, 0), 2.05); + EXPECT_LT(UnaryPingPong(0, 1), 2.05); + EXPECT_LT(UnaryPingPong(4096, 0), 2.2); + EXPECT_LT(UnaryPingPong(0, 4096), 2.2); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index b1e61865e7..498416c64a 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -153,7 +153,6 @@ class SynchronousStreamingClient final : public SynchronousClient { if (*stream) { (*stream)->WritesDone(); Status s = (*stream)->Finish(); - EXPECT_TRUE(s.ok()); if (!s.ok()) { gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, s.error_message().c_str()); @@ -173,7 +172,11 @@ class SynchronousStreamingClient final : public SynchronousClient { entry->set_value((UsageTimer::Now() - start) * 1e9); return true; } - return false; + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + context_[thread_idx].~ClientContext(); + new (&context_[thread_idx]) ClientContext(); + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + return true; } private: diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 188d6196e5..2f035abedd 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -92,7 +92,8 @@ print yaml.dump({ 'defaults': 'boringssl', 'cpu_cost': guess_cpu(scenario_json, False), 'exclude_configs': ['tsan', 'asan'], - 'timeout_seconds': 6*60 + 'timeout_seconds': 6*60, + 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', []) } for scenario_json in scenario_config.CXXLanguage().scenarios() if 'scalable' in scenario_json.get('CATEGORIES', []) @@ -109,8 +110,9 @@ print yaml.dump({ 'defaults': 'boringssl', 'cpu_cost': guess_cpu(scenario_json, True), 'exclude_configs': sorted(c for c in configs_from_yaml if c not in ('tsan', 'asan')), - 'timeout_seconds': 6*60 - } + 'timeout_seconds': 6*60, + 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', []) + } for scenario_json in scenario_config.CXXLanguage().scenarios() if 'scalable' in scenario_json.get('CATEGORIES', []) ] diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 8dc50ac6d8..70e2709ac0 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -56,7 +56,7 @@ static void RunQPS() { client_config.set_async_client_threads(8); client_config.set_rpc_type(STREAMING); client_config.mutable_load_params()->mutable_poisson()->set_offered_load( - 1000.0 / GRPC_TEST_SLOWDOWN_FACTOR); + 1000.0 / grpc_test_slowdown_factor()); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); diff --git a/test/cpp/qps/usage_timer.cc b/test/cpp/qps/usage_timer.cc index c6697fbdfd..70ef26e82a 100644 --- a/test/cpp/qps/usage_timer.cc +++ b/test/cpp/qps/usage_timer.cc @@ -39,8 +39,15 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> +#ifdef __linux__ #include <sys/resource.h> #include <sys/time.h> + +static double time_double(struct timeval* tv) { + return tv->tv_sec + 1e-6 * tv->tv_usec; +} +#endif + UsageTimer::UsageTimer() : start_(Sample()) {} double UsageTimer::Now() { @@ -48,8 +55,16 @@ double UsageTimer::Now() { return ts.tv_sec + 1e-9 * ts.tv_nsec; } -static double time_double(struct timeval* tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; +static void get_resource_usage(double* utime, double* stime) { +#ifdef __linux__ + struct rusage usage; + getrusage(RUSAGE_SELF, &usage); + *utime = time_double(&usage.ru_utime); + *stime = time_double(&usage.ru_stime); +#else + *utime = 0; + *stime = 0; +#endif } static void get_cpu_usage(unsigned long long* total_cpu_time, @@ -74,15 +89,9 @@ static void get_cpu_usage(unsigned long long* total_cpu_time, } UsageTimer::Result UsageTimer::Sample() { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - Result r; - r.wall = time_double(&tv); - r.user = time_double(&usage.ru_utime); - r.system = time_double(&usage.ru_stime); + r.wall = Now(); + get_resource_usage(&r.user, &r.system); r.total_cpu_time = 0; r.idle_cpu_time = 0; get_cpu_usage(&r.total_cpu_time, &r.idle_cpu_time); diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 2068b7c213..e88d0647dd 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -31,7 +31,7 @@ * */ -#include <sys/signal.h> +#include <signal.h> #include <chrono> #include <thread> diff --git a/test/cpp/test/server_context_test_spouse_test.cc b/test/cpp/test/server_context_test_spouse_test.cc index e0d6a2ff67..eb279e2401 100644 --- a/test/cpp/test/server_context_test_spouse_test.cc +++ b/test/cpp/test/server_context_test_spouse_test.cc @@ -36,11 +36,14 @@ #include <cstring> #include <vector> +#include <grpc++/impl/grpc_library.h> #include <gtest/gtest.h> namespace grpc { namespace testing { +static internal::GrpcLibraryInitializer g_initializer; + const char key1[] = "metadata-key1"; const char key2[] = "metadata-key2"; const char val1[] = "metadata-val1"; diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index 284761c53a..35c8d5d088 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -31,6 +31,7 @@ *is % allowed in string */ +#include <ctime> #include <memory> #include <string> diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc index a02a8b2ee2..041cc0e4c3 100644 --- a/test/cpp/util/cli_call.cc +++ b/test/cpp/util/cli_call.cc @@ -37,8 +37,6 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/generic/generic_stub.h> #include <grpc++/support/byte_buffer.h> #include <grpc/grpc.h> #include <grpc/slice.h> @@ -56,55 +54,172 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel, const OutgoingMetadataContainer& metadata, IncomingMetadataContainer* server_initial_metadata, IncomingMetadataContainer* server_trailing_metadata) { - std::unique_ptr<grpc::GenericStub> stub(new grpc::GenericStub(channel)); - grpc::ClientContext ctx; + CliCall call(channel, method, metadata); + call.Write(request); + call.WritesDone(); + if (!call.Read(response, server_initial_metadata)) { + fprintf(stderr, "Failed to read response.\n"); + } + return call.Finish(server_trailing_metadata); +} + +CliCall::CliCall(std::shared_ptr<grpc::Channel> channel, + const grpc::string& method, + const OutgoingMetadataContainer& metadata) + : stub_(new grpc::GenericStub(channel)) { + gpr_mu_init(&write_mu_); + gpr_cv_init(&write_cv_); if (!metadata.empty()) { for (OutgoingMetadataContainer::const_iterator iter = metadata.begin(); iter != metadata.end(); ++iter) { - ctx.AddMetadata(iter->first, iter->second); + ctx_.AddMetadata(iter->first, iter->second); } } - grpc::CompletionQueue cq; - std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call( - stub->Call(&ctx, method, &cq, tag(1))); + call_ = stub_->Call(&ctx_, method, &cq_, tag(1)); void* got_tag; bool ok; - cq.Next(&got_tag, &ok); + cq_.Next(&got_tag, &ok); GPR_ASSERT(ok); +} + +CliCall::~CliCall() { + gpr_cv_destroy(&write_cv_); + gpr_mu_destroy(&write_mu_); +} + +void CliCall::Write(const grpc::string& request) { + void* got_tag; + bool ok; grpc_slice s = grpc_slice_from_copied_string(request.c_str()); grpc::Slice req_slice(s, grpc::Slice::STEAL_REF); grpc::ByteBuffer send_buffer(&req_slice, 1); - call->Write(send_buffer, tag(2)); - cq.Next(&got_tag, &ok); - GPR_ASSERT(ok); - call->WritesDone(tag(3)); - cq.Next(&got_tag, &ok); + call_->Write(send_buffer, tag(2)); + cq_.Next(&got_tag, &ok); GPR_ASSERT(ok); +} + +bool CliCall::Read(grpc::string* response, + IncomingMetadataContainer* server_initial_metadata) { + void* got_tag; + bool ok; + grpc::ByteBuffer recv_buffer; - call->Read(&recv_buffer, tag(4)); - cq.Next(&got_tag, &ok); - if (!ok) { - std::cout << "Failed to read response." << std::endl; + call_->Read(&recv_buffer, tag(3)); + + if (!cq_.Next(&got_tag, &ok) || !ok) { + return false; } - grpc::Status status; - call->Finish(&status, tag(5)); - cq.Next(&got_tag, &ok); + std::vector<grpc::Slice> slices; + GPR_ASSERT(recv_buffer.Dump(&slices).ok()); + + response->clear(); + for (size_t i = 0; i < slices.size(); i++) { + response->append(reinterpret_cast<const char*>(slices[i].begin()), + slices[i].size()); + } + if (server_initial_metadata) { + *server_initial_metadata = ctx_.GetServerInitialMetadata(); + } + return true; +} + +void CliCall::WritesDone() { + void* got_tag; + bool ok; + + call_->WritesDone(tag(4)); + cq_.Next(&got_tag, &ok); GPR_ASSERT(ok); +} - if (status.ok()) { - std::vector<grpc::Slice> slices; - (void)recv_buffer.Dump(&slices); +void CliCall::WriteAndWait(const grpc::string& request) { + grpc_slice s = grpc_slice_from_copied_string(request.c_str()); + grpc::Slice req_slice(s, grpc::Slice::STEAL_REF); + grpc::ByteBuffer send_buffer(&req_slice, 1); + + gpr_mu_lock(&write_mu_); + call_->Write(send_buffer, tag(2)); + write_done_ = false; + while (!write_done_) { + gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&write_mu_); +} + +void CliCall::WritesDoneAndWait() { + gpr_mu_lock(&write_mu_); + call_->WritesDone(tag(4)); + write_done_ = false; + while (!write_done_) { + gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&write_mu_); +} - response->clear(); - for (size_t i = 0; i < slices.size(); i++) { - response->append(reinterpret_cast<const char*>(slices[i].begin()), - slices[i].size()); +bool CliCall::ReadAndMaybeNotifyWrite( + grpc::string* response, + IncomingMetadataContainer* server_initial_metadata) { + void* got_tag; + bool ok; + grpc::ByteBuffer recv_buffer; + + call_->Read(&recv_buffer, tag(3)); + bool cq_result = cq_.Next(&got_tag, &ok); + + while (got_tag != tag(3)) { + gpr_mu_lock(&write_mu_); + write_done_ = true; + gpr_cv_signal(&write_cv_); + gpr_mu_unlock(&write_mu_); + + cq_result = cq_.Next(&got_tag, &ok); + if (got_tag == tag(2)) { + GPR_ASSERT(ok); } } - *server_initial_metadata = ctx.GetServerInitialMetadata(); - *server_trailing_metadata = ctx.GetServerTrailingMetadata(); + if (!cq_result || !ok) { + // If the RPC is ended on the server side, we should still wait for the + // pending write on the client side to be done. + if (!ok) { + gpr_mu_lock(&write_mu_); + if (!write_done_) { + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag != tag(2)); + write_done_ = true; + gpr_cv_signal(&write_cv_); + } + gpr_mu_unlock(&write_mu_); + } + return false; + } + + std::vector<grpc::Slice> slices; + GPR_ASSERT(recv_buffer.Dump(&slices).ok()); + response->clear(); + for (size_t i = 0; i < slices.size(); i++) { + response->append(reinterpret_cast<const char*>(slices[i].begin()), + slices[i].size()); + } + if (server_initial_metadata) { + *server_initial_metadata = ctx_.GetServerInitialMetadata(); + } + return true; +} + +Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) { + void* got_tag; + bool ok; + grpc::Status status; + + call_->Finish(&status, tag(5)); + cq_.Next(&got_tag, &ok); + GPR_ASSERT(ok); + if (server_trailing_metadata) { + *server_trailing_metadata = ctx_.GetServerTrailingMetadata(); + } + return status; } diff --git a/test/cpp/util/cli_call.h b/test/cpp/util/cli_call.h index 65da86bd4e..91f0dbc9ed 100644 --- a/test/cpp/util/cli_call.h +++ b/test/cpp/util/cli_call.h @@ -37,23 +37,74 @@ #include <map> #include <grpc++/channel.h> +#include <grpc++/completion_queue.h> +#include <grpc++/generic/generic_stub.h> #include <grpc++/support/status.h> #include <grpc++/support/string_ref.h> namespace grpc { + +class ClientContext; + namespace testing { +// CliCall handles the sending and receiving of generic messages given the name +// of the remote method. This class is only used by GrpcTool. Its thread-safe +// and thread-unsafe methods should not be used together. class CliCall final { public: typedef std::multimap<grpc::string, grpc::string> OutgoingMetadataContainer; typedef std::multimap<grpc::string_ref, grpc::string_ref> IncomingMetadataContainer; + + CliCall(std::shared_ptr<grpc::Channel> channel, const grpc::string& method, + const OutgoingMetadataContainer& metadata); + ~CliCall(); + + // Perform an unary generic RPC. static Status Call(std::shared_ptr<grpc::Channel> channel, const grpc::string& method, const grpc::string& request, grpc::string* response, const OutgoingMetadataContainer& metadata, IncomingMetadataContainer* server_initial_metadata, IncomingMetadataContainer* server_trailing_metadata); + + // Send a generic request message in a synchronous manner. NOT thread-safe. + void Write(const grpc::string& request); + + // Send a generic request message in a synchronous manner. NOT thread-safe. + void WritesDone(); + + // Receive a generic response message in a synchronous manner.NOT thread-safe. + bool Read(grpc::string* response, + IncomingMetadataContainer* server_initial_metadata); + + // Thread-safe write. Must be used with ReadAndMaybeNotifyWrite. Send out a + // generic request message and wait for ReadAndMaybeNotifyWrite to finish it. + void WriteAndWait(const grpc::string& request); + + // Thread-safe WritesDone. Must be used with ReadAndMaybeNotifyWrite. Send out + // WritesDone for gereneric request messages and wait for + // ReadAndMaybeNotifyWrite to finish it. + void WritesDoneAndWait(); + + // Thread-safe Read. Blockingly receive a generic response message. Notify + // writes if they are finished when this read is waiting for a resposne. + bool ReadAndMaybeNotifyWrite( + grpc::string* response, + IncomingMetadataContainer* server_initial_metadata); + + // Finish the RPC. + Status Finish(IncomingMetadataContainer* server_trailing_metadata); + + private: + std::unique_ptr<grpc::GenericStub> stub_; + grpc::ClientContext ctx_; + std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_; + grpc::CompletionQueue cq_; + gpr_mu write_mu_; + gpr_cv write_cv_; // Protected by write_mu_; + bool write_done_; // Portected by write_mu_; }; } // namespace testing diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index fe248601ee..a78bed4b90 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -83,10 +83,10 @@ DEFINE_string(outfile, "", "Output file (default is stdout)"); static bool SimplePrint(const grpc::string& outfile, const grpc::string& output) { if (outfile.empty()) { - std::cout << output; + std::cout << output << std::endl; } else { - std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary); - output_file << output; + std::ofstream output_file(outfile, std::ios::app | std::ios::binary); + output_file << output << std::endl; output_file.close(); } return true; diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc index b9900ca1b7..856cd32c3c 100644 --- a/test/cpp/util/grpc_tool.cc +++ b/test/cpp/util/grpc_tool.cc @@ -33,12 +33,13 @@ #include "test/cpp/util/grpc_tool.h" -#include <unistd.h> +#include <cstdio> #include <fstream> #include <iostream> #include <memory> #include <sstream> #include <string> +#include <thread> #include <gflags/gflags.h> #include <grpc++/channel.h> @@ -47,12 +48,19 @@ #include <grpc++/security/credentials.h> #include <grpc++/support/string_ref.h> #include <grpc/grpc.h> +#include <grpc/support/port_platform.h> #include "test/cpp/util/cli_call.h" #include "test/cpp/util/proto_file_parser.h" #include "test/cpp/util/proto_reflection_descriptor_database.h" #include "test/cpp/util/service_describer.h" +#if GPR_WINDOWS +#include <io.h> +#else +#include <unistd.h> +#endif + namespace grpc { namespace testing { @@ -159,6 +167,36 @@ void PrintMetadata(const T& m, const grpc::string& message) { } } +void ReadResponse(CliCall* call, const grpc::string& method_name, + GrpcToolOutputCallback callback, ProtoFileParser* parser, + gpr_mu* parser_mu, bool print_mode) { + grpc::string serialized_response_proto; + std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata; + + for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite( + &serialized_response_proto, + receive_initial_metadata ? &server_initial_metadata : nullptr); + receive_initial_metadata = false) { + fprintf(stderr, "got response.\n"); + if (!FLAGS_binary_output) { + gpr_mu_lock(parser_mu); + serialized_response_proto = parser->GetTextFormatFromMethod( + method_name, serialized_response_proto, false /* is_request */); + if (parser->HasError() && print_mode) { + fprintf(stderr, "Failed to parse response.\n"); + } + gpr_mu_unlock(parser_mu); + } + if (receive_initial_metadata) { + PrintMetadata(server_initial_metadata, + "Received initial metadata from server:"); + } + if (!callback(serialized_response_proto) && print_mode) { + fprintf(stderr, "Failed to output response.\n"); + } + } +} + struct Command { const char* command; std::function<bool(GrpcTool*, int, const char**, const CliCredentials&, @@ -416,85 +454,191 @@ bool GrpcTool::CallMethod(int argc, const char** argv, grpc::string server_address(argv[0]); grpc::string method_name(argv[1]); grpc::string formatted_method_name; - std::unique_ptr<grpc::testing::ProtoFileParser> parser; + std::unique_ptr<ProtoFileParser> parser; grpc::string serialized_request_proto; + bool print_mode = false; - if (argc == 3) { - request_text = argv[2]; - if (!FLAGS_infile.empty()) { - fprintf(stderr, "warning: request given in argv, ignoring --infile\n"); - } + std::shared_ptr<grpc::Channel> channel = + FLAGS_remotedb + ? grpc::CreateChannel(server_address, cred.GetCredentials()) + : nullptr; + + parser.reset(new grpc::testing::ProtoFileParser(channel, FLAGS_proto_path, + FLAGS_protofiles)); + + if (FLAGS_binary_input) { + formatted_method_name = method_name; } else { - std::stringstream input_stream; + formatted_method_name = parser->GetFormattedMethodName(method_name); + } + + if (parser->HasError()) { + return false; + } + + if (parser->IsStreaming(method_name, true /* is_request */)) { + std::istream* input_stream; + std::ifstream input_file; + + if (argc == 3) { + request_text = argv[2]; + } + + std::multimap<grpc::string, grpc::string> client_metadata; + ParseMetadataFlag(&client_metadata); + PrintMetadata(client_metadata, "Sending client initial metadata:"); + + CliCall call(channel, formatted_method_name, client_metadata); + if (FLAGS_infile.empty()) { - if (isatty(STDIN_FILENO)) { - fprintf(stderr, "reading request message from stdin...\n"); + if (isatty(fileno(stdin))) { + print_mode = true; + fprintf(stderr, "reading streaming request message from stdin...\n"); } - input_stream << std::cin.rdbuf(); + input_stream = &std::cin; } else { - std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary); - input_stream << input_file.rdbuf(); + input_file.open(FLAGS_infile, std::ios::in | std::ios::binary); + input_stream = &input_file; + } + + gpr_mu parser_mu; + gpr_mu_init(&parser_mu); + std::thread read_thread(ReadResponse, &call, method_name, callback, + parser.get(), &parser_mu, print_mode); + + std::stringstream request_ss; + grpc::string line; + while (!request_text.empty() || + (!input_stream->eof() && getline(*input_stream, line))) { + if (!request_text.empty()) { + if (FLAGS_binary_input) { + serialized_request_proto = request_text; + request_text.clear(); + } else { + gpr_mu_lock(&parser_mu); + serialized_request_proto = parser->GetSerializedProtoFromMethod( + method_name, request_text, true /* is_request */); + request_text.clear(); + if (parser->HasError()) { + if (print_mode) { + fprintf(stderr, "Failed to parse request.\n"); + } + gpr_mu_unlock(&parser_mu); + continue; + } + gpr_mu_unlock(&parser_mu); + } + + call.WriteAndWait(serialized_request_proto); + if (print_mode) { + fprintf(stderr, "Request sent.\n"); + } + } else { + if (line.length() == 0) { + request_text = request_ss.str(); + request_ss.str(grpc::string()); + request_ss.clear(); + } else { + request_ss << line << ' '; + } + } + } + if (input_file.is_open()) { input_file.close(); } - request_text = input_stream.str(); - } - std::shared_ptr<grpc::Channel> channel = - grpc::CreateChannel(server_address, cred.GetCredentials()); - if (!FLAGS_binary_input || !FLAGS_binary_output) { - parser.reset( - new grpc::testing::ProtoFileParser(FLAGS_remotedb ? channel : nullptr, - FLAGS_proto_path, FLAGS_protofiles)); - if (parser->HasError()) { + call.WritesDoneAndWait(); + read_thread.join(); + + std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata; + Status status = call.Finish(&server_trailing_metadata); + PrintMetadata(server_trailing_metadata, + "Received trailing metadata from server:"); + + if (status.ok()) { + fprintf(stderr, "Stream RPC succeeded with OK status\n"); + return true; + } else { + fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", + status.error_code(), status.error_message().c_str()); return false; } - } - if (FLAGS_binary_input) { - serialized_request_proto = request_text; - formatted_method_name = method_name; - } else { - formatted_method_name = parser->GetFormattedMethodName(method_name); - serialized_request_proto = parser->GetSerializedProtoFromMethod( - method_name, request_text, true /* is_request */); - if (parser->HasError()) { - return false; + } else { // parser->IsStreaming(method_name, true /* is_request */) + if (argc == 3) { + request_text = argv[2]; + if (!FLAGS_infile.empty()) { + fprintf(stderr, "warning: request given in argv, ignoring --infile\n"); + } + } else { + std::stringstream input_stream; + if (FLAGS_infile.empty()) { + if (isatty(fileno(stdin))) { + fprintf(stderr, "reading request message from stdin...\n"); + } + input_stream << std::cin.rdbuf(); + } else { + std::ifstream input_file(FLAGS_infile, std::ios::in | std::ios::binary); + input_stream << input_file.rdbuf(); + input_file.close(); + } + request_text = input_stream.str(); } - } - fprintf(stderr, "connecting to %s\n", server_address.c_str()); - grpc::string serialized_response_proto; - std::multimap<grpc::string, grpc::string> client_metadata; - std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata, - server_trailing_metadata; - ParseMetadataFlag(&client_metadata); - PrintMetadata(client_metadata, "Sending client initial metadata:"); - grpc::Status status = grpc::testing::CliCall::Call( - channel, formatted_method_name, serialized_request_proto, - &serialized_response_proto, client_metadata, &server_initial_metadata, - &server_trailing_metadata); - PrintMetadata(server_initial_metadata, - "Received initial metadata from server:"); - PrintMetadata(server_trailing_metadata, - "Received trailing metadata from server:"); - if (status.ok()) { - fprintf(stderr, "Rpc succeeded with OK status\n"); - if (FLAGS_binary_output) { - output_ss << serialized_response_proto; + if (FLAGS_binary_input) { + serialized_request_proto = request_text; + // formatted_method_name = method_name; } else { - grpc::string response_text = parser->GetTextFormatFromMethod( - method_name, serialized_response_proto, false /* is_request */); + // formatted_method_name = parser->GetFormattedMethodName(method_name); + serialized_request_proto = parser->GetSerializedProtoFromMethod( + method_name, request_text, true /* is_request */); if (parser->HasError()) { return false; } - output_ss << "Response: \n " << response_text << std::endl; } - } else { - fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", - status.error_code(), status.error_message().c_str()); + fprintf(stderr, "connecting to %s\n", server_address.c_str()); + + grpc::string serialized_response_proto; + std::multimap<grpc::string, grpc::string> client_metadata; + std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata, + server_trailing_metadata; + ParseMetadataFlag(&client_metadata); + PrintMetadata(client_metadata, "Sending client initial metadata:"); + + CliCall call(channel, formatted_method_name, client_metadata); + call.Write(serialized_request_proto); + call.WritesDone(); + + for (bool receive_initial_metadata = true; call.Read( + &serialized_response_proto, + receive_initial_metadata ? &server_initial_metadata : nullptr); + receive_initial_metadata = false) { + if (!FLAGS_binary_output) { + serialized_response_proto = parser->GetTextFormatFromMethod( + method_name, serialized_response_proto, false /* is_request */); + if (parser->HasError()) { + return false; + } + } + if (receive_initial_metadata) { + PrintMetadata(server_initial_metadata, + "Received initial metadata from server:"); + } + if (!callback(serialized_response_proto)) { + return false; + } + } + Status status = call.Finish(&server_trailing_metadata); + if (status.ok()) { + fprintf(stderr, "Rpc succeeded with OK status\n"); + return true; + } else { + fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", + status.error_code(), status.error_message().c_str()); + return false; + } } - - return callback(output_ss.str()); + GPR_UNREACHABLE_CODE(return false); } bool GrpcTool::ParseMessage(int argc, const char** argv, @@ -531,7 +675,7 @@ bool GrpcTool::ParseMessage(int argc, const char** argv, } else { std::stringstream input_stream; if (FLAGS_infile.empty()) { - if (isatty(STDIN_FILENO)) { + if (isatty(fileno(stdin))) { fprintf(stderr, "reading request message from stdin...\n"); } input_stream << std::cin.rdbuf(); diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc index 33ce611a60..26e2b1f502 100644 --- a/test/cpp/util/grpc_tool_test.cc +++ b/test/cpp/util/grpc_tool_test.cc @@ -102,6 +102,8 @@ DECLARE_bool(l); namespace { +const int kNumResponseStreamsMsgs = 3; + class TestCliCredentials final : public grpc::testing::CliCredentials { public: std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override { @@ -137,6 +139,71 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { response->set_message(request->message()); return Status::OK; } + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) override { + EchoRequest request; + response->set_message(""); + if (!context->client_metadata().empty()) { + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = context->client_metadata().begin(); + iter != context->client_metadata().end(); ++iter) { + context->AddInitialMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + context->AddTrailingMetadata("trailing_key", "trailing_value"); + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + + return Status::OK; + } + + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) override { + if (!context->client_metadata().empty()) { + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = context->client_metadata().begin(); + iter != context->client_metadata().end(); ++iter) { + context->AddInitialMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + context->AddTrailingMetadata("trailing_key", "trailing_value"); + + EchoResponse response; + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + response.set_message(request->message() + grpc::to_string(i)); + writer->Write(response); + } + + return Status::OK; + } + + Status BidiStream( + ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) override { + EchoRequest request; + EchoResponse response; + if (!context->client_metadata().empty()) { + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = context->client_metadata().begin(); + iter != context->client_metadata().end(); ++iter) { + context->AddInitialMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + context->AddTrailingMetadata("trailing_key", "trailing_value"); + + while (stream->Read(&request)) { + response.set_message(request.message()); + stream->Write(response); + } + + return Status::OK; + } }; } // namespace @@ -347,6 +414,132 @@ TEST_F(GrpcToolTest, CallCommand) { ShutdownServer(); } +TEST_F(GrpcToolTest, CallCommandRequestStream) { + // Test input: grpc_cli call localhost:<port> RequestStream "message: + // 'Hello0'" + std::stringstream output_stream; + + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "call", server_address.c_str(), + "RequestStream", "message: 'Hello0'"}; + + // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n" + std::streambuf* orig = std::cin.rdbuf(); + std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n"); + std::cin.rdbuf(ss.rdbuf()); + + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + + // Expected output: "message: \"Hello0Hello1Hello2\"" + EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(), + "message: \"Hello0Hello1Hello2\"")); + std::cin.rdbuf(orig); + ShutdownServer(); +} + +TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) { + // Test input: grpc_cli call localhost:<port> RequestStream "message: + // 'Hello0'" + std::stringstream output_stream; + + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "call", server_address.c_str(), + "RequestStream", "message: 'Hello0'"}; + + // Mock std::cin input "bad_field: 'Hello1'\n\n message: 'Hello2'\n\n" + std::streambuf* orig = std::cin.rdbuf(); + std::istringstream ss("bad_field: 'Hello1'\n\n message: 'Hello2'\n\n"); + std::cin.rdbuf(ss.rdbuf()); + + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + + // Expected output: "message: \"Hello0Hello2\"" + EXPECT_TRUE(NULL != + strstr(output_stream.str().c_str(), "message: \"Hello0Hello2\"")); + std::cin.rdbuf(orig); + ShutdownServer(); +} + +TEST_F(GrpcToolTest, CallCommandResponseStream) { + // Test input: grpc_cli call localhost:<port> ResponseStream "message: + // 'Hello'" + std::stringstream output_stream; + + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "call", server_address.c_str(), + "ResponseStream", "message: 'Hello'"}; + + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + + // Expected output: "message: \"Hello{n}\"" + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + grpc::string expected_response_text = + "message: \"Hello" + grpc::to_string(i) + "\"\n"; + EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(), + expected_response_text.c_str())); + } + + ShutdownServer(); +} + +TEST_F(GrpcToolTest, CallCommandBidiStream) { + // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'" + std::stringstream output_stream; + + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "call", server_address.c_str(), + "BidiStream", "message: 'Hello0'"}; + + // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n" + std::streambuf* orig = std::cin.rdbuf(); + std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n"); + std::cin.rdbuf(ss.rdbuf()); + + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + + // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage: + // \"Hello2\"\n\n" + EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(), + "message: \"Hello0\"\nmessage: " + "\"Hello1\"\nmessage: \"Hello2\"\n")); + std::cin.rdbuf(orig); + ShutdownServer(); +} + +TEST_F(GrpcToolTest, CallCommandBidiStreamWithBadRequest) { + // Test input: grpc_cli call localhost:<port> BidiStream "message: 'Hello0'" + std::stringstream output_stream; + + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "call", server_address.c_str(), + "BidiStream", "message: 'Hello0'"}; + + // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n" + std::streambuf* orig = std::cin.rdbuf(); + std::istringstream ss("message: 1.0\n\n message: 'Hello2'\n\n"); + std::cin.rdbuf(ss.rdbuf()); + + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + + // Expected output: "message: \"Hello0\"\nmessage: \"Hello1\"\nmessage: + // \"Hello2\"\n\n" + EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(), + "message: \"Hello0\"\nmessage: \"Hello2\"\n")); + std::cin.rdbuf(orig); + + ShutdownServer(); +} + TEST_F(GrpcToolTest, ParseCommand) { // Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse // ECHO_RESPONSE_MESSAGE" diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index bc8a6083f4..5dd1b00e8a 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -81,7 +81,8 @@ class ErrorPrinter : public protobuf::compiler::MultiFileErrorCollector { ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel, const grpc::string& proto_path, const grpc::string& protofiles) - : has_error_(false) { + : has_error_(false), + dynamic_factory_(new protobuf::DynamicMessageFactory()) { std::vector<grpc::string> service_list; if (channel) { reflection_db_.reset(new grpc::ProtoReflectionDescriptorDatabase(channel)); @@ -127,7 +128,6 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel, } desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get())); - dynamic_factory_.reset(new protobuf::DynamicMessageFactory(desc_pool_.get())); for (auto it = service_list.begin(); it != service_list.end(); it++) { if (known_services.find(*it) == known_services.end()) { @@ -144,6 +144,11 @@ ProtoFileParser::~ProtoFileParser() {} grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) { has_error_ = false; + + if (known_methods_.find(method) != known_methods_.end()) { + return known_methods_[method]; + } + const protobuf::MethodDescriptor* method_descriptor = nullptr; for (auto it = service_desc_list_.begin(); it != service_desc_list_.end(); it++) { @@ -169,6 +174,8 @@ grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) { return ""; } + known_methods_[method] = method_descriptor->full_name(); + return method_descriptor->full_name(); } @@ -205,6 +212,25 @@ grpc::string ProtoFileParser::GetMessageTypeFromMethod( : method_desc->output_type()->full_name(); } +bool ProtoFileParser::IsStreaming(const grpc::string& method, bool is_request) { + has_error_ = false; + + grpc::string full_method_name = GetFullMethodName(method); + if (has_error_) { + return false; + } + + const protobuf::MethodDescriptor* method_desc = + desc_pool_->FindMethodByName(full_method_name); + if (!method_desc) { + LogError("Method not found"); + return false; + } + + return is_request ? method_desc->client_streaming() + : method_desc->server_streaming(); +} + grpc::string ProtoFileParser::GetSerializedProtoFromMethod( const grpc::string& method, const grpc::string& text_format_proto, bool is_request) { diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h index c1070a37b5..23d311ef8f 100644 --- a/test/cpp/util/proto_file_parser.h +++ b/test/cpp/util/proto_file_parser.h @@ -84,6 +84,8 @@ class ProtoFileParser { const grpc::string& message_type_name, const grpc::string& serialized_proto); + bool IsStreaming(const grpc::string& method, bool is_request); + bool HasError() const { return has_error_; } void LogError(const grpc::string& error_msg); @@ -104,6 +106,7 @@ class ProtoFileParser { std::unique_ptr<protobuf::DynamicMessageFactory> dynamic_factory_; std::unique_ptr<grpc::protobuf::Message> request_prototype_; std::unique_ptr<grpc::protobuf::Message> response_prototype_; + std::unordered_map<grpc::string, grpc::string> known_methods_; std::vector<const protobuf::ServiceDescriptor*> service_desc_list_; }; |