diff options
author | Vijay Pai <vpai@ubivpai.dls.corp.google.com> | 2016-07-13 10:32:26 -0700 |
---|---|---|
committer | Vijay Pai <vpai@ubivpai.dls.corp.google.com> | 2016-07-13 10:32:26 -0700 |
commit | a831651aa53fcb44cbd57460de99625be33c93a5 (patch) | |
tree | a04a1f09081d29a5dfd63e9a35b758b9465f3dfc /test/cpp/qps/server_async.cc | |
parent | f782465fba11864293d858ba91d5e715fc481d7d (diff) |
Unify and make consistent the per-thread shutdown process
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 49 |
1 files changed, 19 insertions, 30 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c9954d0d02..85acefa00b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -123,21 +123,22 @@ class AsyncQpsServerTest : public Server { for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); - } - for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } ~AsyncQpsServerTest() { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - (*ss)->set_shutdown(); + std::lock_guard<std::mutex> lock((*ss)->mutex); + (*ss)->shutdown = true; } server_->Shutdown(); + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); + } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { - (*cq)->Shutdown(); bool ok; void *got_tag; while ((*cq)->Next(&got_tag, &ok)) @@ -150,21 +151,21 @@ class AsyncQpsServerTest : public Server { } private: - void ThreadFunc(int rank) { + void ThreadFunc(int thread_idx) { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[rank]->Next(&got_tag, &ok)) { + while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { return; } const bool still_going = ctx->RunNextState(ok); - if (!shutdown_state_[rank]->shutdown()) { - // this RPC context is done, so refresh it - if (!still_going) { - ctx->Reset(); - } - } else { - return; + // if this RPC context is done, refresh it + if (!still_going) { + ctx->Reset(); } } return; @@ -333,24 +334,12 @@ class AsyncQpsServerTest : public Server { ServiceType async_service_; std::forward_list<ServerRpcContext *> contexts_; - class PerThreadShutdownState { - public: - PerThreadShutdownState() : shutdown_(false) {} - - bool shutdown() const { - std::lock_guard<std::mutex> lock(mutex_); - return shutdown_; - } - - void set_shutdown() { - std::lock_guard<std::mutex> lock(mutex_); - shutdown_ = true; - } - - private: - mutable std::mutex mutex_; - bool shutdown_; + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; |