aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r--test/cpp/qps/client_async.cc90
1 files changed, 58 insertions, 32 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 82c3356f02..f7dda0f758 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -70,6 +55,11 @@ class ClientRpcContext {
}
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
+
+ private:
+ std::mutex mu_;
};
template <class RequestType, class ResponseType>
@@ -121,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -178,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
num_async_threads_(NumThreads(config)) {
SetupLoadTest(config, num_async_threads_);
- for (int i = 0; i < num_async_threads_; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
+ }
+
+ for (int i = 0; i < num_async_threads_; i++) {
+ cq_.emplace_back(i % cli_cqs_.size());
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
@@ -205,6 +202,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
+ int GetPollCount() override {
+ int count = 0;
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ count += grpc_get_cq_poll_num((*cq)->cq());
+ }
+ return count;
+ }
+
protected:
const int num_async_threads_;
@@ -238,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
+ // We want to delete the context. However, it is possible that
+ // another thread that just initiated an action on this
+ // context still has its lock even though the action on the
+ // context has completed. To delay for that, just grab the
+ // lock for serialization. Take a new scope.
+ { std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx;
return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[thread_idx].get());
- // delete the old version
+ }
+ bool del = false;
+
+ // Create a new scope for a lock_guard'ed region
+ {
+ std::lock_guard<ClientRpcContext> lctx(*ctx);
+ if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ // set the old version to delete
+ del = true;
+ }
+ }
+ if (del) {
delete ctx;
}
return true;
@@ -262,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
+ std::vector<int> cq_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
@@ -384,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -522,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -639,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -781,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}