From 06c995137bdfb26999c5e496bed3ae76ce1dd09e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 5 Jun 2015 02:32:39 -0700 Subject: Take some code out from under a lock_guard and try to maintain affinity of next_channel whenever possible --- test/cpp/qps/client_async.cc | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index ad0cffabda..ab592abc3d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -273,17 +273,23 @@ class AsyncClient : public Client { // Attempt to issue bool issued = false; for (int num_attempts = 0; num_attempts < channel_count_ && !issued; - num_attempts++, - next_channel_[thread_idx] = - (next_channel_[thread_idx]+1)%channel_count_) { - std::lock_guard - g(channel_lock_[next_channel_[thread_idx]]); - if ((rpcs_outstanding_[next_channel_[thread_idx]] < - max_outstanding_per_channel_) && - !contexts_[next_channel_[thread_idx]].empty()) { - // Get an idle context from the front of the list - auto ctx = *(contexts_[next_channel_[thread_idx]].begin()); - contexts_[next_channel_[thread_idx]].pop_front(); + num_attempts++) { + bool can_issue = false; + ClientRpcContext* ctx; + { + std::lock_guard + g(channel_lock_[next_channel_[thread_idx]]); + if ((rpcs_outstanding_[next_channel_[thread_idx]] < + max_outstanding_per_channel_) && + !contexts_[next_channel_[thread_idx]].empty()) { + // Get an idle context from the front of the list + ctx = *(contexts_[next_channel_[thread_idx]].begin()); + contexts_[next_channel_[thread_idx]].pop_front(); + rpcs_outstanding_[next_channel_[thread_idx]]++; + can_issue = true; + } + } + if (can_issue) { // do the work to issue rpc_deadlines_[thread_idx].emplace_back( grpc_time_source::now() + std::chrono::seconds(1)); @@ -291,11 +297,15 @@ class AsyncClient : public Client { --it; ctx->set_deadline_posn(it); ctx->Start(cli_cqs_[thread_idx].get()); - rpcs_outstanding_[next_channel_[thread_idx]]++; issued = true; + } else { + // Do a modular increment of next_channel only if we didn't issue + next_channel_[thread_idx] = + (next_channel_[thread_idx]+1)%channel_count_; } } if (issued) { + // We issued one; see when we can issue the next grpc_time next_issue; NextIssueTime(thread_idx, &next_issue); next_issue_[thread_idx]=next_issue; -- cgit v1.2.3