From b0f15e8af3622ebffa8414771fa6934568155179 Mon Sep 17 00:00:00 2001 From: vjpai Date: Wed, 6 Jul 2016 13:57:01 -0700 Subject: Reduce assertions, use status codes, increase verbosity on errors --- test/cpp/qps/client_sync.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'test/cpp/qps/client_sync.cc') diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c88e95b80e..686c8d750c 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -31,7 +31,6 @@ * */ -#include #include #include #include @@ -128,11 +127,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } ~SynchronousStreamingClient() { EndThreads(); - for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; - stream++) { + for (size_t i = 0; i < num_threads_; i++) { + auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - EXPECT_TRUE((*stream)->Finish().ok()); + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; -- cgit v1.2.3 From f373f2cf8b9d6f8975e1dd976cddb1e3618e8ff9 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 8 Jul 2016 09:40:55 -0700 Subject: Stop holding histogram for a long time --- test/cpp/qps/client.h | 35 +++++++++++++++++++---------------- test/cpp/qps/client_async.cc | 19 +++++++++---------- test/cpp/qps/client_sync.cc | 10 ++++------ 3 files changed, 32 insertions(+), 32 deletions(-) (limited to 'test/cpp/qps/client_sync.cc') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 047bd16408..38478be5d9 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -112,6 +112,17 @@ class ClientRequestCreator { } }; +class HistogramEntry GRPC_FINAL { + public: + HistogramEntry(): used_(false) {} + bool used() const {return used_;} + double value() const {return value_;} + void set_value(double v) {used_ = true; value_ = v;} + private: + bool used_; + double value_; +}; + class Client { public: Client() : timer_(new UsageTimer), interarrival_timer_() {} @@ -162,7 +173,7 @@ class Client { void EndThreads() { threads_.clear(); } - virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -215,7 +226,6 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -230,14 +240,10 @@ class Client { void BeginSwap(Histogram* n) { std::lock_guard g(mu_); - new_stats_ = n; + n->Swap(&histogram_); } void EndSwap() { - std::unique_lock g(mu_); - while (new_stats_ != nullptr) { - cv_.wait(g); - }; } void MergeStatsInto(Histogram* hist) { @@ -252,9 +258,13 @@ class Client { void ThreadFunc() { for (;;) { // run the loop body - const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // lock, see if we're done + HistogramEntry entry; + const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); + // lock, update histogram if needed and see if we're done std::lock_guard g(mu_); + if (entry.used()) { + histogram_.Add(entry.value()); + } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -262,17 +272,10 @@ class Client { if (done_) { return; } - // check if we're resetting stats, swap out the histogram if so - if (new_stats_) { - new_stats_->Swap(&histogram_); - new_stats_ = nullptr; - cv_.notify_one(); - } } } std::mutex mu_; - std::condition_variable cv_; bool done_; Histogram* new_stats_; Histogram histogram_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1507d1e3d6..c2b69337a3 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -48,7 +48,6 @@ #include #include #include -#include #include #include "src/proto/grpc/testing/services.grpc.pb.h" @@ -64,7 +63,7 @@ class ClientRpcContext { ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate - virtual bool RunNextState(bool, Histogram* hist) = 0; + virtual bool RunNextState(bool, HistogramEntry* entry) = 0; virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast(c); } static ClientRpcContext* detag(void* t) { @@ -104,7 +103,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); } } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); @@ -114,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::INVALID; return false; @@ -201,7 +200,7 @@ class AsyncClient : public ClientImpl { } } - bool ThreadFunc(Histogram* histogram, + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; @@ -209,7 +208,7 @@ class AsyncClient : public ClientImpl { if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { + if (!ctx->RunNextState(ok, entry)) { // The RPC and callback are done, so clone the ctx // and kickstart the new one auto clone = ctx->StartNewClone(); @@ -298,7 +297,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -330,7 +329,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -430,7 +429,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -462,7 +461,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c88e95b80e..f328f492e3 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -46,7 +46,6 @@ #include #include #include -#include #include #include #include @@ -55,7 +54,6 @@ #include "src/core/lib/profiling/timers.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" -#include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/usage_timer.h" @@ -100,7 +98,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { } ~SynchronousUnaryClient() { EndThreads(); } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); @@ -108,7 +106,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - start) * 1e9); return s.ok(); } }; @@ -139,13 +137,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { delete[] context_; } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - start) * 1e9); return true; } return false; -- cgit v1.2.3 From 40317fd7202ab96f8fb3c1f39258fff1ede3480e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jul 2016 19:20:25 -0700 Subject: Resolve pernicious race between destructor and thread functions by insisting that destructor is invoked after the class has gone back to being a harmless single-threaded thing. --- test/cpp/qps/client.h | 25 +++++++++++++++++++- test/cpp/qps/client_async.cc | 54 +++++++++++++++++++++++--------------------- test/cpp/qps/client_sync.cc | 17 +++++++------- test/cpp/qps/qps_worker.cc | 3 +++ 4 files changed, 64 insertions(+), 35 deletions(-) (limited to 'test/cpp/qps/client_sync.cc') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 38478be5d9..95023d2f80 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -162,10 +162,20 @@ class Client { return stats; } + // Must call AwaitThreadsCompletion before destructor to avoid a race + // between destructor and invocation of virtual ThreadFunc + void AwaitThreadsCompletion() { + DestroyMultithreading(); + std::unique_lock g(thread_completion_mu_); + while (threads_remaining_ != 0) { + threads_complete_.wait(g); + } + } protected: bool closed_loop_; void StartThreads(size_t num_threads) { + threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); } @@ -173,6 +183,7 @@ class Client { void EndThreads() { threads_.clear(); } + virtual void DestroyMultithreading() = 0; virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { @@ -270,6 +281,7 @@ class Client { done_ = true; } if (done_) { + client_->CompleteThread(); return; } } @@ -277,7 +289,6 @@ class Client { std::mutex mu_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -289,6 +300,18 @@ class Client { InterarrivalTimer interarrival_timer_; std::vector next_time_; + + std::mutex thread_completion_mu_; + size_t threads_remaining_; + std::condition_variable threads_complete_; + + void CompleteThread() { + std::lock_guard g(thread_completion_mu_); + threads_remaining_--; + if (threads_remaining_ == 0) { + threads_complete_.notify_all(); + } + } }; template diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index a0705673bd..f7fe746bbf 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl { } } virtual ~AsyncClient() { - for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - std::lock_guard lock((*ss)->mutex); - (*ss)->shutdown = true; - } - for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); - } - this->EndThreads(); // Need "this->" for resolution for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { void* got_tag; bool ok; @@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl { } } } + protected: + const int num_async_threads_; + + private: + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} + }; + + int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); + } + return num_threads; + } + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + std::lock_guard lock((*ss)->mutex); + (*ss)->shutdown = true; + } + for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { + (*cq)->Shutdown(); + } + this->EndThreads(); // this needed for resolution + } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { @@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl { } } - protected: - const int num_async_threads_; - - private: - struct PerThreadShutdownState { - mutable std::mutex mutex; - bool shutdown; - PerThreadShutdownState() : shutdown(false) {} - }; - - int NumThreads(const ClientConfig& config) { - int num_threads = config.async_client_threads(); - if (num_threads <= 0) { // Use dynamic sizing - num_threads = cores_; - gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); - } - return num_threads; - } std::vector> cli_cqs_; std::vector> next_issuers_; std::vector> shutdown_state_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 92680986bd..cc2c5ca540 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -87,6 +87,8 @@ class SynchronousClient size_t num_threads_; std::vector responses_; + private: + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { @@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { : SynchronousClient(config) { StartThreads(num_threads_); } - ~SynchronousUnaryClient() { EndThreads(); } + ~SynchronousUnaryClient() {} bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); @@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { - EndThreads(); for (size_t i = 0; i < num_threads_; i++) { auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - Status s = (*stream)->Finish(); - EXPECT_TRUE(s.ok()); - if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, - s.error_message().c_str()); - } + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 49ef52895c..e147734f7a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion"); + client->AwaitThreadsCompletion(); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } -- cgit v1.2.3 From 5fde20d9f0ce64f5caade6f485c3715af3ff23b8 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jul 2016 19:25:59 -0700 Subject: clang-format --- test/cpp/qps/client.h | 18 +++++++++++------- test/cpp/qps/client_async.cc | 3 ++- test/cpp/qps/client_sync.cc | 1 + test/cpp/qps/driver.cc | 10 +++++----- test/cpp/qps/qps_worker.cc | 4 ++-- test/cpp/qps/server_async.cc | 7 ++++--- 6 files changed, 25 insertions(+), 18 deletions(-) (limited to 'test/cpp/qps/client_sync.cc') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 95023d2f80..4045e13460 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -114,10 +114,14 @@ class ClientRequestCreator { class HistogramEntry GRPC_FINAL { public: - HistogramEntry(): used_(false) {} - bool used() const {return used_;} - double value() const {return value_;} - void set_value(double v) {used_ = true; value_ = v;} + HistogramEntry() : used_(false) {} + bool used() const { return used_; } + double value() const { return value_; } + void set_value(double v) { + used_ = true; + value_ = v; + } + private: bool used_; double value_; @@ -171,6 +175,7 @@ class Client { threads_complete_.wait(g); } } + protected: bool closed_loop_; @@ -254,8 +259,7 @@ class Client { n->Swap(&histogram_); } - void EndSwap() { - } + void EndSwap() {} void MergeStatsInto(Histogram* hist) { std::unique_lock g(mu_); @@ -281,7 +285,7 @@ class Client { done_ = true; } if (done_) { - client_->CompleteThread(); + client_->CompleteThread(); return; } } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f7fe746bbf..feb58e7a82 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -198,6 +198,7 @@ class AsyncClient : public ClientImpl { } } } + protected: const int num_async_threads_; @@ -224,7 +225,7 @@ class AsyncClient : public ClientImpl { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { (*cq)->Shutdown(); } - this->EndThreads(); // this needed for resolution + this->EndThreads(); // this needed for resolution } bool ThreadFunc(HistogramEntry* entry, diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index cc2c5ca540..25c7823553 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -87,6 +87,7 @@ class SynchronousClient size_t num_threads_; std::vector responses_; + private: void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 7f12ee9c0e..2aeaea51f2 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -348,7 +348,7 @@ std::unique_ptr RunScenario( ClientArgs args; *args.mutable_setup() = per_client_config; clients[i].stream = - clients[i].stub->RunClient(runsc::AllocContext(&contexts)); + clients[i].stub->RunClient(runsc::AllocContext(&contexts)); if (!clients[i].stream->Write(args)) { gpr_log(GPR_ERROR, "Could not write args to client %zu", i); } @@ -439,7 +439,7 @@ std::unique_ptr RunScenario( result->add_client_success(s.ok()); if (!s.ok()) { gpr_log(GPR_ERROR, "Client %zu had an error %s", i, - s.error_message().c_str()); + s.error_message().c_str()); } } delete[] clients; @@ -475,7 +475,7 @@ std::unique_ptr RunScenario( result->add_server_success(s.ok()); if (!s.ok()) { gpr_log(GPR_ERROR, "Server %zu had an error %s", i, - s.error_message().c_str()); + s.error_message().c_str()); } } @@ -497,8 +497,8 @@ bool RunQuit() { ctx.set_fail_fast(false); Status s = stub->QuitWorker(&ctx, dummy, &dummy); if (!s.ok()) { - gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", - i, s.error_message().c_str()); + gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i, + s.error_message().c_str()); result = false; } } diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index e147734f7a..d3e53fe14a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -222,7 +222,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { } *status.mutable_stats() = client->Mark(args.mark().reset()); if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark"); + return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark"); } gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } @@ -267,7 +267,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { } *status.mutable_stats() = server->Mark(args.mark().reset()); if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark"); + return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark"); } gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index da1a289e02..7e663ee0c2 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -132,8 +132,7 @@ class AsyncQpsServerTest : public Server { (*ss)->shutdown = true; } // TODO (vpai): Remove this deadline and allow Shutdown to finish properly - auto deadline = - std::chrono::system_clock::now() + std::chrono::seconds(3); + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); server_->Shutdown(deadline); for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { (*cq)->Shutdown(); @@ -164,7 +163,9 @@ class AsyncQpsServerTest : public Server { // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down std::lock_guard l(shutdown_state_[thread_idx]->mutex); - if (shutdown_state_[thread_idx]->shutdown) { return; } + if (shutdown_state_[thread_idx]->shutdown) { + return; + } const bool still_going = ctx->RunNextState(ok); // if this RPC context is done, refresh it if (!still_going) { -- cgit v1.2.3