From 761bcb4a18c7a59dfabe2276f615945f59e5021f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 11 Feb 2016 11:59:04 -0800 Subject: Cleanup to make it work with clang build --- test/cpp/qps/client.h | 4 ++++ test/cpp/qps/client_async.cc | 35 ++++++++++++++--------------------- 2 files changed, 18 insertions(+), 21 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 0393196907..6962d336dd 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -217,6 +217,10 @@ class Client { GPR_TIMESPAN)); return result; } + std::function NextIssuer(int thread_idx) { + return closed_loop_ ? std::function() + : std::bind(&Client::NextIssueTime, this, thread_idx); + } private: class Thread { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 8a2de13c1a..b02d399fc6 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -157,6 +157,7 @@ class AsyncClient : public ClientImpl { public: using Client::SetupLoadTest; using Client::closed_loop_; + using Client::NextIssuer; using ClientImpl::cores_; using ClientImpl::channels_; using ClientImpl::request_; @@ -172,6 +173,7 @@ class AsyncClient : public ClientImpl { for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); + next_issuers_.emplace_back(NextIssuer(i)); } using namespace std::placeholders; @@ -179,11 +181,8 @@ class AsyncClient : public ClientImpl { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int ch = 0; ch < config.client_channels(); ch++) { auto* cq = cli_cqs_[t].get(); - std::function next_issue; - if (!closed_loop_) { - next_issue = std::bind(&Client::NextIssueTime, this, t); - } - auto ctx = setup_ctx(channels_[ch].get_stub(), next_issue, request_); + auto ctx = + setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_); ctx->Start(cq); t = (t + 1) % cli_cqs_.size(); } @@ -204,29 +203,22 @@ class AsyncClient : public ClientImpl { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - bool got_event; - switch (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { - case CompletionQueue::SHUTDOWN: - return false; - case CompletionQueue::GOT_EVENT: - got_event = true; - break; - default: - GPR_ASSERT(false); - break; - } - if (got_event) { + if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { + // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { + if (!ctx->RunNextState(ok, histogram)) { // The RPC and callback are done, so clone the ctx - ClientRpcContext* clone_ctx = ctx->StartNewClone(); - clone_ctx->Start(cli_cqs_[thread_idx].get()); + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); // delete the old version delete ctx; } + return true; + } else { // queue is shutting down + return false; } - return true; } protected: @@ -243,6 +235,7 @@ class AsyncClient : public ClientImpl { } std::vector> cli_cqs_; + std::vector> next_issuers_; }; static std::unique_ptr BenchmarkStubCreator( -- cgit v1.2.3