From 27df2cf69caf015ffdd1df5945f9fc3faa94906f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 1 Jul 2015 15:11:29 -0700 Subject: Pluck some low hanging concurrency fruit Make the shutdown flag on servers be per thread to save contention on the lock that must guard it. --- test/cpp/qps/server_async.cc | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) (limited to 'test/cpp/qps/server_async.cc') diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 210aef4fd6..f5251e961b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -64,7 +64,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -96,6 +96,9 @@ class AsyncQpsServerTest : public Server { request_streaming, ProcessRPC)); } } + for (int i = 0; i < config.threads(); i++) { + shutdown_state_.emplace_back(new PerThreadShutdownState()); + } for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down @@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); - std::unique_lock g(shutdown_mutex_); - if (!shutdown_) { + if (!shutdown_state_[i]->shutdown()) { // this RPC context is done, so refresh it if (!still_going) { - g.unlock(); ctx->Reset(); } } else { @@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - { - std::lock_guard g(shutdown_mutex_); - shutdown_ = true; + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + (*ss)->set_shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server { TestService::AsyncService async_service_; std::forward_list contexts_; - std::mutex shutdown_mutex_; - bool shutdown_; + class PerThreadShutdownState { + public: + PerThreadShutdownState() : shutdown_(false) {} + + bool shutdown() const { + std::lock_guard lock(mutex_); + return shutdown_; + } + + void set_shutdown() { + std::lock_guard lock(mutex_); + shutdown_ = true; + } + + private: + mutable std::mutex mutex_; + bool shutdown_; + }; + std::vector> shutdown_state_; }; std::unique_ptr CreateAsyncServer(const ServerConfig &config, -- cgit v1.2.3