aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Moiz Haidry <42048362+mhaidrygoog@users.noreply.github.com>2019-01-03 20:38:54 -0800
committerGravatar GitHub <noreply@github.com>2019-01-03 20:38:54 -0800
commitd3db9fee9c0baa59f72bafb2aa1c882446432daf (patch)
treef2824d4b37561b68741d857a41043c3660fe3ecd /test
parent2dda0bb21bbe6e0914cd12fbf3ffa013111cc8a3 (diff)
parent7bb853ebdd0b6e057de447147ad60ebf42e0903d (diff)
Merge pull request #17461 from mhaidrygoog/callback_client_streaming_benchmark
Add support for Callback Client Streaming benchmarks
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client.h118
-rw-r--r--test/cpp/qps/client_callback.cc201
2 files changed, 235 insertions, 84 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 73f91eed2d..ceb5cdd710 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -236,58 +236,7 @@ class Client {
return 0;
}
- protected:
- bool closed_loop_;
- gpr_atm thread_pool_done_;
- double median_latency_collection_interval_seconds_; // In seconds
-
- void StartThreads(size_t num_threads) {
- gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
- threads_remaining_ = num_threads;
- for (size_t i = 0; i < num_threads; i++) {
- threads_.emplace_back(new Thread(this, i));
- }
- }
-
- void EndThreads() {
- MaybeStartRequests();
- threads_.clear();
- }
-
- virtual void DestroyMultithreading() = 0;
-
- void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
- // Set up the load distribution based on the number of threads
- const auto& load = config.load_params();
-
- std::unique_ptr<RandomDistInterface> random_dist;
- switch (load.load_case()) {
- case LoadParams::kClosedLoop:
- // Closed-loop doesn't use random dist at all
- break;
- case LoadParams::kPoisson:
- random_dist.reset(
- new ExpDist(load.poisson().offered_load() / num_threads));
- break;
- default:
- GPR_ASSERT(false);
- }
-
- // Set closed_loop_ based on whether or not random_dist is set
- if (!random_dist) {
- closed_loop_ = true;
- } else {
- closed_loop_ = false;
- // set up interarrival timer according to random dist
- interarrival_timer_.init(*random_dist, num_threads);
- const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
- for (size_t i = 0; i < num_threads; i++) {
- next_time_.push_back(gpr_time_add(
- now,
- gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
- }
- }
- }
+ bool IsClosedLoop() { return closed_loop_; }
gpr_timespec NextIssueTime(int thread_idx) {
const gpr_timespec result = next_time_[thread_idx];
@@ -297,9 +246,9 @@ class Client {
GPR_TIMESPAN));
return result;
}
- std::function<gpr_timespec()> NextIssuer(int thread_idx) {
- return closed_loop_ ? std::function<gpr_timespec()>()
- : std::bind(&Client::NextIssueTime, this, thread_idx);
+
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
class Thread {
@@ -380,8 +329,62 @@ class Client {
double interval_start_time_;
};
- bool ThreadCompleted() {
- return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ protected:
+ bool closed_loop_;
+ gpr_atm thread_pool_done_;
+ double median_latency_collection_interval_seconds_; // In seconds
+
+ void StartThreads(size_t num_threads) {
+ gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
+ threads_remaining_ = num_threads;
+ for (size_t i = 0; i < num_threads; i++) {
+ threads_.emplace_back(new Thread(this, i));
+ }
+ }
+
+ void EndThreads() {
+ MaybeStartRequests();
+ threads_.clear();
+ }
+
+ virtual void DestroyMultithreading() = 0;
+
+ void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+ // Set up the load distribution based on the number of threads
+ const auto& load = config.load_params();
+
+ std::unique_ptr<RandomDistInterface> random_dist;
+ switch (load.load_case()) {
+ case LoadParams::kClosedLoop:
+ // Closed-loop doesn't use random dist at all
+ break;
+ case LoadParams::kPoisson:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Set closed_loop_ based on whether or not random_dist is set
+ if (!random_dist) {
+ closed_loop_ = true;
+ } else {
+ closed_loop_ = false;
+ // set up interarrival timer according to random dist
+ interarrival_timer_.init(*random_dist, num_threads);
+ const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (size_t i = 0; i < num_threads; i++) {
+ next_time_.push_back(gpr_time_add(
+ now,
+ gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
+ }
+ }
+ }
+
+ std::function<gpr_timespec()> NextIssuer(int thread_idx) {
+ return closed_loop_ ? std::function<gpr_timespec()>()
+ : std::bind(&Client::NextIssueTime, this, thread_idx);
}
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
@@ -436,6 +439,7 @@ class ClientImpl : public Client {
config.payload_config());
}
virtual ~ClientImpl() {}
+ const RequestType* request() { return &request_; }
void WaitForChannelsToConnect() {
int connect_deadline_seconds = 10;
diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc
index 87889e36dc..4a06325f2b 100644
--- a/test/cpp/qps/client_callback.cc
+++ b/test/cpp/qps/client_callback.cc
@@ -66,13 +66,35 @@ class CallbackClient
config, BenchmarkStubCreator) {
num_threads_ = NumThreads(config);
rpcs_done_ = 0;
- SetupLoadTest(config, num_threads_);
+
+ // Don't divide the fixed load among threads as the user threads
+ // only bootstrap the RPCs
+ SetupLoadTest(config, 1);
total_outstanding_rpcs_ =
config.client_channels() * config.outstanding_rpcs_per_channel();
}
virtual ~CallbackClient() {}
+ /**
+ * The main thread of the benchmark will be waiting on DestroyMultithreading.
+ * Increment the rpcs_done_ variable to signify that the Callback RPC
+ * after thread completion is done. When the last outstanding rpc increments
+ * the counter it should also signal the main thread's conditional variable.
+ */
+ void NotifyMainThreadOfThreadCompletion() {
+ std::lock_guard<std::mutex> l(shutdown_mu_);
+ rpcs_done_++;
+ if (rpcs_done_ == total_outstanding_rpcs_) {
+ shutdown_cv_.notify_one();
+ }
+ }
+
+ gpr_timespec NextRPCIssueTime() {
+ std::lock_guard<std::mutex> l(next_issue_time_mu_);
+ return Client::NextIssueTime(0);
+ }
+
protected:
size_t num_threads_;
size_t total_outstanding_rpcs_;
@@ -93,24 +115,9 @@ class CallbackClient
ThreadFuncImpl(t, thread_idx);
}
- virtual void ScheduleRpc(Thread* t, size_t thread_idx,
- size_t ctx_vector_idx) = 0;
-
- /**
- * The main thread of the benchmark will be waiting on DestroyMultithreading.
- * Increment the rpcs_done_ variable to signify that the Callback RPC
- * after thread completion is done. When the last outstanding rpc increments
- * the counter it should also signal the main thread's conditional variable.
- */
- void NotifyMainThreadOfThreadCompletion() {
- std::lock_guard<std::mutex> l(shutdown_mu_);
- rpcs_done_++;
- if (rpcs_done_ == total_outstanding_rpcs_) {
- shutdown_cv_.notify_one();
- }
- }
-
private:
+ std::mutex next_issue_time_mu_; // Used by next issue time
+
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@@ -149,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient {
bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
vector_idx += num_threads_) {
- ScheduleRpc(t, thread_idx, vector_idx);
+ ScheduleRpc(t, vector_idx);
}
return true;
}
@@ -157,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient {
void InitThreadFuncImpl(size_t thread_idx) override { return; }
private:
- void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) override {
+ void ScheduleRpc(Thread* t, size_t vector_idx) {
if (!closed_loop_) {
- gpr_timespec next_issue_time = NextIssueTime(thread_idx);
+ gpr_timespec next_issue_time = NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_[vector_idx]->alarm_.experimental().Set(
- next_issue_time, [this, t, thread_idx, vector_idx](bool ok) {
- IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+ next_issue_time, [this, t, vector_idx](bool ok) {
+ IssueUnaryCallbackRpc(t, vector_idx);
});
} else {
- IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+ IssueUnaryCallbackRpc(t, vector_idx);
}
}
- void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+ void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
double start = UsageTimer::Now();
ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
(&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
- [this, t, thread_idx, start, vector_idx](grpc::Status s) {
+ [this, t, start, vector_idx](grpc::Status s) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (s.ok()) {
@@ -193,17 +200,157 @@ class CallbackUnaryClient final : public CallbackClient {
ctx_[vector_idx].reset(
new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
// Schedule a new RPC
- ScheduleRpc(t, thread_idx, vector_idx);
+ ScheduleRpc(t, vector_idx);
}
});
}
};
+class CallbackStreamingClient : public CallbackClient {
+ public:
+ CallbackStreamingClient(const ClientConfig& config)
+ : CallbackClient(config),
+ messages_per_stream_(config.messages_per_stream()) {
+ for (int ch = 0; ch < config.client_channels(); ch++) {
+ for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+ ctx_.emplace_back(
+ new CallbackClientRpcContext(channels_[ch].get_stub()));
+ }
+ }
+ StartThreads(num_threads_);
+ }
+ ~CallbackStreamingClient() {}
+
+ void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) {
+ // Update Histogram with data from the callback run
+ HistogramEntry entry;
+ if (ok) {
+ entry.set_value((UsageTimer::Now() - start_) * 1e9);
+ }
+ thread_ptr->UpdateHistogram(&entry);
+ }
+
+ int messages_per_stream() { return messages_per_stream_; }
+
+ protected:
+ const int messages_per_stream_;
+};
+
+class CallbackStreamingPingPongClient : public CallbackStreamingClient {
+ public:
+ CallbackStreamingPingPongClient(const ClientConfig& config)
+ : CallbackStreamingClient(config) {}
+ ~CallbackStreamingPingPongClient() {}
+};
+
+class CallbackStreamingPingPongReactor final
+ : public grpc::experimental::ClientBidiReactor<SimpleRequest,
+ SimpleResponse> {
+ public:
+ CallbackStreamingPingPongReactor(
+ CallbackStreamingPingPongClient* client,
+ std::unique_ptr<CallbackClientRpcContext> ctx)
+ : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
+
+ void StartNewRpc() {
+ if (client_->ThreadCompleted()) return;
+ start_ = UsageTimer::Now();
+ ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
+ StartWrite(client_->request());
+ StartCall();
+ }
+
+ void OnWriteDone(bool ok) override {
+ if (!ok || client_->ThreadCompleted()) {
+ if (!ok) gpr_log(GPR_ERROR, "Error writing RPC");
+ StartWritesDone();
+ return;
+ }
+ StartRead(&ctx_->response_);
+ }
+
+ void OnReadDone(bool ok) override {
+ client_->AddHistogramEntry(start_, ok, thread_ptr_);
+
+ if (client_->ThreadCompleted() || !ok ||
+ (client_->messages_per_stream() != 0 &&
+ ++messages_issued_ >= client_->messages_per_stream())) {
+ if (!ok) {
+ gpr_log(GPR_ERROR, "Error reading RPC");
+ }
+ StartWritesDone();
+ return;
+ }
+ StartWrite(client_->request());
+ }
+
+ void OnDone(const Status& s) override {
+ if (client_->ThreadCompleted() || !s.ok()) {
+ client_->NotifyMainThreadOfThreadCompletion();
+ return;
+ }
+ ctx_.reset(new CallbackClientRpcContext(ctx_->stub_));
+ ScheduleRpc();
+ }
+
+ void ScheduleRpc() {
+ if (client_->ThreadCompleted()) return;
+
+ if (!client_->IsClosedLoop()) {
+ gpr_timespec next_issue_time = client_->NextRPCIssueTime();
+ // Start an alarm callback to run the internal callback after
+ // next_issue_time
+ ctx_->alarm_.experimental().Set(next_issue_time,
+ [this](bool ok) { StartNewRpc(); });
+ } else {
+ StartNewRpc();
+ }
+ }
+
+ void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
+
+ CallbackStreamingPingPongClient* client_;
+ std::unique_ptr<CallbackClientRpcContext> ctx_;
+ Client::Thread* thread_ptr_; // Needed to update histogram entries
+ double start_; // Track message start time
+ int messages_issued_; // Messages issued by this stream
+};
+
+class CallbackStreamingPingPongClientImpl final
+ : public CallbackStreamingPingPongClient {
+ public:
+ CallbackStreamingPingPongClientImpl(const ClientConfig& config)
+ : CallbackStreamingPingPongClient(config) {
+ for (size_t i = 0; i < total_outstanding_rpcs_; i++)
+ reactor_.emplace_back(
+ new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
+ }
+ ~CallbackStreamingPingPongClientImpl() {}
+
+ bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
+ for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
+ vector_idx += num_threads_) {
+ reactor_[vector_idx]->set_thread_ptr(t);
+ reactor_[vector_idx]->ScheduleRpc();
+ }
+ return true;
+ }
+
+ void InitThreadFuncImpl(size_t thread_idx) override {}
+
+ private:
+ std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
+};
+
+// TODO(mhaidry) : Implement Streaming from client, server and both ways
+
std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
switch (config.rpc_type()) {
case UNARY:
return std::unique_ptr<Client>(new CallbackUnaryClient(config));
case STREAMING:
+ return std::unique_ptr<Client>(
+ new CallbackStreamingPingPongClientImpl(config));
case STREAMING_FROM_CLIENT:
case STREAMING_FROM_SERVER:
case STREAMING_BOTH_WAYS: