aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc4
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc11
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc36
-rw-r--r--test/cpp/qps/client_async.cc48
4 files changed, 63 insertions, 36 deletions
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 9d7f65d292..0d267da723 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -67,7 +67,9 @@ static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
*mu = &ps->mu;
}
-static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
+static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* ps) {
+ gpr_mu_destroy(&ps->mu);
+}
static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
return GRPC_ERROR_NONE;
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
index 47705d3031..01ff39121e 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
@@ -105,6 +105,17 @@ static void BM_PumpStreamClientToServer(benchmark::State& state) {
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
+ response_rw.Finish(Status::OK, tag(0));
+ Status final_status;
+ request_rw->Finish(&final_status, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ GPR_ASSERT(final_status.ok());
}
fixture->Finish(state);
fixture.reset();
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 0f3d3cef66..f5e8d13881 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -59,7 +59,7 @@ extern "C" {
auto& force_library_initialization = Library::get();
static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(ps));
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(ps));
}
static void BM_CreateDestroyPollset(benchmark::State& state) {
@@ -136,8 +136,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(mu);
while (state.KeepRunning()) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_closure shutdown_ps_closure;
grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
@@ -150,6 +149,34 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
}
BENCHMARK(BM_PollEmptyPollset);
+static void BM_PollAddFd(benchmark::State& state) {
+ TrackCounters track_counters;
+ size_t ps_sz = grpc_pollset_size();
+ grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
+ gpr_mu* mu;
+ grpc_pollset_init(ps, &mu);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_wakeup_fd wakeup_fd;
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
+ grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx");
+ while (state.KeepRunning()) {
+ grpc_pollset_add_fd(&exec_ctx, ps, fd);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
+ grpc_closure shutdown_ps_closure;
+ grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(mu);
+ grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
+ gpr_mu_unlock(mu);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(ps);
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_PollAddFd);
+
class Closure : public grpc_closure {
public:
virtual ~Closure() {}
@@ -233,8 +260,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure);
gpr_mu_lock(mu);
while (!done) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
wakeup_fd.read_fd = 0;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 01856f714a..82c3356f02 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -238,39 +238,27 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(
- &got_tag, &ok,
- std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
- case CompletionQueue::GOT_EVENT: {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- // Proceed while holding a lock to make sure that
- // this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- delete ctx;
- return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
- }
+ if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ delete ctx;
return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
}
- case CompletionQueue::TIMEOUT: {
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- return true;
- }
- return true;
- }
- case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
- // done
- return true;
+ return true;
+ } else {
+ // queue is shutting down, so we must be done
+ return true;
}
- GPR_UNREACHABLE_CODE(return true);
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;