diff options
Diffstat (limited to 'test/cpp')
32 files changed, 1345 insertions, 1118 deletions
diff --git a/test/cpp/codegen/BUILD b/test/cpp/codegen/BUILD index 4780800645..8de46be816 100644 --- a/test/cpp/codegen/BUILD +++ b/test/cpp/codegen/BUILD @@ -14,14 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_test") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") -package( - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/codegen") grpc_cc_test( name = "codegen_test_full", diff --git a/test/cpp/common/BUILD b/test/cpp/common/BUILD index be9c279f13..e2b6365b13 100644 --- a/test/cpp/common/BUILD +++ b/test/cpp/common/BUILD @@ -14,14 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_test") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") -package( - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/common") grpc_cc_test( name = "alarm_cpp_test", diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc index ce4168843c..212972d25d 100644 --- a/test/cpp/common/alarm_cpp_test.cc +++ b/test/cpp/common/alarm_cpp_test.cc @@ -18,6 +18,8 @@ #include <grpc++/alarm.h> #include <grpc++/completion_queue.h> +#include <thread> + #include <gtest/gtest.h> #include "test/core/util/test_config.h" @@ -28,6 +30,46 @@ namespace { TEST(AlarmTest, RegularExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = cq.AsyncNext( + (void**)&output_tag, &ok, grpc_timeout_seconds_to_deadline(2)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, MultithreadedRegularExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); + void* output_tag; + bool ok; + CompletionQueue::NextStatus status; + Alarm alarm; + + std::thread t1([&alarm, &cq, &junk] { + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); + }); + + std::thread t2([&cq, &ok, &output_tag, &status] { + status = cq.AsyncNext((void**)&output_tag, &ok, + grpc_timeout_seconds_to_deadline(2)); + }); + + t1.join(); + t2.join(); + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, DeprecatedRegularExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(1), junk); void* output_tag; @@ -43,7 +85,8 @@ TEST(AlarmTest, RegularExpiry) { TEST(AlarmTest, MoveConstructor) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk); + Alarm first; + first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); Alarm second(std::move(first)); void* output_tag; bool ok; @@ -57,7 +100,8 @@ TEST(AlarmTest, MoveConstructor) { TEST(AlarmTest, MoveAssignment) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm first(&cq, grpc_timeout_seconds_to_deadline(1), junk); + Alarm first; + first.Set(&cq, grpc_timeout_seconds_to_deadline(1), junk); Alarm second(std::move(first)); first = std::move(second); @@ -76,7 +120,8 @@ TEST(AlarmTest, RegularExpiryChrono) { void* junk = reinterpret_cast<void*>(1618033); std::chrono::system_clock::time_point one_sec_deadline = std::chrono::system_clock::now() + std::chrono::seconds(1); - Alarm alarm(&cq, one_sec_deadline, junk); + Alarm alarm; + alarm.Set(&cq, one_sec_deadline, junk); void* output_tag; bool ok; @@ -91,7 +136,8 @@ TEST(AlarmTest, RegularExpiryChrono) { TEST(AlarmTest, ZeroExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(0), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(0), junk); void* output_tag; bool ok; @@ -106,7 +152,8 @@ TEST(AlarmTest, ZeroExpiry) { TEST(AlarmTest, NegativeExpiry) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(-1), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(-1), junk); void* output_tag; bool ok; @@ -121,7 +168,8 @@ TEST(AlarmTest, NegativeExpiry) { TEST(AlarmTest, Cancellation) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); - Alarm alarm(&cq, grpc_timeout_seconds_to_deadline(2), junk); + Alarm alarm; + alarm.Set(&cq, grpc_timeout_seconds_to_deadline(2), junk); alarm.Cancel(); void* output_tag; diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 27c5492c17..b29a13d4fb 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -14,15 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") -package( - default_visibility=["//visibility:public"], # Allows external users to implement end2end tests. - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/end2end", visibility = "public") # Allows external users to implement end2end tests. grpc_cc_library( name = "test_service_impl", @@ -199,6 +193,7 @@ grpc_cc_test( "//test/cpp/util:test_util", ], external_deps = [ + "gmock", "gtest", ], ) @@ -241,6 +236,7 @@ grpc_cc_test( "//test/cpp/util:test_util", ], external_deps = [ + "gmock", "gtest", ], ) diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 7cb7b262de..41090d161a 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -260,11 +260,31 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { server_address_ << "localhost:" << port_; // Setup server + BuildAndStartServer(); + + gpr_tls_set(&g_is_async_end2end_test, 1); + } + + void TearDown() override { + gpr_tls_set(&g_is_async_end2end_test, 0); + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) + ; + stub_.reset(); + poll_overrider_.reset(); + grpc_recycle_unused_port(port_); + } + + void BuildAndStartServer() { ServerBuilder builder; auto server_creds = GetCredentialsProvider()->GetServerCredentials( GetParam().credentials_type); builder.AddListeningPort(server_address_.str(), server_creds); - builder.RegisterService(&service_); + service_.reset(new grpc::testing::EchoTestService::AsyncService()); + builder.RegisterService(service_.get()); if (GetParam().health_check_service) { builder.RegisterService(&health_check_); } @@ -276,20 +296,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { new ServerBuilderSyncPluginDisabler()); builder.SetOption(move(sync_plugin_disabler)); server_ = builder.BuildAndStart(); - - gpr_tls_set(&g_is_async_end2end_test, 1); - } - - void TearDown() override { - server_->Shutdown(); - void* ignored_tag; - bool ignored_ok; - cq_->Shutdown(); - while (cq_->Next(&ignored_tag, &ignored_ok)) - ; - poll_overrider_.reset(); - gpr_tls_set(&g_is_async_end2end_test, 0); - grpc_recycle_unused_port(port_); } void ResetStub() { @@ -319,8 +325,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -341,7 +347,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<Server> server_; - grpc::testing::EchoTestService::AsyncService service_; + std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_; HealthCheck health_check_; std::ostringstream server_address_; int port_; @@ -359,6 +365,26 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +TEST_P(AsyncEnd2endTest, ReconnectChannel) { + if (GetParam().inproc) { + return; + } + ResetStub(); + SendRpc(1); + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (cq_->Next(&ignored_tag, &ignored_ok)) + ; + BuildAndStartServer(); + // It needs more than kConnectivityCheckIntervalMsec time to reconnect the + // channel. + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1600, GPR_TIMESPAN))); + SendRpc(1); +} + // We do not need to protect notify because the use is synchronized. void ServerWait(Server* server, int* notify) { server->Wait(); @@ -370,6 +396,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { ResetStub(); SendRpc(1); EXPECT_EQ(0, notify); + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); wait_thread.join(); EXPECT_EQ(1, notify); @@ -378,8 +405,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { TEST_P(AsyncEnd2endTest, ShutdownThenWait) { ResetStub(); SendRpc(1); - server_->Shutdown(); + std::thread t([this]() { server_->Shutdown(); }); server_->Wait(); + t.join(); } // Test a simple RPC using the async version of Next @@ -407,8 +435,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(2, true) @@ -444,8 +472,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking) .Expect(2, true) @@ -506,8 +534,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->Write(send_request, tag(3)); @@ -579,8 +607,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -635,8 +663,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -687,8 +715,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -741,8 +769,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking) .Expect(1, true) @@ -801,8 +829,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); @@ -869,8 +897,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); @@ -946,8 +974,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -991,8 +1019,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); @@ -1041,8 +1069,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); @@ -1104,8 +1132,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -1168,8 +1196,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) { stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1203,8 +1231,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) { stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); srv_ctx.AsyncNotifyWhenDone(tag(5)); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), - cq_.get(), tag(2)); + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1295,8 +1323,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends 3 messages (tags 3, 4 and 5) @@ -1426,8 +1454,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, - cq_.get(), cq_.get(), tag(2)); + service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -1562,8 +1590,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); - service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), - tag(2)); + service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); // Client sends the first and the only message diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e1160ecdc6..c236f76e89 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -85,7 +85,8 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: - ClientLbEnd2endTest() : server_host_("localhost") {} + ClientLbEnd2endTest() + : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -139,6 +140,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); + args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000); std::ostringstream uri; uri << "fake:///"; for (size_t i = 0; i < servers_.size() - 1; ++i) { @@ -150,18 +152,27 @@ class ClientLbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } - void SendRpc(bool expect_ok = true) { + Status SendRpc(EchoResponse* response = nullptr) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message("Live long and prosper."); + request.set_message(kRequestMessage_); ClientContext context; - Status status = stub_->Echo(&context, request, &response); - if (expect_ok) { - EXPECT_TRUE(status.ok()); - EXPECT_EQ(response.message(), request.message()); - } else { - EXPECT_FALSE(status.ok()); - } + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk() { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.message(), kRequestMessage_); + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } struct ServerData { @@ -169,16 +180,18 @@ class ClientLbEnd2endTest : public ::testing::Test { std::unique_ptr<Server> server_; MyTestServiceImpl service_; std::unique_ptr<std::thread> thread_; + bool server_ready_ = false; explicit ServerData(const grpc::string& server_host, int port = 0) { port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); gpr_log(GPR_INFO, "starting server on port %d", port_); std::mutex mu; + std::unique_lock<std::mutex> lock(mu); std::condition_variable cond; thread_.reset(new std::thread( std::bind(&ServerData::Start, this, server_host, &mu, &cond))); - std::unique_lock<std::mutex> lock(mu); - cond.wait(lock); + cond.wait(lock, [this] { return server_ready_; }); + server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } @@ -192,6 +205,7 @@ class ClientLbEnd2endTest : public ::testing::Test { builder.RegisterService(&service_); server_ = builder.BuildAndStart(); std::lock_guard<std::mutex> lock(*mu); + server_ready_ = true; cond->notify_one(); } @@ -207,16 +221,42 @@ class ClientLbEnd2endTest : public ::testing::Test { void WaitForServer(size_t server_idx) { do { - SendRpc(); + CheckRpcSendOk(); } while (servers_[server_idx]->service_.request_count() == 0); ResetCounters(); } + bool SeenAllServers() { + for (const auto& server : servers_) { + if (server->service_.request_count() == 0) return false; + } + return true; + } + + // Updates \a connection_order by appending to it the index of the newly + // connected server. Must be called after every single RPC. + void UpdateConnectionOrder( + const std::vector<std::unique_ptr<ServerData>>& servers, + std::vector<int>* connection_order) { + for (size_t i = 0; i < servers.size(); ++i) { + if (servers[i]->service_.request_count() == 1) { + // Was the server index known? If not, update connection_order. + const auto it = + std::find(connection_order->begin(), connection_order->end(), i); + if (it == connection_order->end()) { + connection_order->push_back(i); + return; + } + } + } + } + const grpc::string server_host_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::vector<std::unique_ptr<ServerData>> servers_; grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; TEST_F(ClientLbEnd2endTest, PickFirst) { @@ -230,7 +270,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { } SetNextResolution(ports); for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); + CheckRpcSendOk(); } // All requests should have gone to a single server. bool found = false; @@ -258,7 +298,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - SendRpc(); + CheckRpcSendOk(); EXPECT_EQ(servers_[0]->service_.request_count(), 1); // An empty update will result in the channel going into TRANSIENT_FAILURE. @@ -304,7 +344,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - SendRpc(); + CheckRpcSendOk(); EXPECT_EQ(servers_[0]->service_.request_count(), 1); servers_[0]->service_.ResetCounters(); @@ -314,7 +354,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET superset *******"); - SendRpc(); + CheckRpcSendOk(); // We stick to the previously connected server. WaitForServer(0); EXPECT_EQ(0, servers_[1]->service_.request_count()); @@ -338,7 +378,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { for (size_t i = 0; i < 1000; ++i) { std::random_shuffle(ports.begin(), ports.end()); SetNextResolution(ports); - if (i % 10 == 0) SendRpc(); + if (i % 10 == 0) CheckRpcSendOk(); } } // Check LB policy name for the channel. @@ -355,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { ports.emplace_back(server->port_); } SetNextResolution(ports); + // Wait until all backends are ready. + do { + CheckRpcSendOk(); + } while (!SeenAllServers()); + ResetCounters(); + // "Sync" to the end of the list. Next sequence of picks will start at the + // first server (index 0). + WaitForServer(servers_.size() - 1); + std::vector<int> connection_order; for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); - } - // One request should have gone to each server. - for (size_t i = 0; i < servers_.size(); ++i) { - EXPECT_EQ(1, servers_[i]->service_.request_count()); + CheckRpcSendOk(); + UpdateConnectionOrder(servers_, &connection_order); } + // Backends should be iterated over in the order in which the addresses were + // given. + const auto expected = std::vector<int>{0, 1, 2}; + EXPECT_EQ(expected, connection_order); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } @@ -378,7 +428,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { SetNextResolution(ports); WaitForServer(0); // Send RPCs. They should all go servers_[0] - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -394,7 +444,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(0, servers_[1]->service_.request_count()); WaitForServer(1); - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -406,7 +456,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { SetNextResolution(ports); WaitForServer(2); - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count()); @@ -423,7 +473,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { WaitForServer(2); // Send three RPCs, one per server. - for (size_t i = 0; i < 3; ++i) SendRpc(); + for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(); EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count()); @@ -450,6 +500,37 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector<int> ports; + + // Start with a single server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + WaitForServer(0); + // Send RPCs. They should all go to servers_[0] + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(10, servers_[0]->service_.request_count()); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + EXPECT_EQ(0, servers_[2]->service_.request_count()); + servers_[0]->service_.ResetCounters(); + + // Shutdown one of the servers to be sent in the update. + servers_[1]->Shutdown(false); + ports.emplace_back(servers_[1]->port_); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + WaitForServer(0); + WaitForServer(2); + + // Send three RPCs, one per server. + for (size_t i = 0; i < kNumServers; ++i) SendRpc(); + // The server in shutdown shouldn't receive any. + EXPECT_EQ(0, servers_[1]->service_.request_count()); +} + TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; @@ -462,7 +543,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { for (size_t i = 0; i < 1000; ++i) { std::random_shuffle(ports.begin(), ports.end()); SetNextResolution(ports); - if (i % 10 == 0) SendRpc(); + if (i % 10 == 0) CheckRpcSendOk(); } // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); @@ -473,37 +554,30 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) { // update provisions of RR. } -TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) { +TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { // Start servers and send one RPC per server. - const int kNumServers = 1; + const int kNumServers = 3; std::vector<int> ports; - ports.push_back(grpc_pick_unused_port_or_die()); + for (int i = 0; i < kNumServers; ++i) { + ports.push_back(grpc_pick_unused_port_or_die()); + } StartServers(kNumServers, ports); ResetStub("round_robin"); SetNextResolution(ports); - // Send one RPC per backend and make sure they are used in order. - // Note: This relies on the fact that the subchannels are reported in - // state READY in the order in which the addresses are specified, - // which is only true because the backends are all local. - for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); - EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; + // Send a number of RPCs, which succeed. + for (size_t i = 0; i < 100; ++i) { + CheckRpcSendOk(); } - // Check LB policy name for the channel. - EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); - // Kill all servers for (size_t i = 0; i < servers_.size(); ++i) { servers_[i]->Shutdown(false); } // Client request should fail. - SendRpc(false); - + CheckRpcSendFailure(); // Bring servers back up on the same port (we aren't recreating the channel). StartServers(kNumServers, ports); - // Client request should succeed. - SendRpc(); + CheckRpcSendOk(); } } // namespace diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 8bada48a2b..e54cd03ca2 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -238,6 +238,18 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { int port = grpc_pick_unused_port_or_die(); server_address_ << "127.0.0.1:" << port; // Setup server + BuildAndStartServer(processor); + } + + void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { + if (is_server_started_) { + server_->Shutdown(); + BuildAndStartServer(processor); + } + } + + void BuildAndStartServer( + const std::shared_ptr<AuthMetadataProcessor>& processor) { ServerBuilder builder; ConfigureServerBuilder(&builder); auto server_creds = GetCredentialsProvider()->GetServerCredentials( @@ -685,6 +697,20 @@ TEST_P(End2endTest, MultipleRpcs) { } } +TEST_P(End2endTest, ReconnectChannel) { + if (GetParam().inproc) { + return; + } + ResetStub(); + SendRpc(stub_.get(), 1, false); + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); + // It needs more than kConnectivityCheckIntervalMsec time to reconnect the + // channel. + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1600, GPR_TIMESPAN))); + SendRpc(stub_.get(), 1, false); +} + TEST_P(End2endTest, RequestStreamOneRequest) { ResetStub(); EchoRequest request; @@ -731,6 +757,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { ResetStub(); EchoRequest request; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b5cff664f6..570a3d1067 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds) {} + client_load_reporting_interval_seconds), + kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + void ResetBackendCounters() { + for (const auto& backend : backends_) backend->ResetCounters(); + } + ClientStats WaitForLoadReports() { ClientStats client_stats; for (const auto& balancer : balancers_) { @@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test { return client_stats; } + bool SeenAllBackends() { + for (const auto& backend : backends_) { + if (backend->request_count() == 0) return false; + } + return true; + } + + void WaitForAllBackends() { + while (!SeenAllBackends()) { + CheckRpcSendOk(); + } + ResetBackendCounters(); + } + + void WaitForBackend(size_t backend_idx) { + do { + CheckRpcSendOk(); + } while (backends_[backend_idx]->request_count() == 0); + ResetBackendCounters(); + } + struct AddressData { int port; bool is_balancer; @@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test { balancers_.at(i)->add_response(response, delay_ms); } - std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message, - int num_rpcs, - int timeout_ms = 1000) { - std::vector<std::pair<Status, EchoResponse>> results; + Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message(message); - for (int i = 0; i < num_rpcs; i++) { - ClientContext context; - context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); - Status status = stub_->Echo(&context, request, &response); - results.push_back(std::make_pair(status, response)); + request.set_message(kRequestMessage_); + ClientContext context; + context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk(const size_t times = 1) { + for (size_t i = 0; i < times; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); } - return results; + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } template <typename T> @@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test { const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::vector<std::unique_ptr<BackendServiceImpl>> backends_; std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_; - std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; - grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + + // We need to wait for all backends to come online. + WaitForAllBackends(); + + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { @@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - const auto& statuses_and_responses = - SendRpc(kMessage_, num_backends_, kCallDeadlineMs); + CheckRpcSendOk(num_backends_); const auto ellapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - t0); @@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, RepeatedServerlist) { - constexpr int kServerlistDelayMs = 100; - - // Send a serverlist right away. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - 0); - // ... and the same one a bit later. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - kServerlistDelayMs); - - // Send num_backends/2 requests. - auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2); - // only the first half of the backends will receive them. - for (size_t i = 0; i < backends_.size(); ++i) { - if (i < backends_.size() / 2) - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - else - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - - // Wait for the (duplicated) serverlist update. - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); - - // Verify the LB has sent two responses. - EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Some more calls to complete the total number of backends. - statuses_and_responses = SendRpc( - kMessage_, - num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */); - // Because a duplicated serverlist should have no effect, all backends must - // have been hit once now. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - balancers_[0]->NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); -} - TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - auto statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - // Each backend should have gotten 100 requests. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(kNumRpcsPerAddress, - backend_servers_[i].service_->request_count()); - } + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { for (size_t i = 0; i < backends_.size(); ++i) { if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); for (size_t i = 0; i < num_backends_; ++i) { backends_.emplace_back(new BackendServiceImpl()); backend_servers_.emplace_back(ServerThread<BackendService>( @@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // TODO(dgq): implement the "backend restart" component as well. We need extra // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } @@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // This is serviced by the existing RR policy gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should again have gone to the first backend. EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); @@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // receiving a request. In the meantime, the client continues to be serviced // (by the first backend) without interruption. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); // This is serviced by the existing RR policy backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); - + // Send kNumRpcsPerAddress RPCs for each server and drop address. size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; @@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) { 0, BalancerServiceImpl::BuildResponseForBackends( {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), 0); - const auto& statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); - } + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } TEST_F(SingleBalancerTest, DropAll) { @@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) { 1000); // First call succeeds. - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + CheckRpcSendOk(); // But eventually, the update with only dropped servers is processed and calls // fail. + Status status; do { - statuses_and_responses = SendRpc(kMessage_, 1); - ASSERT_EQ(statuses_and_responses.size(), 1UL); - } while (statuses_and_responses[0].first.ok()); - const Status& status = statuses_and_responses[0].first; + status = SendRpc(); + } while (status.ok()); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } @@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(kNumRpcsPerAddress, @@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD index 9123bd929e..4f21551ff4 100644 --- a/test/cpp/interop/BUILD +++ b/test/cpp/interop/BUILD @@ -14,14 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package") -package( - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/interop") grpc_cc_library( name = "server_helper_lib", diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 442da38426..985a335f1b 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -14,14 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package") -package( - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/microbenchmarks") grpc_cc_test( name = "noop-benchmark", @@ -33,75 +28,94 @@ grpc_cc_test( grpc_cc_library( name = "helpers", + testonly = 1, srcs = ["helpers.cc"], hdrs = [ "fullstack_context_mutators.h", "fullstack_fixtures.h", "helpers.h", ], + external_deps = [ + "benchmark", + ], deps = [ "//:grpc++_unsecure", "//src/proto/grpc/testing:echo_proto", "//test/core/util:grpc_test_util_unsecure", ], - external_deps = [ - "benchmark", - ], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_closure", + testonly = 1, srcs = ["bm_closure.cc"], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_cq", + testonly = 1, srcs = ["bm_cq.cc"], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_cq_multiple_threads", + testonly = 1, srcs = ["bm_cq_multiple_threads.cc"], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_error", + testonly = 1, srcs = ["bm_error.cc"], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_fullstack_streaming_ping_pong", - srcs = ["bm_fullstack_streaming_ping_pong.cc"], + testonly = 1, + srcs = [ + "bm_fullstack_streaming_ping_pong.cc", + "fullstack_streaming_ping_pong.h", + ], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_fullstack_streaming_pump", - srcs = ["bm_fullstack_streaming_pump.cc"], + testonly = 1, + srcs = [ + "bm_fullstack_streaming_pump.cc", + "fullstack_streaming_pump.h", + ], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_fullstack_trickle", + testonly = 1, srcs = ["bm_fullstack_trickle.cc"], - deps = [":helpers"], - external_deps = [ - "gflags", + deps = [ + ":helpers", + "//test/cpp/util:test_config", ], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_fullstack_unary_ping_pong", - srcs = ["bm_fullstack_unary_ping_pong.cc"], + testonly = 1, + srcs = [ + "bm_fullstack_unary_ping_pong.cc", + "fullstack_unary_ping_pong.h", + ], deps = [":helpers"], ) -grpc_cc_test( +grpc_cc_binary( name = "bm_metadata", + testonly = 1, srcs = ["bm_metadata.cc"], deps = [":helpers"], ) diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 508f7f94d6..cadc9b2a11 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -35,10 +35,11 @@ extern "C" { #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" -#include "src/core/ext/filters/load_reporting/load_reporting_filter.h" +#include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h" #include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" @@ -396,10 +397,6 @@ grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return gpr_strdup("peer"); -} - void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info) {} @@ -412,7 +409,6 @@ static const grpc_channel_filter dummy_filter = {StartTransportStreamOp, 0, InitChannelElem, DestroyChannelElem, - GetPeer, GetChannelInfo, "dummy_filter"}; @@ -459,11 +455,6 @@ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_destroy */ void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} -/* implementation of grpc_transport_get_peer */ -char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_transport *self) { - return gpr_strdup("transport_peer"); -} - /* implementation of grpc_transport_get_endpoint */ grpc_endpoint *GetEndpoint(grpc_exec_ctx *exec_ctx, grpc_transport *self) { return nullptr; @@ -473,7 +464,7 @@ static const grpc_transport_vtable dummy_transport_vtable = { 0, "dummy_http2", InitStream, SetPollset, SetPollsetSet, PerformStreamOp, PerformOp, DestroyStream, Destroy, - GetPeer, GetEndpoint}; + GetEndpoint}; static grpc_transport dummy_transport = {&dummy_transport_vtable}; @@ -629,7 +620,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata); typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata); -typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST> +typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST> LoadReportingFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); @@ -639,18 +630,22 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); namespace isolated_call_filter { +typedef struct { grpc_call_combiner *call_combiner; } call_data; + static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { + call_data *calld = static_cast<call_data *>(elem->call_data); if (op->recv_initial_metadata) { - GRPC_CLOSURE_SCHED( - exec_ctx, + GRPC_CALL_COMBINER_START( + exec_ctx, calld->call_combiner, op->payload->recv_initial_metadata.recv_initial_metadata_ready, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, "recv_initial_metadata"); } if (op->recv_message) { - GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, + op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE, "recv_message"); } GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE); } @@ -667,6 +662,8 @@ static void StartTransportOp(grpc_exec_ctx *exec_ctx, static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { + call_data *calld = static_cast<call_data *>(elem->call_data); + calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -687,24 +684,19 @@ grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return gpr_strdup("peer"); -} - void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info) {} static const grpc_channel_filter isolated_call_filter = { StartTransportStreamOp, StartTransportOp, - 0, + sizeof(call_data), InitCallElem, SetPollsetOrPollsetSet, DestroyCallElem, 0, InitChannelElem, DestroyChannelElem, - GetPeer, GetChannelInfo, "isolated_call_filter"}; } // namespace isolated_call_filter diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index cb113c5254..070034fe33 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -181,13 +218,17 @@ class Stream { NULL, arena_); } - void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast<grpc_stream *>(stream_), closure); + void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); +#else + grpc_stream_unref(exec_ctx, &refcount_); +#endif } - void Op(grpc_transport_stream_op_batch *op) { - grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast<grpc_stream *>(stream_), op); } @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast<Stream *>(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast<grpc_stream *>(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template <class F> -std::unique_ptr<Closure> MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr<Closure>(new C(f, sched)); -} - -template <class F> -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - delete static_cast<C *>(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr<Closure> next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); - s.DestroyThen(next.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -286,6 +310,7 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { Stream s(&f); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; + memset(&op_payload, 0, sizeof(op_payload)); std::unique_ptr<Closure> start; std::unique_ptr<Closure> done; @@ -313,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -337,6 +362,7 @@ static void BM_TransportEmptyOp(benchmark::State &state) { s.Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; + memset(&op_payload, 0, sizeof(op_payload)); auto reset_op = [&]() { memset(&op, 0, sizeof(op)); op.payload = &op_payload; @@ -346,24 +372,31 @@ static void BM_TransportEmptyOp(benchmark::State &state) { if (!state.KeepRunning()) return; reset_op(); op.on_complete = c.get(); - s.Op(&op); + s.Op(exec_ctx, &op); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); } BENCHMARK(BM_TransportEmptyOp); +std::vector<std::unique_ptr<gpr_event>> done_events; + static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr<Stream>(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; + memset(&op_payload, 0, sizeof(op_payload)); auto reset_op = [&]() { memset(&op, 0, sizeof(op)); op.payload = &op_payload; @@ -387,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) { grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } + gpr_event *bm_done = new gpr_event; + gpr_event_init(bm_done); + std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void *)1); + return; + } // force outgoing window to be yuge - s.chttp2_stream()->flow_control.remote_window_delta = + s->chttp2_stream()->flow_control.remote_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); @@ -399,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) { op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(exec_ctx, &op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); + gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); + done_events.emplace_back(bm_done); + reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); @@ -485,6 +528,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { Stream s(&f); s.Init(state); grpc_transport_stream_op_batch_payload op_payload; + memset(&op_payload, 0, sizeof(op_payload)); grpc_transport_stream_op_batch op; grpc_byte_stream *recv_stream; grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); @@ -531,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); + s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data)); }); @@ -574,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -592,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index f109fe6251..9d71d3990d 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -38,6 +38,8 @@ struct grpc_pollset { namespace grpc { namespace testing { +auto& force_library_initialization = Library::get(); + static void* g_tag = (void*)(intptr_t)10; // Some random number static grpc_completion_queue* g_cq; static grpc_event_engine_vtable g_vtable; diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index 0712a40018..655e032faf 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -18,13 +18,7 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h" namespace grpc { namespace testing { @@ -33,365 +27,6 @@ namespace testing { auto& force_library_initialization = Library::get(); /******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of -// messages in each call) in a loop on a single channel -// -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -// Second parameter (i.e state.range(1)): Number of ping pong messages. -// Note: One ping-pong means two messages (one from client to server and -// the other from server to client): -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPong(benchmark::State& state) { - const int msg_size = state.range(0); - const int max_ping_pongs = state.range(1); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - while (state.KeepRunning()) { - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - // Establish async stream between client side and server side - void* t; - bool ok; - int need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - // Send 'max_ping_pongs' number of ping pong messages - int ping_pong_cnt = 0; - while (ping_pong_cnt < max_ping_pongs) { - request_rw->Write(send_request, tag(0)); // Start client send - response_rw.Read(&recv_request, tag(1)); // Start server recv - request_rw->Read(&recv_response, tag(2)); // Start client recv - - need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 1) { - response_rw.Write(send_response, tag(3)); - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - ping_pong_cnt++; - } - - request_rw->WritesDone(tag(0)); - response_rw.Finish(Status::OK, tag(1)); - - Status recv_status; - request_rw->Finish(&recv_status, tag(2)); - - need_tags = (1 << 0) | (1 << 1) | (1 << 2); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); -} - -// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPongMsgs(benchmark::State& state) { - const int msg_size = state.range(0); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - // Establish async stream between client side and server side - void* t; - bool ok; - int need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(0)); // Start client send - response_rw.Read(&recv_request, tag(1)); // Start server recv - request_rw->Read(&recv_response, tag(2)); // Start client recv - - need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 1) { - response_rw.Write(send_response, tag(3)); - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - } - - request_rw->WritesDone(tag(0)); - response_rw.Finish(Status::OK, tag(1)); - Status recv_status; - request_rw->Finish(&recv_status, tag(2)); - - need_tags = (1 << 0) | (1 << 1) | (1 << 2); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * 2); -} - -// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of -// messages in each call) in a loop on a single channel. Different from -// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, -// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving -// sendmsg syscalls for streaming by coalescing 1. initial metadata with first -// message; 2. final streaming message with trailing metadata. -// -// First parmeter (i.e state.range(0)): Message size (in bytes) to use -// Second parameter (i.e state.range(1)): Number of ping pong messages. -// Note: One ping-pong means two messages (one from client to server and -// the other from server to client): -// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish -// API and WriteLast API for server. -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { - const int msg_size = state.range(0); - const int max_ping_pongs = state.range(1); - // This options is used to test out server API: WriteLast and WriteAndFinish - // respectively, since we can not use both of them on server side at the same - // time. Value 1 means we are testing out the WriteAndFinish API, and - // otherwise we are testing out the WriteLast API. - const int write_and_finish = state.range(2); - - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - EchoRequest send_request; - EchoRequest recv_request; - - if (msg_size > 0) { - send_request.set_message(std::string(msg_size, 'a')); - send_response.set_message(std::string(msg_size, 'b')); - } - - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - - while (state.KeepRunning()) { - ServerContext svr_ctx; - ServerContextMutator svr_ctx_mut(&svr_ctx); - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - cli_ctx.set_initial_metadata_corked(true); - // tag:1 here will never comes up, since we are not performing any op due - // to initial metadata coalescing. - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - - void* t; - bool ok; - int need_tags; - - // Send 'max_ping_pongs' number of ping pong messages - int ping_pong_cnt = 0; - while (ping_pong_cnt < max_ping_pongs) { - if (ping_pong_cnt == max_ping_pongs - 1) { - request_rw->WriteLast(send_request, WriteOptions(), tag(2)); - } else { - request_rw->Write(send_request, tag(2)); // Start client send - } - - need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); - - if (ping_pong_cnt == 0) { - // wait for the server call structure (call_hook, etc.) to be - // initialized (async stream between client side and server side - // established). It is necessary when client init metadata is - // coalesced - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - while ((int)(intptr_t)t != 0) { - // In some cases tag:2 comes before tag:0 (write tag comes out - // first), this while loop is to make sure get tag:0. - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - } - } - - response_rw.Read(&recv_request, tag(3)); // Start server recv - request_rw->Read(&recv_response, tag(4)); // Start client recv - - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - - // If server recv is complete, start the server send operation - if (i == 3) { - if (ping_pong_cnt == max_ping_pongs - 1) { - if (write_and_finish == 1) { - response_rw.WriteAndFinish(send_response, WriteOptions(), - Status::OK, tag(5)); - } else { - response_rw.WriteLast(send_response, WriteOptions(), tag(5)); - // WriteLast buffers the write, so neither server write op nor - // client read op will finish inside the while loop. - need_tags &= ~(1 << 4); - need_tags &= ~(1 << 5); - } - } else { - response_rw.Write(send_response, tag(5)); - } - } - - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - ping_pong_cnt++; - } - - if (max_ping_pongs == 0) { - need_tags = (1 << 6) | (1 << 7) | (1 << 8); - } else { - if (write_and_finish == 1) { - need_tags = (1 << 8); - } else { - // server's buffered write and the client's read of the buffered write - // tags should come up. - need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); - } - } - - // No message write or initial metadata write happened yet. - if (max_ping_pongs == 0) { - request_rw->WritesDone(tag(6)); - // wait for server call data structure(call_hook, etc.) to be - // initialized, since initial metadata is corked. - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - while ((int)(intptr_t)t != 0) { - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - } - response_rw.Finish(Status::OK, tag(7)); - } else { - if (write_and_finish != 1) { - response_rw.Finish(Status::OK, tag(7)); - } - } - - Status recv_status; - request_rw->Finish(&recv_status, tag(8)); - - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - - GPR_ASSERT(recv_status.ok()); - } - } - - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); -} - -/******************************************************************************* * CONFIGURATIONS */ diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc index 6fbf9da0ad..c7ceacd320 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc @@ -18,157 +18,18 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_streaming_pump.h" namespace grpc { namespace testing { -// force library initialization -auto& force_library_initialization = Library::get(); - -/******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -template <class Fixture> -static void BM_PumpStreamClientToServer(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoRequest send_request; - EchoRequest recv_request; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); - } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Read(&recv_request, tag(0)); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(1)); - while (true) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - response_rw.Read(&recv_request, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); - } - } - } - request_rw->WritesDone(tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Finish(Status::OK, tag(0)); - Status final_status; - request_rw->Finish(&final_status, tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - GPR_ASSERT(final_status.ok()); - } - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); -} - -template <class Fixture> -static void BM_PumpStreamServerToClient(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - { - EchoResponse send_response; - EchoResponse recv_response; - if (state.range(0) > 0) { - send_response.set_message(std::string(state.range(0), 'a')); - } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - request_rw->Read(&recv_response, tag(0)); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - response_rw.Write(send_response, tag(1)); - while (true) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - request_rw->Read(&recv_response, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); - } - } - } - response_rw.Finish(Status::OK, tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - } - fixture->Finish(state); - fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); -} - /******************************************************************************* * CONFIGURATIONS */ +// force library initialization +auto& force_library_initialization = Library::get(); + BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) ->Range(0, 128 * 1024 * 1024); BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 5c44b9751f..59fb29dd60 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -26,6 +26,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/util/test_config.h" extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" @@ -104,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Log(int64_t iteration) { + void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN { auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); grpc_chttp2_transport* client = reinterpret_cast<grpc_chttp2_transport*>(client_transport_); @@ -192,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { return p; } - void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) { + void UpdateStats(grpc_chttp2_transport* t, Stats* s, + size_t backlog) GPR_ATTRIBUTE_NO_TSAN { if (backlog == 0) { if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) { s->streams_stalled_due_to_stream_flow_control++; @@ -420,6 +422,6 @@ BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); int main(int argc, char** argv) { ::benchmark::Initialize(&argc, argv); - ::google::ParseCommandLineFlags(&argc, &argv, false); + ::grpc::testing::InitTest(&argc, &argv, false); ::benchmark::RunSpecifiedBenchmarks(); } diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc index 9af751245f..fa41d114c0 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc @@ -18,13 +18,7 @@ /* Benchmark gRPC end2end in various configurations */ -#include <benchmark/benchmark.h> -#include <sstream> -#include "src/core/lib/profiling/timers.h" -#include "src/cpp/client/create_channel_internal.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" -#include "test/cpp/microbenchmarks/fullstack_fixtures.h" +#include "test/cpp/microbenchmarks/fullstack_unary_ping_pong.h" namespace grpc { namespace testing { @@ -33,85 +27,6 @@ namespace testing { auto& force_library_initialization = Library::get(); /******************************************************************************* - * BENCHMARKING KERNELS - */ - -static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } - -template <class Fixture, class ClientContextMutator, class ServerContextMutator> -static void BM_UnaryPingPong(benchmark::State& state) { - EchoTestService::AsyncService service; - std::unique_ptr<Fixture> fixture(new Fixture(&service)); - EchoRequest send_request; - EchoResponse send_response; - EchoResponse recv_response; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); - } - if (state.range(1) > 0) { - send_response.set_message(std::string(state.range(1), 'a')); - } - Status recv_status; - struct ServerEnv { - ServerContext ctx; - EchoRequest recv_request; - grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; - ServerEnv() : response_writer(&ctx) {} - }; - uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; - ServerEnv* server_env[2] = { - reinterpret_cast<ServerEnv*>(server_env_buffer), - reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; - new (server_env[0]) ServerEnv; - new (server_env[1]) ServerEnv; - service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, - &server_env[0]->response_writer, fixture->cq(), - fixture->cq(), tag(0)); - service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, - &server_env[1]->response_writer, fixture->cq(), - fixture->cq(), tag(1)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); - while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - recv_response.Clear(); - ClientContext cli_ctx; - ClientContextMutator cli_ctx_mut(&cli_ctx); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( - stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); - void* t; - bool ok; - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - GPR_ASSERT(t == tag(0) || t == tag(1)); - intptr_t slot = reinterpret_cast<intptr_t>(t); - ServerEnv* senv = server_env[slot]; - ServerContextMutator svr_ctx_mut(&senv->ctx); - senv->response_writer.Finish(send_response, Status::OK, tag(3)); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - for (int i = (1 << 3) | (1 << 4); i != 0;) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int tagnum = (int)reinterpret_cast<intptr_t>(t); - GPR_ASSERT(i & (1 << tagnum)); - i -= 1 << tagnum; - } - GPR_ASSERT(recv_status.ok()); - - senv->~ServerEnv(); - senv = new (senv) ServerEnv(); - service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, - fixture->cq(), fixture->cq(), tag(slot)); - } - fixture->Finish(state); - fixture.reset(); - server_env[0]->~ServerEnv(); - server_env[1]->~ServerEnv(); - state.SetBytesProcessed(state.range(0) * state.iterations() + - state.range(1) * state.iterations()); -} - -/******************************************************************************* * CONFIGURATIONS */ diff --git a/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h new file mode 100644 index 0000000000..ff1f966753 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_streaming_ping_pong.h @@ -0,0 +1,396 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPong(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); +} + +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongMsgs(benchmark::State& state) { + const int msg_size = state.range(0); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * 2); +} + +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel. Different from +// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, +// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving +// sendmsg syscalls for streaming by coalescing 1. initial metadata with first +// message; 2. final streaming message with trailing metadata. +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish +// API and WriteLast API for server. +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + // This options is used to test out server API: WriteLast and WriteAndFinish + // respectively, since we can not use both of them on server side at the same + // time. Value 1 means we are testing out the WriteAndFinish API, and + // otherwise we are testing out the WriteLast API. + const int write_and_finish = state.range(2); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 here will never comes up, since we are not performing any op due + // to initial metadata coalescing. + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + void* t; + bool ok; + int need_tags; + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + if (ping_pong_cnt == max_ping_pongs - 1) { + request_rw->WriteLast(send_request, WriteOptions(), tag(2)); + } else { + request_rw->Write(send_request, tag(2)); // Start client send + } + + need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); + + if (ping_pong_cnt == 0) { + // wait for the server call structure (call_hook, etc.) to be + // initialized (async stream between client side and server side + // established). It is necessary when client init metadata is + // coalesced + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + // In some cases tag:2 comes before tag:0 (write tag comes out + // first), this while loop is to make sure get tag:0. + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + } + + response_rw.Read(&recv_request, tag(3)); // Start server recv + request_rw->Read(&recv_response, tag(4)); // Start client recv + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 3) { + if (ping_pong_cnt == max_ping_pongs - 1) { + if (write_and_finish == 1) { + response_rw.WriteAndFinish(send_response, WriteOptions(), + Status::OK, tag(5)); + } else { + response_rw.WriteLast(send_response, WriteOptions(), tag(5)); + // WriteLast buffers the write, so neither server write op nor + // client read op will finish inside the while loop. + need_tags &= ~(1 << 4); + need_tags &= ~(1 << 5); + } + } else { + response_rw.Write(send_response, tag(5)); + } + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + if (max_ping_pongs == 0) { + need_tags = (1 << 6) | (1 << 7) | (1 << 8); + } else { + if (write_and_finish == 1) { + need_tags = (1 << 8); + } else { + // server's buffered write and the client's read of the buffered write + // tags should come up. + need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); + } + } + + // No message write or initial metadata write happened yet. + if (max_ping_pongs == 0) { + request_rw->WritesDone(tag(6)); + // wait for server call data structure(call_hook, etc.) to be + // initialized, since initial metadata is corked. + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + response_rw.Finish(Status::OK, tag(7)); + } else { + if (write_and_finish != 1) { + response_rw.Finish(Status::OK, tag(7)); + } + } + + Status recv_status; + request_rw->Finish(&recv_status, tag(8)); + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H diff --git a/test/cpp/microbenchmarks/fullstack_streaming_pump.h b/test/cpp/microbenchmarks/fullstack_streaming_pump.h new file mode 100644 index 0000000000..f9db212b02 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_streaming_pump.h @@ -0,0 +1,170 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +template <class Fixture> +static void BM_PumpStreamClientToServer(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoRequest send_request; + EchoRequest recv_request; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + request_rw->Write(send_request, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + response_rw.Read(&recv_request, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + request_rw->WritesDone(tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + response_rw.Finish(Status::OK, tag(0)); + Status final_status; + request_rw->Finish(&final_status, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + GPR_ASSERT(final_status.ok()); + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); +} + +template <class Fixture> +static void BM_PumpStreamServerToClient(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_response.set_message(std::string(state.range(0), 'a')); + } + Status recv_status; + ServerContext svr_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + int need_tags = (1 << 0) | (1 << 1); + void* t; + bool ok; + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + response_rw.Write(send_response, tag(1)); + while (true) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + if (t == tag(0)) { + request_rw->Read(&recv_response, tag(0)); + } else if (t == tag(1)) { + break; + } else { + GPR_ASSERT(false); + } + } + } + response_rw.Finish(Status::OK, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + } + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(state.range(0) * state.iterations()); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H diff --git a/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h new file mode 100644 index 0000000000..76d278b2a0 --- /dev/null +++ b/test/cpp/microbenchmarks/fullstack_unary_ping_pong.h @@ -0,0 +1,116 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* Benchmark gRPC end2end in various configurations */ + +#ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H +#define TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H + +#include <benchmark/benchmark.h> +#include <sstream> +#include "src/core/lib/profiling/timers.h" +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/fullstack_context_mutators.h" +#include "test/cpp/microbenchmarks/fullstack_fixtures.h" + +namespace grpc { +namespace testing { + +/******************************************************************************* + * BENCHMARKING KERNELS + */ + +static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } + +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_UnaryPingPong(benchmark::State& state) { + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + EchoRequest send_request; + EchoResponse send_response; + EchoResponse recv_response; + if (state.range(0) > 0) { + send_request.set_message(std::string(state.range(0), 'a')); + } + if (state.range(1) > 0) { + send_response.set_message(std::string(state.range(1), 'a')); + } + Status recv_status; + struct ServerEnv { + ServerContext ctx; + EchoRequest recv_request; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; + ServerEnv() : response_writer(&ctx) {} + }; + uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; + ServerEnv* server_env[2] = { + reinterpret_cast<ServerEnv*>(server_env_buffer), + reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; + new (server_env[0]) ServerEnv; + new (server_env[1]) ServerEnv; + service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, + &server_env[0]->response_writer, fixture->cq(), + fixture->cq(), tag(0)); + service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, + &server_env[1]->response_writer, fixture->cq(), + fixture->cq(), tag(1)); + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + while (state.KeepRunning()) { + GPR_TIMER_SCOPE("BenchmarkCycle", 0); + recv_response.Clear(); + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + void* t; + bool ok; + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + GPR_ASSERT(t == tag(0) || t == tag(1)); + intptr_t slot = reinterpret_cast<intptr_t>(t); + ServerEnv* senv = server_env[slot]; + ServerContextMutator svr_ctx_mut(&senv->ctx); + senv->response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + for (int i = (1 << 3) | (1 << 4); i != 0;) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int tagnum = (int)reinterpret_cast<intptr_t>(t); + GPR_ASSERT(i & (1 << tagnum)); + i -= 1 << tagnum; + } + GPR_ASSERT(recv_status.ok()); + + senv->~ServerEnv(); + senv = new (senv) ServerEnv(); + service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, + fixture->cq(), fixture->cq(), tag(slot)); + } + fixture->Finish(state); + fixture.reset(); + server_env[0]->~ServerEnv(); + server_env[1]->~ServerEnv(); + state.SetBytesProcessed(state.range(0) * state.iterations() + + state.range(1) * state.iterations()); +} +} // namespace testing +} // namespace grpc + +#endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_UNARY_PING_PONG_H diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc index 0c10d6c4ce..b0caa48cd0 100644 --- a/test/cpp/microbenchmarks/helpers.cc +++ b/test/cpp/microbenchmarks/helpers.cc @@ -29,6 +29,20 @@ void TrackCounters::Finish(benchmark::State &state) { } void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) { + grpc_stats_data stats_end; + grpc_stats_collect(&stats_end); + grpc_stats_data stats; + grpc_stats_diff(&stats_end, &stats_begin_, &stats); + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + out << " " << grpc_stats_counter_name[i] + << "/iter:" << ((double)stats.counters[i] / (double)state.iterations()); + } + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + out << " " << grpc_stats_histogram_name[i] << "-median:" + << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0) + << " " << grpc_stats_histogram_name[i] << "-99p:" + << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0); + } #ifdef GPR_LOW_LEVEL_COUNTERS grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot(); out << " locks/iter:" << ((double)(gpr_atm_no_barrier_load(&gpr_mu_locks) - diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h index c81d95a4ea..07dd611709 100644 --- a/test/cpp/microbenchmarks/helpers.h +++ b/test/cpp/microbenchmarks/helpers.h @@ -23,6 +23,7 @@ extern "C" { #include <grpc/support/port_platform.h> +#include "src/core/lib/debug/stats.h" #include "test/core/util/memory_counters.h" } @@ -62,10 +63,12 @@ extern "C" gpr_atm gpr_now_call_count; class TrackCounters { public: + TrackCounters() { grpc_stats_collect(&stats_begin_); } virtual void Finish(benchmark::State& state); virtual void AddToLabel(std::ostream& out, benchmark::State& state); private: + grpc_stats_data stats_begin_; #ifdef GPR_LOW_LEVEL_COUNTERS const size_t mu_locks_at_start_ = gpr_atm_no_barrier_load(&gpr_mu_locks); const size_t atm_cas_at_start_ = diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index b3348b76fa..3352269517 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -14,14 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package") -package( - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/qps") grpc_cc_library( name = "parse_json", @@ -51,6 +46,7 @@ grpc_cc_library( ":usage_timer", "//:grpc", "//:grpc++", + "//:grpc++_core_stats", "//src/proto/grpc/testing:control_proto", "//src/proto/grpc/testing:payloads_proto", "//src/proto/grpc/testing:services_proto", diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 6c4d92e859..7fbaf63492 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -31,10 +31,10 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/surface/completion_queue.h" #include "src/proto/grpc/testing/payloads.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "src/cpp/util/core_stats.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/usage_timer.h" @@ -173,6 +173,9 @@ class Client { timer_result = timer_->Mark(); } + grpc_stats_data core_stats; + grpc_stats_collect(&core_stats); + ClientStats stats; latencies.FillProto(stats.mutable_latencies()); for (StatusHistogram::const_iterator it = statuses.begin(); @@ -185,6 +188,7 @@ class Client { stats.set_time_system(timer_result.system); stats.set_time_user(timer_result.user); stats.set_cq_poll_count(poll_count); + CoreStatsToProto(core_stats, stats.mutable_core_stats()); return stats; } @@ -370,12 +374,11 @@ class ClientImpl : public Client { ClientImpl(const ClientConfig& config, std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : cores_(gpr_cpu_num_cores()), - channels_(config.client_channels()), - create_stub_(create_stub) { + : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) { for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config, create_stub_, i); + channels_.emplace_back( + config.server_targets(i % config.server_targets_size()), config, + create_stub_, i); } ClientRequestCreator<RequestType> create_req(&request_, @@ -389,20 +392,11 @@ class ClientImpl : public Client { class ClientChannelInfo { public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config, - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> - create_stub, - int shard) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector + ClientChannelInfo( + const grpc::string& target, const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub, + int shard) { ChannelArguments args; args.SetInt("shard_to_ensure_no_subchannel_merges", shard); set_channel_args(config, &args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 265f174cc5..912c871482 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -34,6 +34,7 @@ #include <grpc/support/cpu.h> #include <grpc/support/log.h> +#include "src/core/lib/surface/completion_queue.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/usage_timer.h" @@ -140,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { if (!next_issue_) { // ready to issue RunNextState(true, nullptr); } else { // wait for the issue time - alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); } } }; @@ -359,8 +361,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { break; // loop around, don't return case State::WAIT: next_state_ = State::READY_TO_WRITE; - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); return true; case State::READY_TO_WRITE: if (!ok) { @@ -517,8 +519,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { } break; // loop around, don't return case State::WAIT: - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: @@ -759,8 +761,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { break; // loop around, don't return case State::WAIT: next_state_ = State::READY_TO_WRITE; - alarm_.reset( - new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + alarm_.reset(new Alarm); + alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this)); return true; case State::READY_TO_WRITE: if (!ok) { diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h index c6e417985a..1fa310c209 100644 --- a/test/cpp/qps/interarrival.h +++ b/test/cpp/qps/interarrival.h @@ -21,7 +21,7 @@ #include <chrono> #include <cmath> -#include <cstdlib> +#include <random> #include <vector> #include <grpc++/support/config.h> @@ -75,13 +75,13 @@ class InterarrivalTimer { public: InterarrivalTimer() {} void init(const RandomDistInterface& r, int threads, int entries = 1000000) { + std::random_device devrand; + std::mt19937_64 generator(devrand()); + std::uniform_real_distribution<double> rando(0, 1); for (int i = 0; i < entries; i++) { - // rand is the only choice that is portable across POSIX and Windows - // and that supports new and old compilers - const double uniform_0_1 = - static_cast<double>(rand()) / static_cast<double>(RAND_MAX); random_table_.push_back( - static_cast<int64_t>(1e9 * r.transform(uniform_0_1))); + static_cast<int64_t>(1e9 * r.transform(rando(generator)))); + ; } // Now set up the thread positions for (int i = 0; i < threads; i++) { diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index a45b10bcb8..3c99bda144 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -26,6 +26,7 @@ #include "test/cpp/qps/stats.h" #include <grpc++/client_context.h> +#include "src/cpp/util/core_stats.h" #include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { @@ -85,6 +86,33 @@ void GprLogReporter::ReportQPS(const ScenarioResult& result) { gpr_log(GPR_INFO, "successful requests/second: %.1f", result.summary().successful_requests_per_second()); } + for (int i = 0; i < result.client_stats_size(); i++) { + if (result.client_stats(i).has_core_stats()) { + ReportCoreStats("CLIENT", i, result.client_stats(i).core_stats()); + } + } + for (int i = 0; i < result.server_stats_size(); i++) { + if (result.server_stats(i).has_core_stats()) { + ReportCoreStats("SERVER", i, result.server_stats(i).core_stats()); + } + } +} + +void GprLogReporter::ReportCoreStats(const char* name, int idx, + const grpc::core::Stats& stats) { + grpc_stats_data data; + ProtoToCoreStats(stats, &data); + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + gpr_log(GPR_DEBUG, "%s[%d].%s = %" PRIdPTR, name, idx, + grpc_stats_counter_name[i], data.counters[i]); + } + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + gpr_log(GPR_DEBUG, "%s[%d].%s = %lf/%lf/%lf (50/95/99%%-ile)", name, idx, + grpc_stats_histogram_name[i], + grpc_stats_histo_percentile(&data, (grpc_stats_histograms)i, 50), + grpc_stats_histo_percentile(&data, (grpc_stats_histograms)i, 95), + grpc_stats_histo_percentile(&data, (grpc_stats_histograms)i, 99)); + } } void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index 321be2a97f..1d7b2b54e7 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -104,6 +104,9 @@ class GprLogReporter : public Reporter { void ReportCpuUsage(const ScenarioResult& result) override; void ReportPollCount(const ScenarioResult& result) override; void ReportQueriesPerCpuSec(const ScenarioResult& result) override; + + void ReportCoreStats(const char* name, int idx, + const grpc::core::Stats& stats); }; /** Dumps the report to a JSON file. */ diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index df27a4368e..16d101d5e6 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -26,7 +26,7 @@ #include <grpc/support/log.h> #include <vector> -#include "src/core/lib/surface/completion_queue.h" +#include "src/cpp/util/core_stats.h" #include "src/proto/grpc/testing/control.pb.h" #include "src/proto/grpc/testing/messages.pb.h" #include "test/core/end2end/data/ssl_test_data.h" @@ -64,6 +64,9 @@ class Server { timer_result = timer_->Mark(); } + grpc_stats_data core_stats; + grpc_stats_collect(&core_stats); + ServerStats stats; stats.set_time_elapsed(timer_result.wall); stats.set_time_system(timer_result.system); @@ -71,6 +74,7 @@ class Server { stats.set_total_cpu_time(timer_result.total_cpu_time); stats.set_idle_cpu_time(timer_result.idle_cpu_time); stats.set_cq_poll_count(poll_count); + CoreStatsToProto(core_stats, stats.mutable_core_stats()); return stats; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 8b00bcfeaf..4a82f98199 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -35,6 +35,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include "src/core/lib/surface/completion_queue.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/core/util/test_config.h" #include "test/cpp/qps/server.h" diff --git a/test/cpp/server/BUILD b/test/cpp/server/BUILD index 3f63be2aa3..7538845803 100644 --- a/test/cpp/server/BUILD +++ b/test/cpp/server/BUILD @@ -14,7 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package") + +grpc_package(name = "test/cpp/server") grpc_cc_test( name = "server_builder_test", diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD index fbdec05698..2559c18c32 100644 --- a/test/cpp/util/BUILD +++ b/test/cpp/util/BUILD @@ -14,15 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_binary", "grpc_cc_test") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_binary", "grpc_cc_test", "grpc_package") -package( - default_visibility = ["//visibility:public"], - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/util", visibility = "public") grpc_cc_library( name = "test_config", |