aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-06-05 03:32:03 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-06-05 03:32:03 -0700
commitc7de81e57ec4131993bd469a146739d74d564ab1 (patch)
treea1e435c866927e6104e0956463db5b4d5ee6a532
parentdb398e06092cb9af064949e1aeee799f9469de26 (diff)
Better management of channel attempts and channel coverage
-rw-r--r--test/cpp/qps/client_async.cc26
1 files changed, 16 insertions, 10 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index dc62d1152f..24d8b3751d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -154,7 +154,8 @@ class AsyncClient : public Client {
Client(config), channel_lock_(config.client_channels()),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
- channel_count_(config.client_channels()) {
+ channel_count_(config.client_channels()),
+ pref_channel_inc_(config.async_client_threads()) {
SetupLoadTest(config, config.async_client_threads());
@@ -265,17 +266,17 @@ class AsyncClient : public Client {
grpc_time_source::now() >= next_issue_[thread_idx]) {
// Attempt to issue
bool issued = false;
- for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
- num_attempts++) {
+ for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
+ num_attempts < channel_count_ && !issued; num_attempts++) {
bool can_issue = false;
ClientRpcContext* ctx = nullptr;
{
std::lock_guard<std::mutex>
- g(channel_lock_[next_channel_[thread_idx]]);
- if (!contexts_[next_channel_[thread_idx]].empty()) {
+ g(channel_lock_[channel_attempt]);
+ if (!contexts_[channel_attempt].empty()) {
// Get an idle context from the front of the list
- ctx = *(contexts_[next_channel_[thread_idx]].begin());
- contexts_[next_channel_[thread_idx]].pop_front();
+ ctx = *(contexts_[channel_attempt].begin());
+ contexts_[channel_attempt].pop_front();
can_issue = true;
}
}
@@ -288,10 +289,14 @@ class AsyncClient : public Client {
ctx->set_deadline_posn(it);
ctx->Start(cli_cqs_[thread_idx].get());
issued = true;
+ // If we did issue, then next time, try our thread's next
+ // preferred channel
+ next_channel_[thread_idx] += pref_channel_inc_;
+ if (next_channel_[thread_idx] >= channel_count_)
+ next_channel_[thread_idx] = (thread_idx % channel_count_);
} else {
- // Do a modular increment of next_channel only if we didn't issue
- next_channel_[thread_idx] =
- (next_channel_[thread_idx]+1)%channel_count_;
+ // Do a modular increment of channel attempt if we couldn't issue
+ channel_attempt = (channel_attempt+1) % channel_count_;
}
}
if (issued) {
@@ -319,6 +324,7 @@ class AsyncClient : public Client {
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
int channel_count_;
+ int pref_channel_inc_;
};
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {