aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-07-13 19:20:25 -0700
committerGravatar Vijay Pai <vpai@google.com>2016-07-13 19:20:25 -0700
commit40317fd7202ab96f8fb3c1f39258fff1ede3480e (patch)
treef917d95a39d5b6c27053abe9cc8c35c73253598b /test/cpp
parentad7c52761895c46a3964ab8864d11c7aa269a29b (diff)
Resolve pernicious race between destructor and thread functions by insisting that destructor is invoked after the class has gone back to being a harmless single-threaded thing.
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/qps/client.h25
-rw-r--r--test/cpp/qps/client_async.cc54
-rw-r--r--test/cpp/qps/client_sync.cc17
-rw-r--r--test/cpp/qps/qps_worker.cc3
4 files changed, 64 insertions, 35 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 38478be5d9..95023d2f80 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -162,10 +162,20 @@ class Client {
return stats;
}
+ // Must call AwaitThreadsCompletion before destructor to avoid a race
+ // between destructor and invocation of virtual ThreadFunc
+ void AwaitThreadsCompletion() {
+ DestroyMultithreading();
+ std::unique_lock<std::mutex> g(thread_completion_mu_);
+ while (threads_remaining_ != 0) {
+ threads_complete_.wait(g);
+ }
+ }
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
+ threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -173,6 +183,7 @@ class Client {
void EndThreads() { threads_.clear(); }
+ virtual void DestroyMultithreading() = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
@@ -270,6 +281,7 @@ class Client {
done_ = true;
}
if (done_) {
+ client_->CompleteThread();
return;
}
}
@@ -277,7 +289,6 @@ class Client {
std::mutex mu_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -289,6 +300,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
+
+ std::mutex thread_completion_mu_;
+ size_t threads_remaining_;
+ std::condition_variable threads_complete_;
+
+ void CompleteThread() {
+ std::lock_guard<std::mutex> g(thread_completion_mu_);
+ threads_remaining_--;
+ if (threads_remaining_ == 0) {
+ threads_complete_.notify_all();
+ }
+ }
};
template <class StubType, class RequestType>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index a0705673bd..f7fe746bbf 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
virtual ~AsyncClient() {
- for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- std::lock_guard<std::mutex> lock((*ss)->mutex);
- (*ss)->shutdown = true;
- }
- for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
- }
- this->EndThreads(); // Need "this->" for resolution
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
void* got_tag;
bool ok;
@@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
}
+ protected:
+ const int num_async_threads_;
+
+ private:
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
+ };
+
+ int NumThreads(const ClientConfig& config) {
+ int num_threads = config.async_client_threads();
+ if (num_threads <= 0) { // Use dynamic sizing
+ num_threads = cores_;
+ gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
+ }
+ return num_threads;
+ }
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ (*cq)->Shutdown();
+ }
+ this->EndThreads(); // this needed for resolution
+ }
bool ThreadFunc(HistogramEntry* entry,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
@@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
- protected:
- const int num_async_threads_;
-
- private:
- struct PerThreadShutdownState {
- mutable std::mutex mutex;
- bool shutdown;
- PerThreadShutdownState() : shutdown(false) {}
- };
-
- int NumThreads(const ClientConfig& config) {
- int num_threads = config.async_client_threads();
- if (num_threads <= 0) { // Use dynamic sizing
- num_threads = cores_;
- gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
- }
- return num_threads;
- }
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 92680986bd..cc2c5ca540 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -87,6 +87,8 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
+ private:
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
- ~SynchronousUnaryClient() { EndThreads(); }
+ ~SynchronousUnaryClient() {}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
@@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
- EndThreads();
for (size_t i = 0; i < num_threads_; i++) {
auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
- Status s = (*stream)->Finish();
- EXPECT_TRUE(s.ok());
- if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
- s.error_message().c_str());
- }
+ Status s = (*stream)->Finish();
+ EXPECT_TRUE(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+ s.error_message().c_str());
+ }
}
}
delete[] stream_;
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 49ef52895c..e147734f7a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+ client->AwaitThreadsCompletion();
+
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}