diff options
author | Vijay Pai <vpai@google.com> | 2015-06-05 02:32:39 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-06-05 02:32:39 -0700 |
commit | 06c995137bdfb26999c5e496bed3ae76ce1dd09e (patch) | |
tree | 94543710f632d590148d4e030651108083ffc41a | |
parent | 7b172b2411717d564f201c13317687132822964b (diff) |
Take some code out from under a lock_guard and try to
maintain affinity of next_channel whenever possible
-rw-r--r-- | test/cpp/qps/client_async.cc | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index ad0cffabda..ab592abc3d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -273,17 +273,23 @@ class AsyncClient : public Client { // Attempt to issue bool issued = false; for (int num_attempts = 0; num_attempts < channel_count_ && !issued; - num_attempts++, - next_channel_[thread_idx] = - (next_channel_[thread_idx]+1)%channel_count_) { - std::lock_guard<std::mutex> - g(channel_lock_[next_channel_[thread_idx]]); - if ((rpcs_outstanding_[next_channel_[thread_idx]] < - max_outstanding_per_channel_) && - !contexts_[next_channel_[thread_idx]].empty()) { - // Get an idle context from the front of the list - auto ctx = *(contexts_[next_channel_[thread_idx]].begin()); - contexts_[next_channel_[thread_idx]].pop_front(); + num_attempts++) { + bool can_issue = false; + ClientRpcContext* ctx; + { + std::lock_guard<std::mutex> + g(channel_lock_[next_channel_[thread_idx]]); + if ((rpcs_outstanding_[next_channel_[thread_idx]] < + max_outstanding_per_channel_) && + !contexts_[next_channel_[thread_idx]].empty()) { + // Get an idle context from the front of the list + ctx = *(contexts_[next_channel_[thread_idx]].begin()); + contexts_[next_channel_[thread_idx]].pop_front(); + rpcs_outstanding_[next_channel_[thread_idx]]++; + can_issue = true; + } + } + if (can_issue) { // do the work to issue rpc_deadlines_[thread_idx].emplace_back( grpc_time_source::now() + std::chrono::seconds(1)); @@ -291,11 +297,15 @@ class AsyncClient : public Client { --it; ctx->set_deadline_posn(it); ctx->Start(cli_cqs_[thread_idx].get()); - rpcs_outstanding_[next_channel_[thread_idx]]++; issued = true; + } else { + // Do a modular increment of next_channel only if we didn't issue + next_channel_[thread_idx] = + (next_channel_[thread_idx]+1)%channel_count_; } } if (issued) { + // We issued one; see when we can issue the next grpc_time next_issue; NextIssueTime(thread_idx, &next_issue); next_issue_[thread_idx]=next_issue; |