diff options
author | Vijay Pai <vpai@google.com> | 2016-07-13 19:20:25 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-07-13 19:20:25 -0700 |
commit | 40317fd7202ab96f8fb3c1f39258fff1ede3480e (patch) | |
tree | f917d95a39d5b6c27053abe9cc8c35c73253598b /test/cpp/qps | |
parent | ad7c52761895c46a3964ab8864d11c7aa269a29b (diff) |
Resolve pernicious race between destructor and thread functions by insisting that destructor is invoked after the class has gone back to being a harmless single-threaded thing.
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.h | 25 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 54 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 17 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 3 |
4 files changed, 64 insertions, 35 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 38478be5d9..95023d2f80 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -162,10 +162,20 @@ class Client { return stats; } + // Must call AwaitThreadsCompletion before destructor to avoid a race + // between destructor and invocation of virtual ThreadFunc + void AwaitThreadsCompletion() { + DestroyMultithreading(); + std::unique_lock<std::mutex> g(thread_completion_mu_); + while (threads_remaining_ != 0) { + threads_complete_.wait(g); + } + } protected: bool closed_loop_; void StartThreads(size_t num_threads) { + threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); } @@ -173,6 +183,7 @@ class Client { void EndThreads() { threads_.clear(); } + virtual void DestroyMultithreading() = 0; virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { @@ -270,6 +281,7 @@ class Client { done_ = true; } if (done_) { + client_->CompleteThread(); return; } } @@ -277,7 +289,6 @@ class Client { std::mutex mu_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -289,6 +300,18 @@ class Client { InterarrivalTimer interarrival_timer_; std::vector<gpr_timespec> next_time_; + + std::mutex thread_completion_mu_; + size_t threads_remaining_; + std::condition_variable threads_complete_; + + void CompleteThread() { + std::lock_guard<std::mutex> g(thread_completion_mu_); + threads_remaining_--; + if (threads_remaining_ == 0) { + threads_complete_.notify_all(); + } + } }; template <class StubType, class RequestType> diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index a0705673bd..f7fe746bbf 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } } virtual ~AsyncClient() { - for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - std::lock_guard<std::mutex> lock((*ss)->mutex); - (*ss)->shutdown = true; - } - for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); - } - this->EndThreads(); // Need "this->" for resolution for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { void* got_tag; bool ok; @@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } } } + protected: + const int num_async_threads_; + + private: + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} + }; + + int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); + } + return num_threads; + } + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + std::lock_guard<std::mutex> lock((*ss)->mutex); + (*ss)->shutdown = true; + } + for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { + (*cq)->Shutdown(); + } + this->EndThreads(); // this needed for resolution + } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { @@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } } - protected: - const int num_async_threads_; - - private: - struct PerThreadShutdownState { - mutable std::mutex mutex; - bool shutdown; - PerThreadShutdownState() : shutdown(false) {} - }; - - int NumThreads(const ClientConfig& config) { - int num_threads = config.async_client_threads(); - if (num_threads <= 0) { // Use dynamic sizing - num_threads = cores_; - gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); - } - return num_threads; - } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::function<gpr_timespec()>> next_issuers_; std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 92680986bd..cc2c5ca540 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -87,6 +87,8 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; + private: + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { @@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { : SynchronousClient(config) { StartThreads(num_threads_); } - ~SynchronousUnaryClient() { EndThreads(); } + ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); @@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { - EndThreads(); for (size_t i = 0; i < num_threads_; i++) { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - EXPECT_TRUE(s.ok()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, - s.error_message().c_str()); - } + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 49ef52895c..e147734f7a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion"); + client->AwaitThreadsCompletion(); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } |