diff options
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 47 |
1 files changed, 40 insertions, 7 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 03e116e26c..f7dda0f758 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -55,6 +55,11 @@ class ClientRpcContext { } virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; + void lock() { mu_.lock(); } + void unlock() { mu_.unlock(); } + + private: + std::mutex mu_; }; template <class RequestType, class ResponseType> @@ -106,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -163,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { num_async_threads_(NumThreads(config)) { SetupLoadTest(config, num_async_threads_); - for (int i = 0; i < num_async_threads_; i++) { + int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified + int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator + for (int i = 0; i < num_cqs; i++) { cli_cqs_.emplace_back(new CompletionQueue); + } + + for (int i = 0; i < num_async_threads_; i++) { + cq_.emplace_back(i % cli_cqs_.size()); next_issuers_.emplace_back(NextIssuer(i)); shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -231,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { void* got_tag; bool ok; - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { + if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { + // We want to delete the context. However, it is possible that + // another thread that just initiated an action on this + // context still has its lock even though the action on the + // context has completed. To delay for that, just grab the + // lock for serialization. Take a new scope. + { std::lock_guard<ClientRpcContext> lctx(*ctx); } delete ctx; return true; - } else if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[thread_idx].get()); - // delete the old version + } + bool del = false; + + // Create a new scope for a lock_guard'ed region + { + std::lock_guard<ClientRpcContext> lctx(*ctx); + if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + // set the old version to delete + del = true; + } + } + if (del) { delete ctx; } return true; @@ -255,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + std::vector<int> cq_; std::vector<std::function<gpr_timespec()>> next_issuers_; std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; @@ -377,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq, messages_per_stream_); } @@ -515,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromClientImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -632,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingFromServerImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq); } @@ -774,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextGenericStreamingImpl( stub_, req_, next_issue_, start_req_, callback_); + std::lock_guard<ClientRpcContext> lclone(*clone); clone->StartInternal(cq, messages_per_stream_); } |