aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/microbenchmarks
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-08-10 14:57:52 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-08-10 15:48:48 -0700
commita0e92e7727ded204e3ada8f5cfa455805098852f (patch)
tree42d74be4e0845ae3c308bd506b558b12ffa95be9 /test/cpp/microbenchmarks
parent9043a4f56d899a377d8763fa868b277d23e7b213 (diff)
Add proper synchronization so that stats are setup and destroyed cleanly
Diffstat (limited to 'test/cpp/microbenchmarks')
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc45
1 files changed, 40 insertions, 5 deletions
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 4a5487f1c4..06922afda3 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -34,10 +34,13 @@ struct grpc_pollset {
gpr_mu mu;
};
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static int g_threads_active;
+static bool g_active;
+
namespace grpc {
namespace testing {
-
-static void* g_tag = (void*)static_cast<intptr_t>(10); // Some random number
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
@@ -71,9 +74,11 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
}
gpr_mu_unlock(&ps->mu);
- GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag));
+
+ void* tag = (void*)static_cast<intptr_t>(10); // Some random number
+ GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
grpc_cq_end_op(
- g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
+ g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&ps->mu);
@@ -137,15 +142,31 @@ static void teardown() {
code (i.e the code between two successive calls of state.KeepRunning()) if
state.KeepRunning() returns false. So it is safe to do the teardown in one
of the threads after state.keepRunning() returns false.
+
+ However, our use requires synchronization because we do additional work at
+ each thread that requires specific ordering (TrackCounters must be constructed
+ after grpc_init because it needs the number of cores, initialized by grpc,
+ and its Finish call must take place before grpc_shutdown so that it can use
+ grpc_stats).
*/
static void BM_Cq_Throughput(benchmark::State& state) {
- TrackCounters track_counters;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
auto thd_idx = state.thread_index;
+ gpr_mu_lock(&g_mu);
+ g_threads_active++;
if (thd_idx == 0) {
setup();
+ g_active = true;
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (!g_active) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
}
+ gpr_mu_unlock(&g_mu);
+
+ TrackCounters track_counters;
while (state.KeepRunning()) {
GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
@@ -155,8 +176,20 @@ static void BM_Cq_Throughput(benchmark::State& state) {
state.SetItemsProcessed(state.iterations());
track_counters.Finish(state);
+ gpr_mu_lock(&g_mu);
+ g_threads_active--;
+ if (g_threads_active == 0) {
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (g_threads_active > 0) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+
if (thd_idx == 0) {
teardown();
+ g_active = false;
}
}
@@ -172,6 +205,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
::benchmark::Initialize(&argc, argv);
::grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();