aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@ubivpai.dls.corp.google.com>2016-07-13 10:32:26 -0700
committerGravatar Vijay Pai <vpai@ubivpai.dls.corp.google.com>2016-07-13 10:32:26 -0700
commita831651aa53fcb44cbd57460de99625be33c93a5 (patch)
treea04a1f09081d29a5dfd63e9a35b758b9465f3dfc /test/cpp/qps/server_async.cc
parentf782465fba11864293d858ba91d5e715fc481d7d (diff)
Unify and make consistent the per-thread shutdown process
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc49
1 files changed, 19 insertions, 30 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c9954d0d02..85acefa00b 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -123,21 +123,22 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
- }
- for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- (*ss)->set_shutdown();
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
}
server_->Shutdown();
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
+ }
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
- (*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@@ -150,21 +151,21 @@ class AsyncQpsServerTest : public Server {
}
private:
- void ThreadFunc(int rank) {
+ void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) { return; }
const bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[rank]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
- return;
+ // if this RPC context is done, refresh it
+ if (!still_going) {
+ ctx->Reset();
}
}
return;
@@ -333,24 +334,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- class PerThreadShutdownState {
- public:
- PerThreadShutdownState() : shutdown_(false) {}
-
- bool shutdown() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return shutdown_;
- }
-
- void set_shutdown() {
- std::lock_guard<std::mutex> lock(mutex_);
- shutdown_ = true;
- }
-
- private:
- mutable std::mutex mutex_;
- bool shutdown_;
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
};
+
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};