diff options
author | 2018-02-01 00:37:59 +0100 | |
---|---|---|
committer | 2018-02-01 00:37:59 +0100 | |
commit | e294279e398566fb6f41256c79b9d3e886a0d156 (patch) | |
tree | 03fcd162e2f3bde279ac0b926056fd4e5e7553d5 /test | |
parent | d6358a5b0a473042927dbd0e9e0102eac5a89159 (diff) | |
parent | 889bb7fde4c3f8f36c3821e16b0b66870e68526d (diff) |
Merge pull request #14236 from jtattermusch/fix_passthru_endpoint_race
Make grpc_passthru_endpoint_stats refcounted
Diffstat (limited to 'test')
-rw-r--r-- | test/core/util/passthru_endpoint.cc | 26 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.h | 8 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 29 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/fullstack_fixtures.h | 40 | ||||
-rw-r--r-- | test/cpp/performance/writes_per_rpc_test.cc | 21 |
5 files changed, 95 insertions, 29 deletions
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 5f127cb960..0da0765979 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -48,8 +48,6 @@ struct passthru_endpoint { gpr_mu mu; int halves; grpc_passthru_endpoint_stats* stats; - grpc_passthru_endpoint_stats - dummy_stats; // used if constructor stats == nullptr bool shutdown; half client; half server; @@ -137,6 +135,7 @@ static void me_destroy(grpc_endpoint* ep) { if (0 == --p->halves) { gpr_mu_unlock(&p->mu); gpr_mu_destroy(&p->mu); + grpc_passthru_endpoint_stats_destroy(p->stats); grpc_slice_buffer_destroy_internal(&p->client.read_buffer); grpc_slice_buffer_destroy_internal(&p->server.read_buffer); grpc_resource_user_unref(p->client.resource_user); @@ -194,11 +193,30 @@ void grpc_passthru_endpoint_create(grpc_endpoint** client, passthru_endpoint* m = (passthru_endpoint*)gpr_malloc(sizeof(*m)); m->halves = 2; m->shutdown = 0; - m->stats = stats == nullptr ? &m->dummy_stats : stats; - memset(m->stats, 0, sizeof(*m->stats)); + if (stats == nullptr) { + m->stats = grpc_passthru_endpoint_stats_create(); + } else { + gpr_ref(&stats->refs); + m->stats = stats; + } half_init(&m->client, m, resource_quota, "client"); half_init(&m->server, m, resource_quota, "server"); gpr_mu_init(&m->mu); *client = &m->client.base; *server = &m->server.base; } + +grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() { + grpc_passthru_endpoint_stats* stats = + (grpc_passthru_endpoint_stats*)gpr_malloc( + sizeof(grpc_passthru_endpoint_stats)); + memset(stats, 0, sizeof(*stats)); + gpr_ref_init(&stats->refs, 1); + return stats; +} + +void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) { + if (gpr_unref(&stats->refs)) { + gpr_free(stats); + } +} diff --git a/test/core/util/passthru_endpoint.h b/test/core/util/passthru_endpoint.h index bddd8ea6a2..a46c775505 100644 --- a/test/core/util/passthru_endpoint.h +++ b/test/core/util/passthru_endpoint.h @@ -23,7 +23,11 @@ #include "src/core/lib/iomgr/endpoint.h" +/* The struct is refcounted, always use grpc_passthru_endpoint_stats_create and + * grpc_passthru_endpoint_stats_destroy, rather then embedding it in your + * objects by value. */ typedef struct { + gpr_refcount refs; gpr_atm num_writes; } grpc_passthru_endpoint_stats; @@ -32,4 +36,8 @@ void grpc_passthru_endpoint_create(grpc_endpoint** client, grpc_resource_quota* resource_quota, grpc_passthru_endpoint_stats* stats); +grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create(); + +void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats); + #endif diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index d6d7d41e5e..294f1feb80 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -79,9 +79,11 @@ static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) { class TrickledCHTTP2 : public EndpointPairFixture { public: TrickledCHTTP2(Service* service, bool streaming, size_t req_size, - size_t resp_size, size_t kilobits_per_second) - : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second), - FixtureConfiguration()) { + size_t resp_size, size_t kilobits_per_second, + grpc_passthru_endpoint_stats* stats) + : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second, stats), + FixtureConfiguration()), + stats_(stats) { if (FLAGS_log) { std::ostringstream fn; fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size @@ -101,9 +103,15 @@ class TrickledCHTTP2 : public EndpointPairFixture { } } + virtual ~TrickledCHTTP2() { + if (stats_ != nullptr) { + grpc_passthru_endpoint_stats_destroy(stats_); + } + } + void AddToLabel(std::ostream& out, benchmark::State& state) { out << " writes/iter:" - << ((double)stats_.num_writes / (double)state.iterations()) + << ((double)stats_->num_writes / (double)state.iterations()) << " cli_transport_stalls/iter:" << ((double) client_stats_.streams_stalled_due_to_transport_flow_control / @@ -193,7 +201,7 @@ class TrickledCHTTP2 : public EndpointPairFixture { } private: - grpc_passthru_endpoint_stats stats_; + grpc_passthru_endpoint_stats* stats_; struct Stats { int streams_stalled_due_to_stream_flow_control = 0; int streams_stalled_due_to_transport_flow_control = 0; @@ -203,10 +211,11 @@ class TrickledCHTTP2 : public EndpointPairFixture { std::unique_ptr<std::ofstream> log_; gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_endpoint_pair MakeEndpoints(size_t kilobits) { + static grpc_endpoint_pair MakeEndpoints(size_t kilobits, + grpc_passthru_endpoint_stats* stats) { grpc_endpoint_pair p; grpc_passthru_endpoint_create(&p.client, &p.server, Library::get().rq(), - &stats_); + stats); double bytes_per_second = 125.0 * kilobits; p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second); p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second); @@ -251,7 +260,8 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( &service, true, state.range(0) /* req_size */, - state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */)); + state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */, + grpc_passthru_endpoint_stats_create())); { EchoResponse send_response; EchoResponse recv_response; @@ -344,7 +354,8 @@ static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( &service, false, state.range(0) /* req_size */, - state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */)); + state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */, + grpc_passthru_endpoint_stats_create())); EchoRequest send_request; EchoResponse send_response; EchoResponse recv_response; diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index d1ede755a5..d73caa01c8 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -245,31 +245,53 @@ class SockPair : public EndpointPairFixture { fixture_configuration) {} }; -class InProcessCHTTP2 : public EndpointPairFixture { +/* Use InProcessCHTTP2 instead. This class (with stats as an explicit parameter) + is here only to be able to initialize both the base class and stats_ with the + same stats instance without accessing the stats_ fields before the object is + properly initialized. */ +class InProcessCHTTP2WithExplicitStats : public EndpointPairFixture { public: - InProcessCHTTP2(Service* service, - const FixtureConfiguration& fixture_configuration = - FixtureConfiguration()) - : EndpointPairFixture(service, MakeEndpoints(), fixture_configuration) {} + InProcessCHTTP2WithExplicitStats( + Service* service, grpc_passthru_endpoint_stats* stats, + const FixtureConfiguration& fixture_configuration) + : EndpointPairFixture(service, MakeEndpoints(stats), + fixture_configuration), + stats_(stats) {} + + virtual ~InProcessCHTTP2WithExplicitStats() { + if (stats_ != nullptr) { + grpc_passthru_endpoint_stats_destroy(stats_); + } + } void AddToLabel(std::ostream& out, benchmark::State& state) { EndpointPairFixture::AddToLabel(out, state); out << " writes/iter:" - << static_cast<double>(gpr_atm_no_barrier_load(&stats_.num_writes)) / + << static_cast<double>(gpr_atm_no_barrier_load(&stats_->num_writes)) / static_cast<double>(state.iterations()); } private: - grpc_passthru_endpoint_stats stats_; + grpc_passthru_endpoint_stats* stats_; - grpc_endpoint_pair MakeEndpoints() { + static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) { grpc_endpoint_pair p; grpc_passthru_endpoint_create(&p.client, &p.server, Library::get().rq(), - &stats_); + stats); return p; } }; +class InProcessCHTTP2 : public InProcessCHTTP2WithExplicitStats { + public: + InProcessCHTTP2(Service* service, + const FixtureConfiguration& fixture_configuration = + FixtureConfiguration()) + : InProcessCHTTP2WithExplicitStats(service, + grpc_passthru_endpoint_stats_create(), + fixture_configuration) {} +}; + //////////////////////////////////////////////////////////////////////////////// // Minimal stack fixtures diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 0b9dc83f2b..b7d951a86e 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -142,18 +142,24 @@ class EndpointPairFixture { class InProcessCHTTP2 : public EndpointPairFixture { public: - InProcessCHTTP2(Service* service) - : EndpointPairFixture(service, MakeEndpoints()) {} + InProcessCHTTP2(Service* service, grpc_passthru_endpoint_stats* stats) + : EndpointPairFixture(service, MakeEndpoints(stats)), stats_(stats) {} - int writes_performed() const { return stats_.num_writes; } + virtual ~InProcessCHTTP2() { + if (stats_ != nullptr) { + grpc_passthru_endpoint_stats_destroy(stats_); + } + } + + int writes_performed() const { return stats_->num_writes; } private: - grpc_passthru_endpoint_stats stats_; + grpc_passthru_endpoint_stats* stats_; - grpc_endpoint_pair MakeEndpoints() { + static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) { grpc_endpoint_pair p; grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), - &stats_); + stats); return p; } }; @@ -162,7 +168,8 @@ static double UnaryPingPong(int request_size, int response_size) { const int kIterations = 10000; EchoTestService::AsyncService service; - std::unique_ptr<InProcessCHTTP2> fixture(new InProcessCHTTP2(&service)); + std::unique_ptr<InProcessCHTTP2> fixture( + new InProcessCHTTP2(&service, grpc_passthru_endpoint_stats_create())); EchoRequest send_request; EchoResponse send_response; EchoResponse recv_response; |