diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 4 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc | 11 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_pollset.cc | 36 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 48 |
4 files changed, 63 insertions, 36 deletions
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 9d7f65d292..0d267da723 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -67,7 +67,9 @@ static void pollset_init(grpc_pollset* ps, gpr_mu** mu) { *mu = &ps->mu; } -static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); } +static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* ps) { + gpr_mu_destroy(&ps->mu); +} static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) { return GRPC_ERROR_NONE; diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc index 47705d3031..01ff39121e 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc @@ -105,6 +105,17 @@ static void BM_PumpStreamClientToServer(benchmark::State& state) { GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } + response_rw.Finish(Status::OK, tag(0)); + Status final_status; + request_rw->Finish(&final_status, tag(1)); + need_tags = (1 << 0) | (1 << 1); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + GPR_ASSERT(final_status.ok()); } fixture->Finish(state); fixture.reset(); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index 0f3d3cef66..f5e8d13881 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -59,7 +59,7 @@ extern "C" { auto& force_library_initialization = Library::get(); static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) { - grpc_pollset_destroy(static_cast<grpc_pollset*>(ps)); + grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(ps)); } static void BM_CreateDestroyPollset(benchmark::State& state) { @@ -136,8 +136,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) { gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); gpr_mu_lock(mu); while (state.KeepRunning()) { - grpc_pollset_worker* worker; - GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline)); + GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline)); } grpc_closure shutdown_ps_closure; grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps, @@ -150,6 +149,34 @@ static void BM_PollEmptyPollset(benchmark::State& state) { } BENCHMARK(BM_PollEmptyPollset); +static void BM_PollAddFd(benchmark::State& state) { + TrackCounters track_counters; + size_t ps_sz = grpc_pollset_size(); + grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); + gpr_mu* mu; + grpc_pollset_init(ps, &mu); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_wakeup_fd wakeup_fd; + GPR_ASSERT( + GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd))); + grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx"); + while (state.KeepRunning()) { + grpc_pollset_add_fd(&exec_ctx, ps, fd); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx"); + grpc_closure shutdown_ps_closure; + grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps, + grpc_schedule_on_exec_ctx); + gpr_mu_lock(mu); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); + gpr_mu_unlock(mu); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(ps); + track_counters.Finish(state); +} +BENCHMARK(BM_PollAddFd); + class Closure : public grpc_closure { public: virtual ~Closure() {} @@ -233,8 +260,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure); gpr_mu_lock(mu); while (!done) { - grpc_pollset_worker* worker; - GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline)); + GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline)); } grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done"); wakeup_fd.read_fd = 0; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 01856f714a..82c3356f02 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -238,39 +238,27 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext( - &got_tag, &ok, - std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { - case CompletionQueue::GOT_EVENT: { - // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - // Proceed while holding a lock to make sure that - // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); - if (shutdown_state_[thread_idx]->shutdown) { - delete ctx; - return true; - } else if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[thread_idx].get()); - // delete the old version - delete ctx; - } + if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { + // Got a regular event, so process it + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { + delete ctx; return true; + } else if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[thread_idx].get()); + // delete the old version + delete ctx; } - case CompletionQueue::TIMEOUT: { - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); - if (shutdown_state_[thread_idx]->shutdown) { - return true; - } - return true; - } - case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be - // done - return true; + return true; + } else { + // queue is shutting down, so we must be done + return true; } - GPR_UNREACHABLE_CODE(return true); } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; |