diff options
author | Vijay Pai <vpai@google.com> | 2015-06-05 03:32:03 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-06-05 03:32:03 -0700 |
commit | c7de81e57ec4131993bd469a146739d74d564ab1 (patch) | |
tree | a1e435c866927e6104e0956463db5b4d5ee6a532 | |
parent | db398e06092cb9af064949e1aeee799f9469de26 (diff) |
Better management of channel attempts and channel coverage
-rw-r--r-- | test/cpp/qps/client_async.cc | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index dc62d1152f..24d8b3751d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -154,7 +154,8 @@ class AsyncClient : public Client { Client(config), channel_lock_(config.client_channels()), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), - channel_count_(config.client_channels()) { + channel_count_(config.client_channels()), + pref_channel_inc_(config.async_client_threads()) { SetupLoadTest(config, config.async_client_threads()); @@ -265,17 +266,17 @@ class AsyncClient : public Client { grpc_time_source::now() >= next_issue_[thread_idx]) { // Attempt to issue bool issued = false; - for (int num_attempts = 0; num_attempts < channel_count_ && !issued; - num_attempts++) { + for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; + num_attempts < channel_count_ && !issued; num_attempts++) { bool can_issue = false; ClientRpcContext* ctx = nullptr; { std::lock_guard<std::mutex> - g(channel_lock_[next_channel_[thread_idx]]); - if (!contexts_[next_channel_[thread_idx]].empty()) { + g(channel_lock_[channel_attempt]); + if (!contexts_[channel_attempt].empty()) { // Get an idle context from the front of the list - ctx = *(contexts_[next_channel_[thread_idx]].begin()); - contexts_[next_channel_[thread_idx]].pop_front(); + ctx = *(contexts_[channel_attempt].begin()); + contexts_[channel_attempt].pop_front(); can_issue = true; } } @@ -288,10 +289,14 @@ class AsyncClient : public Client { ctx->set_deadline_posn(it); ctx->Start(cli_cqs_[thread_idx].get()); issued = true; + // If we did issue, then next time, try our thread's next + // preferred channel + next_channel_[thread_idx] += pref_channel_inc_; + if (next_channel_[thread_idx] >= channel_count_) + next_channel_[thread_idx] = (thread_idx % channel_count_); } else { - // Do a modular increment of next_channel only if we didn't issue - next_channel_[thread_idx] = - (next_channel_[thread_idx]+1)%channel_count_; + // Do a modular increment of channel attempt if we couldn't issue + channel_attempt = (channel_attempt+1) % channel_count_; } } if (issued) { @@ -319,6 +324,7 @@ class AsyncClient : public Client { std::vector<context_list> contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; int channel_count_; + int pref_channel_inc_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { |