diff options
Diffstat (limited to 'test/cpp')
33 files changed, 700 insertions, 271 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 3d664e8825..026a94112a 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -39,7 +39,6 @@ namespace grpc { class CompletionQueue; class Channel; -class RpcService; class ServerCompletionQueue; class ServerContext; } // namespace grpc @@ -169,10 +168,10 @@ class ServiceA final { ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodA1_; - const ::grpc::RpcMethod rpcmethod_MethodA2_; - const ::grpc::RpcMethod rpcmethod_MethodA3_; - const ::grpc::RpcMethod rpcmethod_MethodA4_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA2_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA3_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA4_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -352,7 +351,7 @@ class ServiceA final { public: WithStreamedUnaryMethod_MethodA1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -373,7 +372,7 @@ class ServiceA final { public: WithSplitStreamingMethod_MethodA3() { ::grpc::Service::MarkMethodStreamed(2, - new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -427,7 +426,7 @@ class ServiceB final { std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodB1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -484,7 +483,7 @@ class ServiceB final { public: WithStreamedUnaryMethod_MethodB1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae11..cf1cc7e486 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -28,12 +28,14 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/tls.h> #include "src/core/lib/iomgr/port.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/health/v1/health.grpc.pb.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -99,7 +101,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +144,18 @@ class Verifier { return detag(got_tag); } + template <typename T> + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function<void(void)> lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +168,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +208,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function<void(void)> lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +282,7 @@ class Verifier { std::map<void*, bool> expectations_; std::map<void*, MaybeExpect> maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { @@ -404,6 +461,14 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { if (GetParam().inproc) { return; } + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != NULL && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(1); server_->Shutdown(); @@ -413,10 +478,13 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { while (cq_->Next(&ignored_tag, &ignored_ok)) ; BuildAndStartServer(); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(1); } @@ -490,6 +558,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); @@ -1890,6 +2012,9 @@ INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, } // namespace grpc int main(int argc, char** argv) { + // Change the backup poll interval from 5s to 200ms to speed up the + // ReconnectChannel test + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); grpc_test_init(argc, argv); gpr_tls_init(&g_is_async_end2end_test); ::testing::InitGoogleTest(&argc, argv); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c236f76e89..fc87badc31 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -36,6 +36,7 @@ extern "C" { #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" +#include "src/core/lib/support/env.h" } #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -86,7 +87,11 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: ClientLbEnd2endTest() - : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {} + : server_host_("localhost"), kRequestMessage_("Live long and prosper.") { + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); + } void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -305,7 +310,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.clear(); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET none *******"); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); @@ -481,7 +486,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // An empty update will result in the channel going into TRANSIENT_FAILURE. ports.clear(); SetNextResolution(ports); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1aa547d4e3..82ca39466e 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -30,11 +30,13 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -704,13 +706,25 @@ TEST_P(End2endTest, ReconnectChannel) { if (GetParam().inproc) { return; } + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != NULL && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f73a9c1791..b9ee77d7ff 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -36,6 +36,7 @@ extern "C" { #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/support/env.h" } #include "test/core/util/port.h" @@ -74,9 +75,9 @@ extern "C" { using std::chrono::system_clock; +using grpc::lb::v1::LoadBalancer; using grpc::lb::v1::LoadBalanceRequest; using grpc::lb::v1::LoadBalanceResponse; -using grpc::lb::v1::LoadBalancer; namespace grpc { namespace testing { @@ -332,8 +333,11 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds), - kRequestMessage_("Live long and prosper.") {} + client_load_reporting_interval_seconds) { + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); + } void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -559,7 +563,6 @@ class GrpclbEnd2endTest : public ::testing::Test { std::unique_ptr<std::thread> thread_; }; - const grpc::string kMessage_ = "Live long and prosper."; const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; @@ -571,7 +574,7 @@ class GrpclbEnd2endTest : public ::testing::Test { std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; grpc_fake_resolver_response_generator* response_generator_; - const grpc::string kRequestMessage_; + const grpc::string kRequestMessage_ = "Live long and prosper."; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -1086,7 +1089,7 @@ TEST_F(SingleBalancerTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); @@ -1210,7 +1213,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index f990a7ed9d..90b2eddbbb 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -50,23 +50,6 @@ const int kNumRpcs = 1000; // Number of RPCs per thread namespace grpc { namespace testing { -namespace { - -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -} // namespace - class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: TestServiceImpl() : signal_client_(false) {} @@ -74,29 +57,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock<std::mutex> lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else { - EXPECT_FALSE(context->IsCancelled()); - } return Status::OK; } diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index e740ea513a..522bc9e0d6 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -42,6 +42,7 @@ extern "C" { #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/support/tmpfile.h" #include "src/core/lib/surface/channel.h" @@ -790,6 +791,9 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {} int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); grpc_test_init(argc, argv); + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); grpc_init(); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 5428cc47e7..bc2157b9f1 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -28,6 +28,7 @@ extern "C" { #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/timeout_encoding.h" } #include "test/cpp/microbenchmarks/helpers.h" #include "third_party/benchmark/include/benchmark/benchmark.h" @@ -441,7 +442,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_MDELEM_UNREF(exec_ctx, md); } -template <class Fixture> +template <class Fixture, void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem)> static void BM_HpackParserParseHeader(benchmark::State &state) { TrackCounters track_counters; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -449,7 +450,7 @@ static void BM_HpackParserParseHeader(benchmark::State &state) { std::vector<grpc_slice> benchmark_slices = Fixture::GetBenchmarkSlices(); grpc_chttp2_hpack_parser p; grpc_chttp2_hpack_parser_init(&exec_ctx, &p); - p.on_header = UnrefHeader; + p.on_header = OnHeader; p.on_header_user_data = nullptr; for (auto slice : init_slices) { GPR_ASSERT(GRPC_ERROR_NONE == @@ -759,32 +760,97 @@ class RepresentativeServerTrailingMetadata { } }; -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>); +static void free_timeout(void *p) { gpr_free(p); } + +// New implementation. +static void OnHeaderNew(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_mdelem md) { + if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { + grpc_millis *cached_timeout = + static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout)); + grpc_millis timeout; + if (cached_timeout != NULL) { + timeout = *cached_timeout; + } else { + if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout)) { + char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); + gpr_free(val); + timeout = GRPC_MILLIS_INF_FUTURE; + } + if (GRPC_MDELEM_IS_INTERNED(md)) { + /* not already parsed: parse it now, and store the + * result away */ + cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); + *cached_timeout = timeout; + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + } + } + benchmark::DoNotOptimize(timeout); + GRPC_MDELEM_UNREF(exec_ctx, md); + } else { + GPR_ASSERT(0); + } +} + +// Send the same deadline repeatedly +class SameDeadline { + public: + static std::vector<grpc_slice> GetInitSlices() { + return { + grpc_slice_from_static_string("@\x0cgrpc-timeout\x03" + "30S")}; + } + static std::vector<grpc_slice> GetBenchmarkSlices() { + // Use saved key and literal value. + return {MakeSlice({0x0f, 0x2f, 0x03, '3', '0', 'S'})}; + } +}; + +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>, + UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeClientInitialMetadata); + RepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - MoreRepresentativeClientInitialMetadata); + MoreRepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerInitialMetadata); + RepresentativeServerInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerTrailingMetadata); + RepresentativeServerTrailingMetadata, UnrefHeader); + +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew); } // namespace hpack_parser_fixtures diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 8ee3ae7268..e9f537faa4 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -44,10 +44,16 @@ auto &force_library_initialization = Library::get(); class DummyEndpoint : public grpc_endpoint { public: DummyEndpoint() { - static const grpc_endpoint_vtable my_vtable = { - read, write, add_to_pollset, add_to_pollset_set, - shutdown, destroy, get_resource_user, get_peer, - get_fd}; + static const grpc_endpoint_vtable my_vtable = {read, + write, + add_to_pollset, + add_to_pollset_set, + delete_from_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; grpc_endpoint::vtable = &my_vtable; ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); } @@ -102,6 +108,10 @@ class DummyEndpoint : public grpc_endpoint { static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} + static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_resource_user_shutdown(exec_ctx, @@ -428,9 +438,8 @@ static void BM_TransportStreamSend(benchmark::State &state) { return; } // force outgoing window to be yuge - s->chttp2_stream()->flow_control.remote_window_delta = - 1024 * 1024 * 1024; - f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; + s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); + f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); reset_op(); op.on_complete = c.get(); @@ -560,22 +569,21 @@ static void BM_TransportStreamRecv(benchmark::State &state) { std::unique_ptr<Closure> drain_continue; grpc_slice recv_slice; - std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, - grpc_error *error) { - if (!state.KeepRunning()) return; - // force outgoing window to be yuge - s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024; - s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024; - f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024; - received = 0; - reset_op(); - op.on_complete = do_nothing.get(); - op.recv_message = true; - op.payload->recv_message.recv_message = &recv_stream; - op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(exec_ctx, &op); - f.PushInput(grpc_slice_ref(incoming_data)); - }); + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); + f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); + received = 0; + reset_op(); + op.on_complete = do_nothing.get(); + op.recv_message = true; + op.payload->recv_message.recv_message = &recv_stream; + op.payload->recv_message.recv_message_ready = drain_start.get(); + s.Op(exec_ctx, &op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (recv_stream == NULL) { diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index a0c0414f2f..020ec0461c 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -70,7 +70,7 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} -class DummyTag final : public CompletionQueueTag { +class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) override { return true; } }; diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 06ae342985..25d243a104 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -21,6 +21,7 @@ #include <benchmark/benchmark.h> #include <gflags/gflags.h> #include <fstream> + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/iomgr/timer_manager.h" @@ -142,15 +143,18 @@ class TrickledCHTTP2 : public EndpointPairFixture { client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, - client->flow_control.remote_window, server->flow_control.remote_window, - client->flow_control.announced_window, - server->flow_control.announced_window, - client_stream ? client_stream->flow_control.remote_window_delta : -1, - server_stream ? server_stream->flow_control.remote_window_delta : -1, - client_stream ? client_stream->flow_control.local_window_delta : -1, - server_stream ? server_stream->flow_control.local_window_delta : -1, - client_stream ? client_stream->flow_control.announced_window_delta : -1, - server_stream ? server_stream->flow_control.announced_window_delta : -1, + client->flow_control->remote_window_, + server->flow_control->remote_window_, + client->flow_control->announced_window_, + server->flow_control->announced_window_, + client_stream ? client_stream->flow_control->remote_window_delta_ : -1, + server_stream ? server_stream->flow_control->remote_window_delta_ : -1, + client_stream ? client_stream->flow_control->local_window_delta_ : -1, + server_stream ? server_stream->flow_control->local_window_delta_ : -1, + client_stream ? client_stream->flow_control->announced_window_delta_ + : -1, + server_stream ? server_stream->flow_control->announced_window_delta_ + : -1, client->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], client->settings[GRPC_LOCAL_SETTINGS] diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index a7f8504505..9d345a909b 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -25,6 +25,7 @@ #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> +#include <grpc/support/atm.h> #include <grpc/support/log.h> extern "C" { @@ -259,7 +260,8 @@ class InProcessCHTTP2 : public EndpointPairFixture { void AddToLabel(std::ostream& out, benchmark::State& state) { EndpointPairFixture::AddToLabel(out, state); out << " writes/iter:" - << (double)stats_.num_writes / (double)state.iterations(); + << static_cast<double>(gpr_atm_no_barrier_load(&stats_.num_writes)) / + static_cast<double>(state.iterations()); } private: diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc index 6802a0aa99..782f12e99a 100644 --- a/test/cpp/microbenchmarks/helpers.cc +++ b/test/cpp/microbenchmarks/helpers.cc @@ -16,6 +16,8 @@ * */ +#include <string.h> + #include "test/cpp/microbenchmarks/helpers.h" void TrackCounters::Finish(benchmark::State &state) { @@ -45,10 +47,14 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) { << "/iter:" << ((double)stats.counters[i] / (double)state.iterations()); } for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { - out << " " << grpc_stats_histogram_name[i] << "-median:" - << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0) - << " " << grpc_stats_histogram_name[i] << "-99p:" - << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0); + std::ostringstream median_ss; + median_ss << grpc_stats_histogram_name[i] << "-median"; + state.counters[median_ss.str()] = benchmark::Counter( + grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0)); + std::ostringstream tail_ss; + tail_ss << grpc_stats_histogram_name[i] << "-99p"; + state.counters[tail_ss.str()] = benchmark::Counter( + grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0)); } #ifdef GPR_LOW_LEVEL_COUNTERS grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot(); diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 3352269517..0d91d52f22 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -109,6 +109,18 @@ grpc_cc_library( deps = ["//:gpr"], ) +grpc_cc_test( + name = "inproc_sync_unary_ping_pong_test", + srcs = ["inproc_sync_unary_ping_pong_test.cc"], + deps = [ + ":benchmark_config", + ":driver_impl", + "//:grpc++", + "//test/cpp/util:test_config", + "//test/cpp/util:test_util", + ], +) + grpc_cc_library( name = "interarrival", hdrs = ["interarrival.h"], diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index abf755b393..82c6361abd 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -37,10 +37,14 @@ #include "src/cpp/util/core_stats.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" +#include "test/cpp/qps/qps_worker.h" +#include "test/cpp/qps/server.h" #include "test/cpp/qps/usage_timer.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/test_credentials_provider.h" +#define INPROC_NAME_PREFIX "qpsinproc:" + namespace grpc { namespace testing { @@ -226,8 +230,6 @@ class Client { } virtual void DestroyMultithreading() = 0; - virtual void InitThreadFunc(size_t thread_idx) = 0; - virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -275,7 +277,6 @@ class Client { : std::bind(&Client::NextIssueTime, this, thread_idx); } - private: class Thread { public: Thread(Client* client, size_t idx) @@ -295,6 +296,16 @@ class Client { MergeStatusHistogram(statuses_, s); } + void UpdateHistogram(HistogramEntry* entry) { + std::lock_guard<std::mutex> g(mu_); + if (entry->value_used()) { + histogram_.Add(entry->value()); + } + if (entry->status_used()) { + statuses_[entry->status()]++; + } + } + private: Thread(const Thread&); Thread& operator=(const Thread&); @@ -310,29 +321,8 @@ class Client { wait_loop++; } - client_->InitThreadFunc(idx_); - - for (;;) { - // run the loop body - HistogramEntry entry; - const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); - // lock, update histogram if needed and see if we're done - std::lock_guard<std::mutex> g(mu_); - if (entry.value_used()) { - histogram_.Add(entry.value()); - } - if (entry.status_used()) { - statuses_[entry.status()]++; - } - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } - if (!thread_still_ok || - static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) { - client_->CompleteThread(); - return; - } - } + client_->ThreadFunc(idx_, this); + client_->CompleteThread(); } std::mutex mu_; @@ -343,6 +333,12 @@ class Client { std::thread impl_; }; + bool ThreadCompleted() { + return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_)); + } + + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; + std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<UsageTimer> timer_; @@ -422,11 +418,21 @@ class ClientImpl : public Client { type = config.security_params().cred_type(); } - channel_ = CreateTestChannel( - target, type, config.security_params().server_host_override(), - !config.security_params().use_test_ca(), - std::shared_ptr<CallCredentials>(), args); - gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); + grpc::string inproc_pfx(INPROC_NAME_PREFIX); + if (target.find(inproc_pfx) != 0) { + channel_ = CreateTestChannel( + target, type, config.security_params().server_host_override(), + !config.security_params().use_test_ca(), + std::shared_ptr<CallCredentials>(), args); + gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); + is_inproc_ = false; + } else { + grpc::string tgt = target; + tgt.erase(0, inproc_pfx.length()); + int srv_num = std::stoi(tgt); + channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args); + is_inproc_ = true; + } stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } @@ -434,9 +440,11 @@ class ClientImpl : public Client { std::unique_ptr<std::thread> WaitForReady() { return std::unique_ptr<std::thread>(new std::thread([this]() { - GPR_ASSERT(channel_->WaitForConnected( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(10, GPR_TIMESPAN)))); + if (!is_inproc_) { + GPR_ASSERT(channel_->WaitForConnected( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(10, GPR_TIMESPAN)))); + } })); } @@ -455,6 +463,7 @@ class ClientImpl : public Client { std::shared_ptr<Channel> channel_; std::unique_ptr<StubType> stub_; + bool is_inproc_; }; std::vector<ClientChannelInfo> channels_; std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ed4e0b355..a541f94fa5 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,32 +236,46 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } - void InitThreadFunc(size_t thread_idx) override final {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; - if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + HistogramEntry entry; + HistogramEntry* entry_ptr = &entry; + if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; + shutdown_mu->lock(); + while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + if (!ctx->RunNextState(ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { + t->UpdateHistogram(entry_ptr); // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + shutdown_mu->lock(); if (shutdown_state_[thread_idx]->shutdown) { ctx->TryCancel(); delete ctx; - return true; - } - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; + while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + ctx = ClientRpcContext::detag(got_tag); + ctx->TryCancel(); + delete ctx; + } + shutdown_mu->unlock(); + return; } - return true; - } else { - // queue is shutting down, so we must be done - return true; } } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 94554a46b2..9f20b148eb 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFunc(size_t thread_idx) override {} + void InitThreadFuncImpl(size_t thread_idx) override {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], &responses_[thread_idx]); last_issue_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromServer(&context_[thread_idx], request_); last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 4458e389e7..5504c0b96a 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -36,6 +36,7 @@ #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/qps/client.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" @@ -63,11 +64,11 @@ static std::string get_host(const std::string& worker) { } static deque<string> get_workers(const string& env_name) { + deque<string> out; char* env = gpr_getenv(env_name.c_str()); if (!env) { env = gpr_strdup(""); } - deque<string> out; char* p = env; if (strlen(env) != 0) { for (;;) { @@ -187,12 +188,17 @@ static void postprocess_scenario_result(ScenarioResult* result) { client_queries_per_cpu_sec); } +std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr; + std::unique_ptr<ScenarioResult> RunScenario( const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type) { + const grpc::string& credential_type, bool run_inproc) { + if (run_inproc) { + g_inproc_servers = new std::vector<grpc::testing::Server*>; + } // Log everything from the driver gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); @@ -210,8 +216,8 @@ std::unique_ptr<ScenarioResult> RunScenario( ClientConfig result_client_config; const ServerConfig result_server_config = initial_server_config; - // Get client, server lists - auto workers = get_workers("QPS_WORKERS"); + // Get client, server lists; ignore if inproc test + auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>(); ClientConfig client_config = initial_client_config; // Spawn some local workers if desired @@ -227,9 +233,10 @@ std::unique_ptr<ScenarioResult> RunScenario( called_init = true; } - int driver_port = grpc_pick_unused_port_or_die(); - local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type)); char addr[256]; + // we use port # of -1 to indicate inproc + int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1; + local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type)); sprintf(addr, "localhost:%d", driver_port); if (spawn_local_worker_count < 0) { workers.push_front(addr); @@ -265,9 +272,14 @@ std::unique_ptr<ScenarioResult> RunScenario( for (size_t i = 0; i < num_servers; i++) { gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")", workers[i].c_str(), i); - servers[i].stub = WorkerService::NewStub(CreateChannel( - workers[i], GetCredentialsProvider()->GetChannelCredentials( - credential_type, &channel_args))); + if (!run_inproc) { + servers[i].stub = WorkerService::NewStub(CreateChannel( + workers[i], GetCredentialsProvider()->GetChannelCredentials( + credential_type, &channel_args))); + } else { + servers[i].stub = WorkerService::NewStub( + local_workers[i]->InProcessChannel(channel_args)); + } ServerConfig server_config = initial_server_config; if (server_config.core_limit() != 0) { @@ -289,6 +301,10 @@ std::unique_ptr<ScenarioResult> RunScenario( // overriding the qps server target only works if there is 1 server GPR_ASSERT(num_servers == 1); client_config.add_server_targets(qps_server_target_override); + } else if (run_inproc) { + std::string cli_target(INPROC_NAME_PREFIX); + cli_target += std::to_string(i); + client_config.add_server_targets(cli_target); } else { std::string host; char* cli_target; @@ -312,9 +328,14 @@ std::unique_ptr<ScenarioResult> RunScenario( const auto& worker = workers[i + num_servers]; gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")", worker.c_str(), i + num_servers); - clients[i].stub = WorkerService::NewStub( - CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials( - credential_type, &channel_args))); + if (!run_inproc) { + clients[i].stub = WorkerService::NewStub( + CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials( + credential_type, &channel_args))); + } else { + clients[i].stub = WorkerService::NewStub( + local_workers[i + num_servers]->InProcessChannel(channel_args)); + } ClientConfig per_client_config = client_config; if (initial_client_config.core_limit() != 0) { @@ -495,6 +516,9 @@ std::unique_ptr<ScenarioResult> RunScenario( } } + if (g_inproc_servers != nullptr) { + delete g_inproc_servers; + } postprocess_scenario_result(result.get()); return result; } diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 29f2776d79..fede4d8045 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -32,7 +32,7 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type); + const grpc::string& credential_type, bool run_inproc); bool RunQuit(const grpc::string& credential_type); } // namespace testing diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 65553f57f1..1ef8f65b0b 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -85,6 +85,24 @@ print yaml.dump({ if 'scalable' in scenario_json.get('CATEGORIES', []) ] + [ { + 'name': 'qps_json_driver', + 'shortname': 'qps_json_driver:inproc_%s' % scenario_json['name'], + 'args': ['--run_inproc', '--scenarios_json', _scenario_json_string(scenario_json, False)], + 'ci_platforms': ['linux'], + 'platforms': ['linux'], + 'flaky': False, + 'language': 'c++', + 'boringssl': True, + 'defaults': 'boringssl', + 'cpu_cost': guess_cpu(scenario_json, False), + 'exclude_configs': ['tsan', 'asan'], + 'timeout_seconds': 6*60, + 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', []) + } + for scenario_json in scenario_config.CXXLanguage().scenarios() + if 'inproc' in scenario_json.get('CATEGORIES', []) + ] + [ + { 'name': 'json_run_localhost', 'shortname': 'json_run_localhost:%s_low_thread_count' % scenario_json['name'], 'args': ['--scenarios_json', _scenario_json_string(scenario_json, True)], diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc new file mode 100644 index 0000000000..f2e977d48b --- /dev/null +++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc @@ -0,0 +1,66 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include "test/cpp/qps/benchmark_config.h" +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/qps/server.h" +#include "test/cpp/util/test_config.h" +#include "test/cpp/util/test_credentials_provider.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 5; + +static void RunSynchronousUnaryPingPong() { + gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong"); + + ClientConfig client_config; + client_config.set_client_type(SYNC_CLIENT); + client_config.set_outstanding_rpcs_per_channel(1); + client_config.set_client_channels(1); + client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); + + ServerConfig server_config; + server_config.set_server_type(SYNC_SERVER); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", + kInsecureCredentialsType, true); + + GetReporter()->ReportQPS(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitTest(&argc, &argv, true); + + grpc::testing::RunSynchronousUnaryPingPong(); + + return 0; +} diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 1d394b216f..4b788eae73 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -117,8 +117,14 @@ int main(int argc, char** argv) { } } - delete g_driver; - g_driver = NULL; - for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i]; + if (g_driver != nullptr) { + delete g_driver; + } + g_driver = nullptr; + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i] != nullptr) { + delete g_workers[i]; + } + } GPR_ASSERT(driver_join_status == 0); } diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index cca59f64d8..1672527426 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -30,6 +30,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/parse_json.h" #include "test/cpp/qps/report.h" +#include "test/cpp/qps/server.h" #include "test/cpp/util/test_config.h" #include "test/cpp/util/test_credentials_provider.h" @@ -64,6 +65,7 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to."); DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType, "Credential type for communication with workers"); +DEFINE_bool(run_inproc, false, "Perform an in-process transport test"); namespace grpc { namespace testing { @@ -75,8 +77,9 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, RunScenario(scenario.client_config(), scenario.num_clients(), scenario.server_config(), scenario.num_servers(), scenario.warmup_seconds(), scenario.benchmark_seconds(), - scenario.spawn_local_worker_count(), - FLAGS_qps_server_target_override, FLAGS_credential_type); + !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2, + FLAGS_qps_server_target_override, FLAGS_credential_type, + FLAGS_run_inproc); // Amend the result with scenario config. Eventually we should adjust // RunScenario contract so we don't need to touch the result here. diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 069b3fa076..df929b9811 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -24,6 +24,7 @@ #include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" +#include "test/cpp/qps/server.h" #include "test/cpp/util/test_config.h" #include "test/cpp/util/test_credentials_provider.h" @@ -49,8 +50,9 @@ static void RunQPS() { server_config.set_server_type(ASYNC_SERVER); server_config.set_async_server_threads(8); - const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, - BENCHMARK, -2, "", kInsecureCredentialsType); + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", + kInsecureCredentialsType, false); GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index d20bc1b074..c288b03ec5 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -225,11 +225,14 @@ class WorkerServiceImpl final : public WorkerService::Service { if (!args.has_setup()) { return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args"); } - if (server_port_ != 0) { + if (server_port_ > 0) { args.mutable_setup()->set_port(server_port_); } gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); + if (g_inproc_servers != nullptr) { + g_inproc_servers->push_back(server.get()); + } if (!server) { return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server"); } @@ -269,17 +272,17 @@ QpsWorker::QpsWorker(int driver_port, int server_port, impl_.reset(new WorkerServiceImpl(server_port, this)); gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0)); - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", driver_port); - ServerBuilder builder; - builder.AddListeningPort( - server_address, - GetCredentialsProvider()->GetServerCredentials(credential_type)); + if (driver_port >= 0) { + char* server_address = nullptr; + gpr_join_host_port(&server_address, "::", driver_port); + builder.AddListeningPort( + server_address, + GetCredentialsProvider()->GetServerCredentials(credential_type)); + gpr_free(server_address); + } builder.RegisterService(impl_.get()); - gpr_free(server_address); - server_ = builder.BuildAndStart(); } diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h index 360125fb17..a5167426d0 100644 --- a/test/cpp/qps/qps_worker.h +++ b/test/cpp/qps/qps_worker.h @@ -21,17 +21,21 @@ #include <memory> +#include <grpc++/server.h> +#include <grpc++/support/channel_arguments.h> #include <grpc++/support/config.h> #include <grpc/support/atm.h> -namespace grpc { +#include "test/cpp/qps/server.h" -class Server; +namespace grpc { namespace testing { class WorkerServiceImpl; +extern std::vector<grpc::testing::Server*>* g_inproc_servers; + class QpsWorker { public: explicit QpsWorker(int driver_port, int server_port, @@ -41,9 +45,13 @@ class QpsWorker { bool Done() const; void MarkDone(); + std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args) { + return server_->InProcessChannel(args); + } + private: std::unique_ptr<WorkerServiceImpl> impl_; - std::unique_ptr<Server> server_; + std::unique_ptr<grpc::Server> server_; gpr_atm done_; }; diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc index 137b33ee25..bb415e9d63 100644 --- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc @@ -23,6 +23,7 @@ #include "test/cpp/qps/benchmark_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" +#include "test/cpp/qps/server.h" #include "test/cpp/util/test_config.h" #include "test/cpp/util/test_credentials_provider.h" @@ -52,8 +53,9 @@ static void RunSynchronousUnaryPingPong() { client_config.mutable_security_params()->CopyFrom(security); server_config.mutable_security_params()->CopyFrom(security); - const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, - BENCHMARK, -2, "", kInsecureCredentialsType); + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", + kInsecureCredentialsType, false); GetReporter()->ReportQPS(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 16d101d5e6..9da33566dd 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -42,10 +42,9 @@ class Server { explicit Server(const ServerConfig& config) : timer_(new UsageTimer), last_reset_poll_count_(0) { cores_ = gpr_cpu_num_cores(); - if (config.port()) { + if (config.port()) { // positive for a fixed port, negative for inproc port_ = config.port(); - - } else { + } else { // zero for dynamic port port_ = grpc_pick_unused_port_or_die(); } } @@ -115,6 +114,9 @@ class Server { return 0; } + virtual std::shared_ptr<Channel> InProcessChannel( + const ChannelArguments& args) = 0; + protected: static void ApplyConfigToBuilder(const ServerConfig& config, ServerBuilder* builder) { diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4a82f98199..1c1a5636a9 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -70,18 +70,21 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_both_ways_function, - std::function<grpc::Status(const PayloadConfig &, const RequestType *, + std::function<grpc::Status(const PayloadConfig &, RequestType *, ResponseType *)> process_rpc) : Server(config) { - char *server_address = NULL; - - gpr_join_host_port(&server_address, "::", port()); - ServerBuilder builder; - builder.AddListeningPort(server_address, - Server::CreateServerCredentials(config)); - gpr_free(server_address); + + auto port_num = port(); + // Negative port number means inproc server, so no listen port needed + if (port_num >= 0) { + char *server_address = NULL; + gpr_join_host_port(&server_address, "::", port_num); + builder.AddListeningPort(server_address, + Server::CreateServerCredentials(config)); + gpr_free(server_address); + } register_service(&builder, &async_service_); @@ -183,6 +186,11 @@ class AsyncQpsServerTest final : public grpc::testing::Server { return count; } + std::shared_ptr<Channel> InProcessChannel( + const ChannelArguments &args) override { + return server_->InProcessChannel(args); + } + private: void ShutdownThreadFunc() { // TODO (vpai): Remove this deadline and allow Shutdown to finish properly @@ -194,23 +202,31 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard<ServerRpcContext> l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { @@ -238,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), @@ -284,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; @@ -296,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), @@ -364,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; @@ -377,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), @@ -435,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReader<ResponseType, RequestType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncReader<ResponseType, RequestType> stream_; }; @@ -447,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method, - std::function<grpc::Status(const RequestType *, ResponseType *)> + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), @@ -504,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncWriter<ResponseType> *, void *)> request_method_; - std::function<grpc::Status(const RequestType *, ResponseType *)> - invoke_method_; + std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_; grpc::ServerAsyncWriter<ResponseType> stream_; }; @@ -534,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -static Status ProcessSimpleRPC(const PayloadConfig &, - const SimpleRequest *request, +static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request, SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), @@ -543,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &, return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); return Status::OK; } static Status ProcessGenericRPC(const PayloadConfig &payload_config, - const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *request, ByteBuffer *response) { + // We are done using the request. Clear it to reduce working memory. + // This proves to reduce cache misses in large message size cases. + request->Clear(); int resp_size = payload_config.bytebuf_params().resp_size(); std::unique_ptr<char[]> buf(new char[resp_size]); Slice slice(buf.get(), resp_size); diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 9954e2c0bf..4ef57bd08b 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -156,12 +156,15 @@ class SynchronousServer final : public grpc::testing::Server { explicit SynchronousServer(const ServerConfig& config) : Server(config) { ServerBuilder builder; - char* server_address = NULL; - - gpr_join_host_port(&server_address, "::", port()); - builder.AddListeningPort(server_address, - Server::CreateServerCredentials(config)); - gpr_free(server_address); + auto port_num = port(); + // Negative port number means inproc server, so no listen port needed + if (port_num >= 0) { + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", port_num); + builder.AddListeningPort(server_address, + Server::CreateServerCredentials(config)); + gpr_free(server_address); + } ApplyConfigToBuilder(config, &builder); @@ -170,6 +173,11 @@ class SynchronousServer final : public grpc::testing::Server { impl_ = builder.BuildAndStart(); } + std::shared_ptr<Channel> InProcessChannel( + const ChannelArguments& args) override { + return impl_->InProcessChannel(args); + } + private: BenchmarkServiceImpl service_; std::unique_ptr<grpc::Server> impl_; diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 27010b7315..38287464d9 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -20,6 +20,7 @@ #include <chrono> #include <thread> +#include <vector> #include <gflags/gflags.h> #include <grpc/grpc.h> @@ -41,6 +42,8 @@ static void sigint_handler(int x) { got_sigint = true; } namespace grpc { namespace testing { +std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr; + static void RunServer() { QpsWorker worker(FLAGS_driver_port, FLAGS_server_port, FLAGS_credential_type); diff --git a/test/cpp/util/create_test_channel.cc b/test/cpp/util/create_test_channel.cc index 34b6d60d01..4d047473b9 100644 --- a/test/cpp/util/create_test_channel.cc +++ b/test/cpp/util/create_test_channel.cc @@ -74,7 +74,7 @@ std::shared_ptr<Channel> CreateTestChannel( ChannelArguments channel_args(args); std::shared_ptr<ChannelCredentials> channel_creds; if (cred_type.empty()) { - return CreateChannel(server, InsecureChannelCredentials()); + return CreateCustomChannel(server, InsecureChannelCredentials(), args); } else if (cred_type == testing::kTlsCredentialsType) { // cred_type == "ssl" if (use_prod_roots) { gpr_once_init(&g_once_init_add_prod_ssl_provider, &AddProdSslType); @@ -101,7 +101,7 @@ std::shared_ptr<Channel> CreateTestChannel( cred_type, &channel_args); GPR_ASSERT(channel_creds != nullptr); - return CreateChannel(server, channel_creds); + return CreateCustomChannel(server, channel_creds, args); } } diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc index 69a6876a3f..16a00fb201 100644 --- a/test/cpp/util/error_details_test.cc +++ b/test/cpp/util/error_details_test.cc @@ -82,7 +82,7 @@ TEST(SetTest, NullInput) { TEST(SetTest, OutOfScopeErrorCode) { google::rpc::Status expected; - expected.set_code(20); // Out of scope (DATA_LOSS is 15). + expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16). expected.set_message("I am an error message"); testing::EchoRequest expected_details; expected_details.set_message(grpc::string(100, '\0')); @@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) { EXPECT_EQ(expected.SerializeAsString(), to.error_details()); } +TEST(SetTest, ValidScopeErrorCode) { + for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) { + google::rpc::Status expected; + expected.set_code(c); + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(c, to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); + } +} + } // namespace } // namespace grpc |