From 14acde64cca60789e29d6ac2d32261c4c18a74b3 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Wed, 10 Oct 2018 15:05:11 -0700 Subject: Addressed Vijay's code review comments --- test/cpp/qps/client_callback.cc | 49 ++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 25 deletions(-) (limited to 'test/cpp/qps') diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc index 50bb796b93..2cf5960fe7 100644 --- a/test/cpp/qps/client_callback.cc +++ b/test/cpp/qps/client_callback.cc @@ -43,8 +43,7 @@ namespace testing { * Maintains context info per RPC */ struct CallbackClientRpcContext { - CallbackClientRpcContext(BenchmarkService::Stub* stub) - : response_(), context_(), alarm_(), stub_(stub) {} + CallbackClientRpcContext(BenchmarkService::Stub* stub) : stub_(stub) {} ~CallbackClientRpcContext() {} @@ -83,8 +82,9 @@ class CallbackClient std::condition_variable shutdown_cv_; // Number of rpcs done after thread completion size_t rpcs_done_; - // Per Thread Queue of Context data pointers for running a RPC - std::vector>> ctxs_; + // Vector of Context data pointers for running a RPC + std::vector> ctx_; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0; @@ -93,7 +93,8 @@ class CallbackClient ThreadFuncImpl(t, thread_idx); } - virtual void ScheduleRpc(Thread* t, size_t thread_idx, size_t queue_idx) = 0; + virtual void ScheduleRpc(Thread* t, size_t thread_idx, + size_t ctx_vector_idx) = 0; /** * The main thread of the benchmark will be waiting on DestroyMultithreading. @@ -134,11 +135,9 @@ class CallbackClient class CallbackUnaryClient final : public CallbackClient { public: CallbackUnaryClient(const ClientConfig& config) : CallbackClient(config) { - ctxs_.resize(num_threads_); - for (int ch = 0; ch < config.client_channels(); ch++) { - for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - size_t bucket = (i * ch) % num_threads_; - ctxs_[bucket].emplace_back( + for (size_t ch = 0; ch < config.client_channels(); ch++) { + for (size_t i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + ctx_.emplace_back( new CallbackClientRpcContext(channels_[ch].get_stub())); } } @@ -148,8 +147,9 @@ class CallbackUnaryClient final : public CallbackClient { protected: bool ThreadFuncImpl(Thread* t, size_t thread_idx) override { - for (size_t i = 0; i < ctxs_[thread_idx].size(); i++) { - ScheduleRpc(t, thread_idx, i); + for (int vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_; + vector_idx += num_threads_) { + ScheduleRpc(t, thread_idx, vector_idx); } return true; } @@ -157,27 +157,26 @@ class CallbackUnaryClient final : public CallbackClient { void InitThreadFuncImpl(size_t thread_idx) override { return; } private: - void ScheduleRpc(Thread* t, size_t thread_idx, size_t queue_idx) override { + void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) override { if (!closed_loop_) { gpr_timespec next_issue_time = NextIssueTime(thread_idx); // Start an alarm callback to run the internal callback after // next_issue_time - ctxs_[thread_idx][queue_idx]->alarm_.experimental().Set( - next_issue_time, [this, t, thread_idx, queue_idx](bool ok) { - IssueUnaryCallbackRpc(t, thread_idx, queue_idx); + ctx_[vector_idx]->alarm_.experimental().Set( + next_issue_time, [this, t, thread_idx, vector_idx](bool ok) { + IssueUnaryCallbackRpc(t, thread_idx, vector_idx); }); } else { - IssueUnaryCallbackRpc(t, thread_idx, queue_idx); + IssueUnaryCallbackRpc(t, thread_idx, vector_idx); } } - void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t queue_idx) { + void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) { GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0); double start = UsageTimer::Now(); - ctxs_[thread_idx][queue_idx]->stub_->experimental_async()->UnaryCall( - std::move(&ctxs_[thread_idx][queue_idx]->context_), &request_, - &ctxs_[thread_idx][queue_idx]->response_, - [this, t, thread_idx, start, queue_idx](grpc::Status s) { + ctx_[vector_idx]->stub_->experimental_async()->UnaryCall( + (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_, + [this, t, thread_idx, start, vector_idx](grpc::Status s) { // Update Histogram with data from the callback run HistogramEntry entry; if (s.ok()) { @@ -191,10 +190,10 @@ class CallbackUnaryClient final : public CallbackClient { NotifyMainThreadOfThreadCompletion(); } else { // Reallocate ctx for next RPC - ctxs_[thread_idx][queue_idx].reset(new CallbackClientRpcContext( - ctxs_[thread_idx][queue_idx]->stub_)); + ctx_[vector_idx].reset( + new CallbackClientRpcContext(ctx_[vector_idx]->stub_)); // Schedule a new RPC - ScheduleRpc(t, thread_idx, queue_idx); + ScheduleRpc(t, thread_idx, vector_idx); } }); } -- cgit v1.2.3