From 40317fd7202ab96f8fb3c1f39258fff1ede3480e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jul 2016 19:20:25 -0700 Subject: Resolve pernicious race between destructor and thread functions by insisting that destructor is invoked after the class has gone back to being a harmless single-threaded thing. --- test/cpp/qps/client.h | 25 +++++++++++++++++++- test/cpp/qps/client_async.cc | 54 +++++++++++++++++++++++--------------------- test/cpp/qps/client_sync.cc | 17 +++++++------- test/cpp/qps/qps_worker.cc | 3 +++ 4 files changed, 64 insertions(+), 35 deletions(-) (limited to 'test/cpp/qps') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 38478be5d9..95023d2f80 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -162,10 +162,20 @@ class Client { return stats; } + // Must call AwaitThreadsCompletion before destructor to avoid a race + // between destructor and invocation of virtual ThreadFunc + void AwaitThreadsCompletion() { + DestroyMultithreading(); + std::unique_lock g(thread_completion_mu_); + while (threads_remaining_ != 0) { + threads_complete_.wait(g); + } + } protected: bool closed_loop_; void StartThreads(size_t num_threads) { + threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); } @@ -173,6 +183,7 @@ class Client { void EndThreads() { threads_.clear(); } + virtual void DestroyMultithreading() = 0; virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { @@ -270,6 +281,7 @@ class Client { done_ = true; } if (done_) { + client_->CompleteThread(); return; } } @@ -277,7 +289,6 @@ class Client { std::mutex mu_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -289,6 +300,18 @@ class Client { InterarrivalTimer interarrival_timer_; std::vector next_time_; + + std::mutex thread_completion_mu_; + size_t threads_remaining_; + std::condition_variable threads_complete_; + + void CompleteThread() { + std::lock_guard g(thread_completion_mu_); + threads_remaining_--; + if (threads_remaining_ == 0) { + threads_complete_.notify_all(); + } + } }; template diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index a0705673bd..f7fe746bbf 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl { } } virtual ~AsyncClient() { - for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - std::lock_guard lock((*ss)->mutex); - (*ss)->shutdown = true; - } - for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); - } - this->EndThreads(); // Need "this->" for resolution for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { void* got_tag; bool ok; @@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl { } } } + protected: + const int num_async_threads_; + + private: + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} + }; + + int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); + } + return num_threads; + } + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + std::lock_guard lock((*ss)->mutex); + (*ss)->shutdown = true; + } + for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { + (*cq)->Shutdown(); + } + this->EndThreads(); // this needed for resolution + } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { @@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl { } } - protected: - const int num_async_threads_; - - private: - struct PerThreadShutdownState { - mutable std::mutex mutex; - bool shutdown; - PerThreadShutdownState() : shutdown(false) {} - }; - - int NumThreads(const ClientConfig& config) { - int num_threads = config.async_client_threads(); - if (num_threads <= 0) { // Use dynamic sizing - num_threads = cores_; - gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); - } - return num_threads; - } std::vector> cli_cqs_; std::vector> next_issuers_; std::vector> shutdown_state_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 92680986bd..cc2c5ca540 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -87,6 +87,8 @@ class SynchronousClient size_t num_threads_; std::vector responses_; + private: + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { @@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { : SynchronousClient(config) { StartThreads(num_threads_); } - ~SynchronousUnaryClient() { EndThreads(); } + ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); @@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { - EndThreads(); for (size_t i = 0; i < num_threads_; i++) { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - EXPECT_TRUE(s.ok()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, - s.error_message().c_str()); - } + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 49ef52895c..e147734f7a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion"); + client->AwaitThreadsCompletion(); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } -- cgit v1.2.3