From 8eedd4a2c69e377d30f5466b985df9159cc48851 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 3 Jun 2015 15:39:35 -0700 Subject: Context activation for async --- test/cpp/qps/client_async.cc | 69 ++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 22 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index e4ce93adb4..fbbc19f58d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -57,15 +57,13 @@ namespace grpc { namespace testing { -typedef std::forward_list deadline_list; - class ClientRpcContext { public: ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; - virtual void StartNewClone() = 0; + virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast(t); @@ -110,8 +108,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return ret; } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); } private: @@ -137,12 +135,17 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; +typedef std::forward_list deadline_list; +typedef std::forward_list context_list; + class AsyncClient : public Client { public: explicit AsyncClient(const ClientConfig& config, std::function setup_ctx) : - Client(config), channel_rpc_lock_(config.client_channels()) { + Client(config), channel_rpc_lock_(config.client_channels()), + max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), + channel_count_(config.client_channels()) { SetupLoadTest(config, config.async_client_threads()); @@ -167,8 +170,8 @@ class AsyncClient : public Client { int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { + for (int ch = 0; ch < channel_count_; ch++) { + auto channel = channels_[ch]; auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); @@ -177,6 +180,9 @@ class AsyncClient : public Client { // closed_loop streaming ctx->Start(); } + else { + contexts_[ch].push_front(ctx); + } } } } @@ -200,7 +206,12 @@ class AsyncClient : public Client { deadline = grpc_time_source::now() + std::chrono::seconds(1); short_deadline = deadline; } else { - deadline = *(rpc_deadlines_[thread_idx].begin()); + if (rpc_deadlines_[thread_idx].empty()) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + } + else { + deadline = *(rpc_deadlines_[thread_idx].begin()); + } short_deadline = issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; } @@ -219,37 +230,50 @@ class AsyncClient : public Client { GPR_ASSERT(false); break; } - if (grpc_time_source::now() > deadline) { - // we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); - return false; - } - if (got_event) { + if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && + grpc_time_source::now() > deadline) { + // we have missed some 1-second deadline, which is too much gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); + return false; + } + if (got_event) { ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); if (ctx->RunNextState(ok, histogram) == false) { // call the callback and then delete it rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn()); ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); + ClientRpcContext *clone_ctx = ctx->StartNewClone(); delete ctx; + if (!closed_loop_) { + // Put this in the list of idle contexts for this channel + + } } issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been } if (issue_allowed_[thread_idx] && grpc_time_source::now() >= next_issue_[thread_idx]) { - // Attempt to issue + // Attempt to issue bool issued = false; for (int num_attempts = 0; num_attempts < channel_count_ && !issued; - num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { + num_attempts++, + next_channel_[thread_idx] = + (next_channel_[thread_idx]+1)%channel_count_) { std::lock_guard - g(channel_rpc_lock_[next_channel_[thread_idx]]); - if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) { + g(channel_rpc_lock_[next_channel_[thread_idx]]); + if ((rpcs_outstanding_[next_channel_[thread_idx]] < + max_outstanding_per_channel_) && + !contexts_[next_channel_[thread_idx]].empty()) { + // Get an idle context from the front of the list + auto ctx = contexts_[next_channel_[thread_idx]].begin(); + contexts_[next_channel_[thread_idx]].pop_front(); // do the work to issue + ctx->Start(); rpcs_outstanding_[next_channel_[thread_idx]]++; issued = true; } } if (!issued) - issue_allowed_[thread_idx] = false; + issue_allowed_[thread_idx] = false; } return true; } @@ -264,6 +288,7 @@ class AsyncClient : public Client { std::vector channel_rpc_lock_; std::vector rpcs_outstanding_; // per-channel vector + std::vector contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; int channel_count_; }; @@ -311,8 +336,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); } void Start() GRPC_OVERRIDE {} private: -- cgit v1.2.3