From c18ad11837f44b3eb2de788306a142b454873d09 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 9 Oct 2017 13:44:10 -0700 Subject: Wait until all clients connected before starting streams --- test/cpp/qps/client.h | 3 +++ test/cpp/qps/client_async.cc | 1 + test/cpp/qps/client_sync.cc | 56 ++++++++++++++++++++++---------------------- 3 files changed, 32 insertions(+), 28 deletions(-) (limited to 'test/cpp/qps') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 267a30c9f3..abf755b393 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -226,6 +226,7 @@ class Client { } virtual void DestroyMultithreading() = 0; + virtual void InitThreadFunc(size_t thread_idx) = 0; virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { @@ -309,6 +310,8 @@ class Client { wait_loop++; } + client_->InitThreadFunc(idx_); + for (;;) { // run the loop body HistogramEntry entry; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f5807da81e..9ed4e0b355 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,6 +236,7 @@ class AsyncClient : public ClientImpl { this->EndThreads(); // this needed for resolution } + void InitThreadFunc(size_t thread_idx) override final {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { void* got_tag; bool ok; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 5d212f1acc..94554a46b2 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -103,6 +103,8 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} + void InitThreadFunc(size_t thread_idx) override {} + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; @@ -174,13 +176,7 @@ class SynchronousStreamingPingPongClient final grpc::ClientReaderWriter> { public: SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); - messages_issued_[thread_idx] = 0; - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingPingPongClient() { std::vector cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -196,6 +192,12 @@ class SynchronousStreamingPingPongClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + messages_issued_[thread_idx] = 0; + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; @@ -228,14 +230,7 @@ class SynchronousStreamingFromClientClient final : public SynchronousStreamingClient> { public: SynchronousStreamingFromClientClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_issue_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); - last_issue_[thread_idx] = UsageTimer::Now(); - } - } + : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { std::vector cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -251,6 +246,13 @@ class SynchronousStreamingFromClientClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + last_issue_[thread_idx] = UsageTimer::Now(); + } + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { @@ -279,13 +281,12 @@ class SynchronousStreamingFromServerClient final : public SynchronousStreamingClient> { public: SynchronousStreamingFromServerClient(const ClientConfig& config) - : SynchronousStreamingClient(config), last_recv_(num_threads_) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); - last_recv_[thread_idx] = UsageTimer::Now(); - } + : SynchronousStreamingClient(config), last_recv_(num_threads_) {} + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + last_recv_[thread_idx] = UsageTimer::Now(); } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); @@ -311,12 +312,7 @@ class SynchronousStreamingBothWaysClient final grpc::ClientReaderWriter> { public: SynchronousStreamingBothWaysClient(const ClientConfig& config) - : SynchronousStreamingClient(config) { - for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { - auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); - } - } + : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { std::vector cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { @@ -332,6 +328,10 @@ class SynchronousStreamingBothWaysClient final } } + void InitThreadFunc(size_t thread_idx) override { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; -- cgit v1.2.3