From a0e92e7727ded204e3ada8f5cfa455805098852f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 10 Aug 2018 14:57:52 -0700 Subject: Add proper synchronization so that stats are setup and destroyed cleanly --- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 45 +++++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) (limited to 'test/cpp/microbenchmarks') diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 4a5487f1c4..06922afda3 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -34,10 +34,13 @@ struct grpc_pollset { gpr_mu mu; }; +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads_active; +static bool g_active; + namespace grpc { namespace testing { - -static void* g_tag = (void*)static_cast(10); // Some random number static grpc_completion_queue* g_cq; static grpc_event_engine_vtable g_vtable; @@ -71,9 +74,11 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker, } gpr_mu_unlock(&ps->mu); - GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag)); + + void* tag = (void*)static_cast(10); // Some random number + GPR_ASSERT(grpc_cq_begin_op(g_cq, tag)); grpc_cq_end_op( - g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, + g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, static_cast(gpr_malloc(sizeof(grpc_cq_completion)))); grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&ps->mu); @@ -137,15 +142,31 @@ static void teardown() { code (i.e the code between two successive calls of state.KeepRunning()) if state.KeepRunning() returns false. So it is safe to do the teardown in one of the threads after state.keepRunning() returns false. + + However, our use requires synchronization because we do additional work at + each thread that requires specific ordering (TrackCounters must be constructed + after grpc_init because it needs the number of cores, initialized by grpc, + and its Finish call must take place before grpc_shutdown so that it can use + grpc_stats). */ static void BM_Cq_Throughput(benchmark::State& state) { - TrackCounters track_counters; gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); auto thd_idx = state.thread_index; + gpr_mu_lock(&g_mu); + g_threads_active++; if (thd_idx == 0) { setup(); + g_active = true; + gpr_cv_broadcast(&g_cv); + } else { + while (!g_active) { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } } + gpr_mu_unlock(&g_mu); + + TrackCounters track_counters; while (state.KeepRunning()) { GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type == @@ -155,8 +176,20 @@ static void BM_Cq_Throughput(benchmark::State& state) { state.SetItemsProcessed(state.iterations()); track_counters.Finish(state); + gpr_mu_lock(&g_mu); + g_threads_active--; + if (g_threads_active == 0) { + gpr_cv_broadcast(&g_cv); + } else { + while (g_threads_active > 0) { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } + } + gpr_mu_unlock(&g_mu); + if (thd_idx == 0) { teardown(); + g_active = false; } } @@ -172,6 +205,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } } // namespace benchmark int main(int argc, char** argv) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); ::benchmark::Initialize(&argc, argv); ::grpc::testing::InitTest(&argc, &argv, false); benchmark::RunTheBenchmarksNamespaced(); -- cgit v1.2.3