From b0f15e8af3622ebffa8414771fa6934568155179 Mon Sep 17 00:00:00 2001
From: vjpai
Date: Wed, 6 Jul 2016 13:57:01 -0700
Subject: Reduce assertions, use status codes, increase verbosity on errors

---
 test/cpp/qps/client_async.cc |   1 -
 test/cpp/qps/client_sync.cc  |  12 ++--
 test/cpp/qps/driver.cc       | 131 +++++++++++++++++++++++++++++++------------
 test/cpp/qps/qps_worker.cc   |  35 ++++++------
 4 files changed, 122 insertions(+), 57 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..2f987fc80d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
  *
  */

-#include
 #include
 #include
 #include
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..686c8d750c 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
  *
  */

-#include
 #include
 #include
 #include
@@ -128,11 +127,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   }
   ~SynchronousStreamingClient() {
     EndThreads();
-    for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
-         stream++) {
+    for (size_t i = 0; i < num_threads_; i++) {
+      auto stream = &stream_[i];
       if (*stream) {
         (*stream)->WritesDone();
-        EXPECT_TRUE((*stream)->Finish().ok());
+        Status s = (*stream)->Finish();
+        EXPECT_TRUE(s.ok());
+        if (!s.ok()) {
+          gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+                  s.error_message().c_str());
+        }
       }
     }
     delete[] stream_;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf045883..ba38de76f3 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map> get_hosts_and_cores(
   CoreRequest dummy;
   CoreResponse cores;
   grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
-  assert(s.ok());
+  GPR_ASSERT(s.ok());
   std::deque dq;
   for (int i = 0; i < cores.cores(); i++) {
     dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr RunScenario(
     *args.mutable_setup() = server_config;
     servers[i].stream =
         servers[i].stub->RunServer(runsc::AllocContext(&contexts));
-    GPR_ASSERT(servers[i].stream->Write(args));
+    if (!servers[i].stream->Write(args)) {
+      gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+    }
     ServerStatus init_status;
-    GPR_ASSERT(servers[i].stream->Read(&init_status));
+    if (!servers[i].stream->Read(&init_status)) {
+      gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+    }
     gpr_join_host_port(&cli_target, host, init_status.port());
     client_config.add_server_targets(cli_target);
     gpr_free(host);
@@ -344,10 +348,14 @@ std::unique_ptr RunScenario(
     ClientArgs args;
     *args.mutable_setup() = per_client_config;
     clients[i].stream =
-        clients[i].stub->RunClient(runsc::AllocContext(&contexts));
-    GPR_ASSERT(clients[i].stream->Write(args));
+        clients[i].stub->RunClient(runsc::AllocContext(&contexts));
+    if (!clients[i].stream->Write(args)) {
+      gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+    }
     ClientStatus init_status;
-    GPR_ASSERT(clients[i].stream->Read(&init_status));
+    if (!clients[i].stream->Read(&init_status)) {
+      gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+    }
   }

   // Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr RunScenario(
   server_mark.mutable_mark()->set_reset(true);
   ClientArgs client_mark;
   client_mark.mutable_mark()->set_reset(true);
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Write(server_mark));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Write(server_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Write(client_mark));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Write(client_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+    }
   }
   ServerStatus server_status;
   ClientStatus client_status;
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Read(&server_status));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Read(&server_status)) {
+      gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Read(&client_status));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Read(&client_status)) {
+      gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+    }
   }

   // Wait some time
@@ -390,37 +410,71 @@ std::unique_ptr RunScenario(
   Histogram merged_latencies;

   gpr_log(GPR_INFO, "Finishing clients");
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Write(client_mark));
-    GPR_ASSERT(client->stream->WritesDone());
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    if (!client->stream->Write(client_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+    }
+    if (!client->stream->WritesDone()) {
+      gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Read(&client_status));
-    const auto& stats = client_status.stats();
-    merged_latencies.MergeProto(stats.latencies());
-    result->add_client_stats()->CopyFrom(stats);
-    GPR_ASSERT(!client->stream->Read(&client_status));
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    // Read the client final status
+    if (client->stream->Read(&client_status)) {
+      gpr_log(GPR_INFO, "Received final status from client %zu", i);
+      const auto& stats = client_status.stats();
+      merged_latencies.MergeProto(stats.latencies());
+      result->add_client_stats()->CopyFrom(stats);
+      // That final status should be the last message on the client stream
+      GPR_ASSERT(!client->stream->Read(&client_status));
+    } else {
+      gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+    }
   }
-  for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
-    GPR_ASSERT(client->stream->Finish().ok());
+  for (size_t i = 0; i < num_clients; i++) {
+    auto client = &clients[i];
+    Status s = client->stream->Finish();
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+              s.error_message().c_str());
+    }
   }
   delete[] clients;

   merged_latencies.FillProto(result->mutable_latencies());

   gpr_log(GPR_INFO, "Finishing servers");
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Write(server_mark));
-    GPR_ASSERT(server->stream->WritesDone());
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    if (!server->stream->Write(server_mark)) {
+      gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+    }
+    if (!server->stream->WritesDone()) {
+      gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+    }
   }
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Read(&server_status));
-    result->add_server_stats()->CopyFrom(server_status.stats());
-    result->add_server_cores(server_status.cores());
-    GPR_ASSERT(!server->stream->Read(&server_status));
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    // Read the server final status
+    if (server->stream->Read(&server_status)) {
+      gpr_log(GPR_INFO, "Received final status from server %zu", i);
+      result->add_server_stats()->CopyFrom(server_status.stats());
+      result->add_server_cores(server_status.cores());
+      // That final status should be the last message on the server stream
+      GPR_ASSERT(!server->stream->Read(&server_status));
+    } else {
+      gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+    }
   }
-  for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
-    GPR_ASSERT(server->stream->Finish().ok());
+  for (size_t i = 0; i < num_servers; i++) {
+    auto server = &servers[i];
+    Status s = server->stream->Finish();
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+              s.error_message().c_str());
+    }
   }
   delete[] servers;
@@ -438,7 +492,12 @@ void RunQuit() {
     Void dummy;
     grpc::ClientContext ctx;
     ctx.set_fail_fast(false);
-    GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+    Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+    if (!s.ok()) {
+      gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s",
+              i, s.error_message().c_str());
+      GPR_ASSERT(false);
+    }
   }
 }
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85..8456fde0ed 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@

 #include "test/cpp/qps/qps_worker.h"

-#include
 #include
 #include
 #include
@@ -124,7 +123,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
     }

     ScopedProfile profile("qps_client.prof", false);
@@ -137,7 +136,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
    }

     ScopedProfile profile("qps_server.prof", false);
@@ -154,7 +153,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
   Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
     InstanceGuard g(this);
     if (!g.Acquired()) {
-      return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+      return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
     }

     worker_->MarkDone();
@@ -197,30 +196,32 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
                        ServerReaderWriter* stream) {
     ClientArgs args;
     if (!stream->Read(&args)) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
     }
     if (!args.has_setup()) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
     }
     gpr_log(GPR_INFO, "RunClientBody: about to create client");
     auto client = CreateClient(args.setup());
     if (!client) {
-      return Status(StatusCode::INVALID_ARGUMENT, "");
+      return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
     }
     gpr_log(GPR_INFO, "RunClientBody: client created");
client created"); ClientStatus status; if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Client couldn't report init status"); } gpr_log(GPR_INFO, "RunClientBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunClientBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = client->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } @@ -232,10 +233,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ServerReaderWriter* stream) { ServerArgs args; if (!stream->Read(&args)) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args"); } if (!args.has_setup()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args"); } if (server_port_ != 0) { args.mutable_setup()->set_port(server_port_); @@ -243,24 +244,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); if (!server) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server"); } gpr_log(GPR_INFO, "RunServerBody: server created"); ServerStatus status; status.set_port(server->port()); status.set_cores(server->cores()); if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Server couldn't report init status"); } gpr_log(GPR_INFO, "RunServerBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunServerBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunServerBody: Message not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = server->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } -- cgit v1.2.3 From ceb1a7d79f04bcee8c153b43fbe8bef408160537 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 7 Jul 2016 11:06:04 -0700 Subject: Add more information so that we can have a meaningful exit code --- src/proto/grpc/testing/control.proto | 3 +++ test/cpp/qps/driver.cc | 8 ++++++-- test/cpp/qps/driver.h | 2 +- test/cpp/qps/qps_json_driver.cc | 18 +++++++++++++----- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 20496a8116..ece6910815 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -229,4 +229,7 @@ message ScenarioResult { repeated int32 server_cores = 5; // An after-the-fact computed summary ScenarioResultSummary summary = 6; + // Information on success or failure of each worker + repeated bool client_success = 7; + repeated bool server_success = 8; } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index ba38de76f3..7f12ee9c0e 100644 --- 
From ceb1a7d79f04bcee8c153b43fbe8bef408160537 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Thu, 7 Jul 2016 11:06:04 -0700
Subject: Add more information so that we can have a meaningful exit code

---
 src/proto/grpc/testing/control.proto |  3 +++
 test/cpp/qps/driver.cc               |  8 ++++++--
 test/cpp/qps/driver.h                |  2 +-
 test/cpp/qps/qps_json_driver.cc      | 18 +++++++++++++-----
 4 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 20496a8116..ece6910815 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -229,4 +229,7 @@ message ScenarioResult {
   repeated int32 server_cores = 5;
   // An after-the-fact computed summary
   ScenarioResultSummary summary = 6;
+  // Information on success or failure of each worker
+  repeated bool client_success = 7;
+  repeated bool server_success = 8;
 }
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index ba38de76f3..7f12ee9c0e 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -436,6 +436,7 @@ std::unique_ptr RunScenario(
   for (size_t i = 0; i < num_clients; i++) {
     auto client = &clients[i];
     Status s = client->stream->Finish();
+    result->add_client_success(s.ok());
     if (!s.ok()) {
       gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
               s.error_message().c_str());
@@ -471,6 +472,7 @@ std::unique_ptr RunScenario(
   for (size_t i = 0; i < num_servers; i++) {
     auto server = &servers[i];
     Status s = server->stream->Finish();
+    result->add_server_success(s.ok());
     if (!s.ok()) {
       gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
               s.error_message().c_str());
@@ -483,8 +485,9 @@ std::unique_ptr RunScenario(
   return result;
 }

-void RunQuit() {
+bool RunQuit() {
   // Get client, server lists
+  bool result = true;
   auto workers = get_workers("QPS_WORKERS");
   for (size_t i = 0; i < workers.size(); i++) {
     auto stub = WorkerService::NewStub(
@@ -496,9 +499,10 @@ void RunQuit() {
     ctx.set_fail_fast(false);
     Status s = stub->QuitWorker(&ctx, dummy, &dummy);
     if (!s.ok()) {
       gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s",
               i, s.error_message().c_str());
-      GPR_ASSERT(false);
+      result = false;
     }
   }
+  return result;
 }

 } // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 3a5cf138f1..93f4370caf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -47,7 +47,7 @@ std::unique_ptr RunScenario(
     const grpc::testing::ServerConfig& server_config, size_t num_servers,
     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);

-void RunQuit();
+bool RunQuit();

 } // namespace testing
 } // namespace grpc
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index f5d739f893..1524ebbc38 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
 namespace grpc {
 namespace testing {

-static void QpsDriver() {
+static bool QpsDriver() {
   grpc::string json;

   bool scfile = (FLAGS_scenarios_file != "");
@@ -81,13 +81,13 @@ static void QpsDriver() {
   } else if (scjson) {
     json = FLAGS_scenarios_json.c_str();
   } else if (FLAGS_quit) {
-    RunQuit();
-    return;
+    return RunQuit();
   }

   // Parse into an array of scenarios
   Scenarios scenarios;
   ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
+  bool success = true;

   // Make sure that there is at least some valid scenario here
   GPR_ASSERT(scenarios.scenarios_size() > 0);
@@ -109,7 +109,15 @@ static void QpsDriver() {
       GetReporter()->ReportQPSPerCore(*result);
       GetReporter()->ReportLatency(*result);
       GetReporter()->ReportTimes(*result);
+
+      for (int i = 0; success && i < result->client_success_size(); i++) {
+        success = result->client_success(i);
+      }
+      for (int i = 0; success && i < result->server_success_size(); i++) {
+        success = result->server_success(i);
+      }
     }
+  return success;
 }

 } // namespace testing
@@ -118,7 +126,7 @@
 int main(int argc, char **argv) {
   grpc::testing::InitBenchmark(&argc, &argv, true);

-  grpc::testing::QpsDriver();
+  bool ok = grpc::testing::QpsDriver();

-  return 0;
+  return ok ? 0 : 1;
 }
--
cgit v1.2.3
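With `repeated bool client_success = 7;` and `repeated bool server_success = 8;` added to ScenarioResult, the protobuf C++ plugin generates the add_client_success() / client_success_size() / client_success(i) accessors that the driver and qps_json_driver use above. A small sketch of folding those flags into a single verdict, assuming the generated header path for control.proto in the gRPC source tree:

    #include "src/proto/grpc/testing/control.pb.h"

    // Returns true only if every client and every server stream in the
    // scenario finished cleanly.
    bool AllWorkersSucceeded(const grpc::testing::ScenarioResult& result) {
      for (int i = 0; i < result.client_success_size(); i++) {
        if (!result.client_success(i)) return false;
      }
      for (int i = 0; i < result.server_success_size(); i++) {
        if (!result.server_success(i)) return false;
      }
      return true;
    }

This is what lets `return ok ? 0 : 1;` in main() turn per-worker stream failures into a meaningful nonzero exit code that a CI harness can gate on.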
From f373f2cf8b9d6f8975e1dd976cddb1e3618e8ff9 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Fri, 8 Jul 2016 09:40:55 -0700
Subject: Stop holding histogram for a long time

---
 test/cpp/qps/client.h        | 35 +++++++++++++++++++----------------
 test/cpp/qps/client_async.cc | 19 +++++++++----------
 test/cpp/qps/client_sync.cc  | 10 ++++------
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408..38478be5d9 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,17 @@ class ClientRequestCreator {
   }
 };

+class HistogramEntry GRPC_FINAL {
+ public:
+  HistogramEntry(): used_(false) {}
+  bool used() const {return used_;}
+  double value() const {return value_;}
+  void set_value(double v) {used_ = true; value_ = v;}
+ private:
+  bool used_;
+  double value_;
+};
+
 class Client {
  public:
   Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -162,7 +173,7 @@ class Client {

   void EndThreads() { threads_.clear(); }

-  virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+  virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;

   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
     // Set up the load distribution based on the number of threads
@@ -215,7 +226,6 @@ class Client {
    public:
     Thread(Client* client, size_t idx)
         : done_(false),
-          new_stats_(nullptr),
           client_(client),
           idx_(idx),
           impl_(&Thread::ThreadFunc, this) {}
@@ -230,14 +240,10 @@ class Client {

     void BeginSwap(Histogram* n) {
       std::lock_guard g(mu_);
-      new_stats_ = n;
+      n->Swap(&histogram_);
     }

     void EndSwap() {
-      std::unique_lock g(mu_);
-      while (new_stats_ != nullptr) {
-        cv_.wait(g);
-      };
     }

     void MergeStatsInto(Histogram* hist) {
@@ -252,9 +258,13 @@ class Client {
     void ThreadFunc() {
       for (;;) {
         // run the loop body
-        const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
-        // lock, see if we're done
+        HistogramEntry entry;
+        const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+        // lock, update histogram if needed and see if we're done
         std::lock_guard g(mu_);
+        if (entry.used()) {
+          histogram_.Add(entry.value());
+        }
         if (!thread_still_ok) {
           gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
           done_ = true;
@@ -262,17 +272,10 @@ class Client {
         if (done_) {
           return;
         }
-        // check if we're resetting stats, swap out the histogram if so
-        if (new_stats_) {
-          new_stats_->Swap(&histogram_);
-          new_stats_ = nullptr;
-          cv_.notify_one();
-        }
       }
     }

     std::mutex mu_;
-    std::condition_variable cv_;
     bool done_;
     Histogram* new_stats_;
     Histogram histogram_;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..c2b69337a3 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -48,7 +48,6 @@
 #include
 #include
 #include
-#include
 #include

 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +63,7 @@ class ClientRpcContext {
   ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
-  virtual bool RunNextState(bool, Histogram* hist) = 0;
+  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
   virtual ClientRpcContext* StartNewClone() = 0;
   static void* tag(ClientRpcContext* c) { return reinterpret_cast(c); }
   static ClientRpcContext* detag(void* t) {
@@ -104,7 +103,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
       alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
     }
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     switch (next_state_) {
       case State::READY:
         start_ = UsageTimer::Now();
@@ -114,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         next_state_ = State::RESP_DONE;
         return true;
       case State::RESP_DONE:
-        hist->Add((UsageTimer::Now() - start_) * 1e9);
+        entry->set_value((UsageTimer::Now() - start_) * 1e9);
         callback_(status_, &response_);
         next_state_ = State::INVALID;
         return false;
@@ -201,7 +200,7 @@ class AsyncClient : public ClientImpl {
     }
   }

-  bool ThreadFunc(Histogram* histogram,
+  bool ThreadFunc(HistogramEntry* entry,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     void* got_tag;
     bool ok;

     if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
       // Got a regular event, so process it
       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-      if (!ctx->RunNextState(ok, histogram)) {
+      if (!ctx->RunNextState(ok, entry)) {
         // The RPC and callback are done, so clone the ctx
         // and kickstart the new one
         auto clone = ctx->StartNewClone();
@@ -298,7 +297,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
     stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -330,7 +329,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
@@ -430,7 +429,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
                             ClientRpcContext::tag(this));
     next_state_ = State::STREAM_IDLE;
   }
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+  bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
     while (true) {
       switch (next_state_) {
         case State::STREAM_IDLE:
@@ -462,7 +461,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
           return true;
           break;
         case State::READ_DONE:
-          hist->Add((UsageTimer::Now() - start_) * 1e9);
+          entry->set_value((UsageTimer::Now() - start_) * 1e9);
           callback_(status_, &response_);
           next_state_ = State::STREAM_IDLE;
           break;  // loop around
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..f328f492e3 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -46,7 +46,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -55,7 +54,6 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
 #include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
 #include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/usage_timer.h"
@@ -100,7 +98,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
   }
   ~SynchronousUnaryClient() { EndThreads(); }

-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     double start = UsageTimer::Now();
@@ -108,7 +106,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
     grpc::ClientContext context;
     grpc::Status s =
         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
-    histogram->Add((UsageTimer::Now() - start) * 1e9);
+    entry->set_value((UsageTimer::Now() - start) * 1e9);
     return s.ok();
   }
 };
@@ -139,13 +137,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
     delete[] context_;
   }

-  bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
     double start = UsageTimer::Now();
     if (stream_[thread_idx]->Write(request_) &&
         stream_[thread_idx]->Read(&responses_[thread_idx])) {
-      histogram->Add((UsageTimer::Now() - start) * 1e9);
+      entry->set_value((UsageTimer::Now() - start) * 1e9);
       return true;
     }
     return false;
--
cgit v1.2.3
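The restructuring above shortens the critical section around the shared histogram: each ThreadFunc call now reports at most one latency sample through a two-field HistogramEntry filled with no lock held, and the owning Thread merges that sample under a brief lock. A condensed sketch of the pattern, with the real Histogram reduced to a plain vector for illustration (PerThreadStats and its members are stand-in names):

    #include <mutex>
    #include <vector>

    struct HistogramEntry {
      bool used = false;
      double value = 0.0;
      void set(double v) { used = true; value = v; }
    };

    class PerThreadStats {
     public:
      // Hot path: the measurement is produced lock-free, then merged
      // under a short-lived lock instead of exposing the histogram itself.
      void Merge(const HistogramEntry& e) {
        if (!e.used) return;
        std::lock_guard<std::mutex> g(mu_);
        samples_.push_back(e.value);
      }

     private:
      std::mutex mu_;
      std::vector<double> samples_;  // stand-in for the real Histogram
    };

A side effect visible in the diff: because BeginSwap() can now swap the histogram out immediately under the same short lock, the condition-variable handshake (new_stats_ plus cv_) in the old code becomes unnecessary and is deleted.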
From f782465fba11864293d858ba91d5e715fc481d7d Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Fri, 8 Jul 2016 10:33:10 -0700
Subject: Fix some shutdown errors related to CQ/join ordering

---
 test/cpp/qps/client_async.cc | 43 ++++++++++++++++++++++++++++++-------------
 test/cpp/qps/qps_worker.cc   |  2 ++
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 963a1e1cd0..057e5a0d6b 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -189,14 +189,7 @@ class AsyncClient : public ClientImpl {
     }
   }
   virtual ~AsyncClient() {
-    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
-      void* got_tag;
-      bool ok;
-      while ((*cq)->Next(&got_tag, &ok)) {
-        delete ClientRpcContext::detag(got_tag);
-      }
-    }
+    FinalShutdownCQs();
   }

   bool ThreadFunc(HistogramEntry* entry,
@@ -216,14 +209,29 @@ class AsyncClient : public ClientImpl {
         delete ctx;
       }
       return true;
-    } else {  // queue is shutting down
-      return false;
+    } else {  // queue is shutting down, so we must be done
+      return true;
     }
   }

 protected:
   const int num_async_threads_;

+  void ShutdownCQs() {
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+  }
+  void FinalShutdownCQs() {
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      void* got_tag;
+      bool ok;
+      while ((*cq)->Next(&got_tag, &ok)) {
+        delete ClientRpcContext::detag(got_tag);
+      }
+    }
+  }
+
 private:
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
@@ -251,7 +259,10 @@ class AsyncUnaryClient GRPC_FINAL
             config, SetupCtx, BenchmarkStubCreator) {
     StartThreads(num_async_threads_);
   }
-  ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncUnaryClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }

 private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -380,7 +391,10 @@ class AsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }

-  ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncStreamingClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }

 private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -516,7 +530,10 @@ class GenericAsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }

-  ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }

 private:
   static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 8456fde0ed..49ef52895c 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -128,6 +128,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {

     ScopedProfile profile("qps_client.prof", false);
     Status ret = RunClientBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunClient: Returning");
     return ret;
   }

@@ -141,6 +142,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {

     ScopedProfile profile("qps_server.prof", false);
     Status ret = RunServerBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunServer: Returning");
     return ret;
   }
--
cgit v1.2.3
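The split above into ShutdownCQs() (called by the derived destructors before EndThreads()) and FinalShutdownCQs() (called by the base destructor) encodes the completion-queue teardown order gRPC expects: shut the queue down first so blocked pollers wake up, join the threads, then drain whatever tags remain. A sketch of that ordering with a single queue and worker, assuming the standard grpc::CompletionQueue Shutdown()/Next() semantics; TearDown is an illustrative name, not a gRPC function:

    #include <thread>
    #include <grpc++/grpc++.h>

    void TearDown(grpc::CompletionQueue* cq, std::thread* worker) {
      cq->Shutdown();  // 1. wake the worker: Next() drains, then returns false
      worker->join();  // 2. the worker's poll loop can now exit cleanly
      void* tag;
      bool ok;
      while (cq->Next(&tag, &ok)) {
        // 3. drain and free any contexts the worker did not consume
      }
    }

Returning true rather than false from ThreadFunc when Next() fails also matters: a shut-down queue is an orderly exit condition, and reporting it as false would have been treated upstream as an RPC error.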
From a831651aa53fcb44cbd57460de99625be33c93a5 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Wed, 13 Jul 2016 10:32:26 -0700
Subject: Unify and make consistent the per-thread shutdown process

---
 test/cpp/qps/client_async.cc | 62 +++++++++++++++++++++++---------------------
 test/cpp/qps/server_async.cc | 49 ++++++++++++++--------------------
 2 files changed, 51 insertions(+), 60 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 057e5a0d6b..a0705673bd 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -174,6 +174,7 @@ class AsyncClient : public ClientImpl {
     for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
       next_issuers_.emplace_back(NextIssuer(i));
+      shutdown_state_.emplace_back(new PerThreadShutdownState());
     }

     using namespace std::placeholders;
@@ -189,7 +190,21 @@ class AsyncClient : public ClientImpl {
     }
   }
   virtual ~AsyncClient() {
-    FinalShutdownCQs();
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      std::lock_guard lock((*ss)->mutex);
+      (*ss)->shutdown = true;
+    }
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+    this->EndThreads();  // Need "this->" for resolution
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      void* got_tag;
+      bool ok;
+      while ((*cq)->Next(&got_tag, &ok)) {
+        delete ClientRpcContext::detag(got_tag);
+      }
+    }
   }

   bool ThreadFunc(HistogramEntry* entry,
@@ -200,7 +215,12 @@ class AsyncClient : public ClientImpl {
     if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
       // Got a regular event, so process it
       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
-      if (!ctx->RunNextState(ok, entry)) {
+      // Proceed while holding a lock to make sure that
+      // this thread isn't supposed to shut down
+      std::lock_guard l(shutdown_state_[thread_idx]->mutex);
+      if (shutdown_state_[thread_idx]->shutdown) {
+        return true;
+      } else if (!ctx->RunNextState(ok, entry)) {
         // The RPC and callback are done, so clone the ctx
         // and kickstart the new one
         auto clone = ctx->StartNewClone();
@@ -217,22 +237,13 @@ class AsyncClient : public ClientImpl {
 protected:
   const int num_async_threads_;

-  void ShutdownCQs() {
-    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
-    }
-  }
-  void FinalShutdownCQs() {
-    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      void* got_tag;
-      bool ok;
-      while ((*cq)->Next(&got_tag, &ok)) {
-        delete ClientRpcContext::detag(got_tag);
-      }
-    }
-  }
-
 private:
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
+  };
+
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -241,9 +252,9 @@ class AsyncClient : public ClientImpl {
     }
     return num_threads;
   }
-
   std::vector> cli_cqs_;
   std::vector> next_issuers_;
+  std::vector> shutdown_state_;
 };

 static std::unique_ptr BenchmarkStubCreator(
@@ -259,10 +270,7 @@ class AsyncUnaryClient GRPC_FINAL
             config, SetupCtx, BenchmarkStubCreator) {
     StartThreads(num_async_threads_);
   }
-  ~AsyncUnaryClient() GRPC_OVERRIDE {
-    ShutdownCQs();
-    EndThreads();
-  }
+  ~AsyncUnaryClient() GRPC_OVERRIDE {}

 private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -391,10 +399,7 @@ class AsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }

-  ~AsyncStreamingClient() GRPC_OVERRIDE {
-    ShutdownCQs();
-    EndThreads();
-  }
+  ~AsyncStreamingClient() GRPC_OVERRIDE {}

 private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -530,10 +535,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }

-  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {
-    ShutdownCQs();
-    EndThreads();
-  }
+  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}

 private:
   static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c9954d0d02..85acefa00b 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -123,21 +123,22 @@ class AsyncQpsServerTest : public Server {

     for (int i = 0; i < num_threads; i++) {
       shutdown_state_.emplace_back(new PerThreadShutdownState());
-    }
-    for (int i = 0; i < num_threads; i++) {
       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
     }
   }
   ~AsyncQpsServerTest() {
     for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
-      (*ss)->set_shutdown();
+      std::lock_guard lock((*ss)->mutex);
+      (*ss)->shutdown = true;
     }
     server_->Shutdown();
+    for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+      (*cq)->Shutdown();
+    }
     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
       thr->join();
     }
     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
-      (*cq)->Shutdown();
       bool ok;
       void *got_tag;
       while ((*cq)->Next(&got_tag, &ok))
@@ -150,21 +151,21 @@ class AsyncQpsServerTest : public Server {
   }

 private:
-  void ThreadFunc(int rank) {
+  void ThreadFunc(int thread_idx) {
     // Wait until work is available or we are shutting down
     bool ok;
     void *got_tag;
-    while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+    while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
       ServerRpcContext *ctx = detag(got_tag);
       // The tag is a pointer to an RPC context to invoke
+      // Proceed while holding a lock to make sure that
+      // this thread isn't supposed to shut down
+      std::lock_guard l(shutdown_state_[thread_idx]->mutex);
+      if (shutdown_state_[thread_idx]->shutdown) {
+        return;
+      }
       const bool still_going = ctx->RunNextState(ok);
-      if (!shutdown_state_[rank]->shutdown()) {
-        // this RPC context is done, so refresh it
-        if (!still_going) {
-          ctx->Reset();
-        }
-      } else {
-        return;
+      // if this RPC context is done, refresh it
+      if (!still_going) {
+        ctx->Reset();
       }
     }
     return;
@@ -333,24 +334,12 @@ class AsyncQpsServerTest : public Server {
   ServiceType async_service_;
   std::forward_list contexts_;

-  class PerThreadShutdownState {
-   public:
-    PerThreadShutdownState() : shutdown_(false) {}
-
-    bool shutdown() const {
-      std::lock_guard lock(mutex_);
-      return shutdown_;
-    }
-
-    void set_shutdown() {
-      std::lock_guard lock(mutex_);
-      shutdown_ = true;
-    }
-
-   private:
-    mutable std::mutex mutex_;
-    bool shutdown_;
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
   };
+
   std::vector> shutdown_state_;
 };
--
cgit v1.2.3

From ad7c52761895c46a3964ab8864d11c7aa269a29b Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Wed, 13 Jul 2016 14:51:44 -0700
Subject: Temporary workaround by setting up a deadline on server shutdown.
 This reveals an issue that needs to be solved:
 D0713 14:48:28.049861213 14503 server.c:704] Waiting for 44 channels and
 0/1 listeners to be destroyed before shutting down server

---
 test/cpp/qps/server_async.cc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 85acefa00b..da1a289e02 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -131,7 +131,10 @@ class AsyncQpsServerTest : public Server {
       std::lock_guard lock((*ss)->mutex);
       (*ss)->shutdown = true;
     }
-    server_->Shutdown();
+    // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+    auto deadline =
+        std::chrono::system_clock::now() + std::chrono::seconds(3);
+    server_->Shutdown(deadline);
     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
       (*cq)->Shutdown();
     }
--
cgit v1.2.3
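Two things are worth drawing out of the last two patches. First, client and server now share one shutdown idiom: a per-thread mutex plus boolean, where the polling thread holds the lock across both the "am I shut down?" check and the subsequent work on the RPC context, so a destructor that sets the flag can never interleave with RunNextState on a context it is about to free. A condensed sketch, in which Context and PollOnce are stand-ins for the real per-RPC context types and poll loops:

    #include <mutex>

    struct PerThreadShutdownState {
      mutable std::mutex mutex;
      bool shutdown = false;
    };

    struct Context {
      void RunNextState(bool ok) { /* advance the RPC state machine */ }
    };

    void PollOnce(PerThreadShutdownState* ss, Context* ctx, bool ok) {
      std::lock_guard<std::mutex> l(ss->mutex);
      if (ss->shutdown) return;  // owner is tearing down; don't touch ctx
      ctx->RunNextState(ok);     // safe: the owner must take ss->mutex first
    }

Second, the deadline handed to server_->Shutdown() matters because grpc::Server::Shutdown() without a deadline waits for all in-flight calls to complete, while the deadline overload cancels whatever is still pending once the deadline passes. That is what lets the benchmark exit despite the lingering channels shown in the log line quoted in the commit message.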
From 40317fd7202ab96f8fb3c1f39258fff1ede3480e Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Wed, 13 Jul 2016 19:20:25 -0700
Subject: Resolve pernicious race between destructor and thread functions by
 insisting that destructor is invoked after the class has gone back to being
 a harmless single-threaded thing.

---
 test/cpp/qps/client.h        | 25 +++++++++++++++++++-
 test/cpp/qps/client_async.cc | 54 +++++++++++++++++++++++---------------------
 test/cpp/qps/client_sync.cc  | 17 +++++++-------
 test/cpp/qps/qps_worker.cc   |  3 +++
 4 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 38478be5d9..95023d2f80 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -162,10 +162,20 @@ class Client {
     return stats;
   }

+  // Must call AwaitThreadsCompletion before destructor to avoid a race
+  // between destructor and invocation of virtual ThreadFunc
+  void AwaitThreadsCompletion() {
+    DestroyMultithreading();
+    std::unique_lock g(thread_completion_mu_);
+    while (threads_remaining_ != 0) {
+      threads_complete_.wait(g);
+    }
+  }
 protected:
   bool closed_loop_;

   void StartThreads(size_t num_threads) {
+    threads_remaining_ = num_threads;
     for (size_t i = 0; i < num_threads; i++) {
       threads_.emplace_back(new Thread(this, i));
     }
@@ -173,6 +183,7 @@ class Client {

   void EndThreads() { threads_.clear(); }

+  virtual void DestroyMultithreading() = 0;
   virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;

   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
@@ -270,6 +281,7 @@ class Client {
           done_ = true;
         }
         if (done_) {
+          client_->CompleteThread();
           return;
         }
       }
@@ -277,7 +289,6 @@ class Client {

     std::mutex mu_;
     bool done_;
-    Histogram* new_stats_;
     Histogram histogram_;
     Client* client_;
     const size_t idx_;
@@ -289,6 +300,18 @@ class Client {

   InterarrivalTimer interarrival_timer_;
   std::vector next_time_;
+
+  std::mutex thread_completion_mu_;
+  size_t threads_remaining_;
+  std::condition_variable threads_complete_;
+
+  void CompleteThread() {
+    std::lock_guard g(thread_completion_mu_);
+    threads_remaining_--;
+    if (threads_remaining_ == 0) {
+      threads_complete_.notify_all();
+    }
+  }
 };
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index a0705673bd..f7fe746bbf 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -190,14 +190,6 @@ class AsyncClient : public ClientImpl {
     }
   }
   virtual ~AsyncClient() {
-    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
-      std::lock_guard lock((*ss)->mutex);
-      (*ss)->shutdown = true;
-    }
-    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
-    }
-    this->EndThreads();  // Need "this->" for resolution
     for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
       void* got_tag;
       bool ok;
@@ -206,6 +198,34 @@ class AsyncClient : public ClientImpl {
       }
     }
   }
+ protected:
+  const int num_async_threads_;
+
+ private:
+  struct PerThreadShutdownState {
+    mutable std::mutex mutex;
+    bool shutdown;
+    PerThreadShutdownState() : shutdown(false) {}
+  };
+
+  int NumThreads(const ClientConfig& config) {
+    int num_threads = config.async_client_threads();
+    if (num_threads <= 0) {  // Use dynamic sizing
+      num_threads = cores_;
+      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
+    }
+    return num_threads;
+  }
+  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      std::lock_guard lock((*ss)->mutex);
+      (*ss)->shutdown = true;
+    }
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+    this->EndThreads();  // this needed for resolution
+  }

   bool ThreadFunc(HistogramEntry* entry,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
@@ -234,24 +254,6 @@ class AsyncClient : public ClientImpl {
     }
   }

- protected:
-  const int num_async_threads_;
-
- private:
-  struct PerThreadShutdownState {
-    mutable std::mutex mutex;
-    bool shutdown;
-    PerThreadShutdownState() : shutdown(false) {}
-  };
-
-  int NumThreads(const ClientConfig& config) {
-    int num_threads = config.async_client_threads();
-    if (num_threads <= 0) {  // Use dynamic sizing
-      num_threads = cores_;
-      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
-    }
-    return num_threads;
-  }
-
   std::vector> cli_cqs_;
   std::vector> next_issuers_;
   std::vector> shutdown_state_;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 92680986bd..cc2c5ca540 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -87,6 +87,8 @@ class SynchronousClient
   size_t num_threads_;
   std::vector responses_;

+ private:
+  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
 };

 class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -95,7 +97,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
       : SynchronousClient(config) {
     StartThreads(num_threads_);
   }
-  ~SynchronousUnaryClient() { EndThreads(); }
+  ~SynchronousUnaryClient() {}

   bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
@@ -124,17 +126,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
     StartThreads(num_threads_);
   }
   ~SynchronousStreamingClient() {
-    EndThreads();
     for (size_t i = 0; i < num_threads_; i++) {
       auto stream = &stream_[i];
       if (*stream) {
         (*stream)->WritesDone();
-        Status s = (*stream)->Finish();
-        EXPECT_TRUE(s.ok());
-        if (!s.ok()) {
-          gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
-                  s.error_message().c_str());
-        }
+        Status s = (*stream)->Finish();
+        EXPECT_TRUE(s.ok());
+        if (!s.ok()) {
+          gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+                  s.error_message().c_str());
+        }
       }
     }
     delete[] stream_;
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 49ef52895c..e147734f7a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -227,6 +227,9 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       gpr_log(GPR_INFO, "RunClientBody: Mark response given");
     }

+    gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+    client->AwaitThreadsCompletion();
+
     gpr_log(GPR_INFO, "RunClientBody: Returning");
     return Status::OK;
   }
--
cgit v1.2.3
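The race this patch closes is the classic virtual-call-during-destruction problem: once ~AsyncClient begins, the object's dynamic type has reverted toward the base class, so a worker thread still dispatching through virtual ThreadFunc or DestroyMultithreading invokes undefined behavior. The fix is to quiesce first: the caller invokes AwaitThreadsCompletion(), which runs DestroyMultithreading() and then blocks on a counter/condition-variable pair until every worker has called CompleteThread(). A distilled sketch of the handshake, with ThreadGroup as an illustrative stand-in for the Client class:

    #include <condition_variable>
    #include <mutex>

    class ThreadGroup {
     public:
      // Called by each worker thread as it exits its loop.
      void OnThreadExit() {
        std::lock_guard<std::mutex> g(mu_);
        if (--remaining_ == 0) all_done_.notify_all();
      }
      // Called by the owner before the derived destructor may run.
      void AwaitCompletion() {
        std::unique_lock<std::mutex> g(mu_);
        while (remaining_ != 0) all_done_.wait(g);
      }

     private:
      std::mutex mu_;
      std::condition_variable all_done_;
      int remaining_ = 0;  // set to the thread count at startup
    };

RunClientBody accordingly calls client->AwaitThreadsCompletion() before the Client goes out of scope, which is exactly the "back to being a harmless single-threaded thing" precondition named in the commit message.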
From 5fde20d9f0ce64f5caade6f485c3715af3ff23b8 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Wed, 13 Jul 2016 19:25:59 -0700
Subject: clang-format

---
 test/cpp/qps/client.h        | 18 +++++++++++-------
 test/cpp/qps/client_async.cc |  3 ++-
 test/cpp/qps/client_sync.cc  |  1 +
 test/cpp/qps/driver.cc       | 10 +++++-----
 test/cpp/qps/qps_worker.cc   |  4 ++--
 test/cpp/qps/server_async.cc |  7 ++++---
 6 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 95023d2f80..4045e13460 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -114,10 +114,14 @@ class ClientRequestCreator {

 class HistogramEntry GRPC_FINAL {
  public:
-  HistogramEntry(): used_(false) {}
-  bool used() const {return used_;}
-  double value() const {return value_;}
-  void set_value(double v) {used_ = true; value_ = v;}
+  HistogramEntry() : used_(false) {}
+  bool used() const { return used_; }
+  double value() const { return value_; }
+  void set_value(double v) {
+    used_ = true;
+    value_ = v;
+  }
+
  private:
   bool used_;
   double value_;
@@ -171,6 +175,7 @@ class Client {
       threads_complete_.wait(g);
     }
   }
+
 protected:
   bool closed_loop_;
@@ -254,8 +259,7 @@ class Client {
       n->Swap(&histogram_);
     }

-    void EndSwap() {
-    }
+    void EndSwap() {}

     void MergeStatsInto(Histogram* hist) {
@@ -281,7 +285,7 @@ class Client {
           done_ = true;
         }
         if (done_) {
-          client_->CompleteThread();
+          client_->CompleteThread();
           return;
         }
       }
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f7fe746bbf..feb58e7a82 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -198,6 +198,7 @@ class AsyncClient : public ClientImpl {
       }
     }
   }
+
 protected:
   const int num_async_threads_;
@@ -224,7 +225,7 @@ class AsyncClient : public ClientImpl {
     for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
       (*cq)->Shutdown();
     }
-    this->EndThreads();  // this needed for resolution
+    this->EndThreads();  // this needed for resolution
   }
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index cc2c5ca540..25c7823553 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -87,6 +87,7 @@ class SynchronousClient
   size_t num_threads_;
   std::vector responses_;
+
 private:
   void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
 };
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 7f12ee9c0e..2aeaea51f2 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -348,7 +348,7 @@ std::unique_ptr RunScenario(
     ClientArgs args;
     *args.mutable_setup() = per_client_config;
     clients[i].stream =
-      clients[i].stub->RunClient(runsc::AllocContext(&contexts));
+        clients[i].stub->RunClient(runsc::AllocContext(&contexts));
     if (!clients[i].stream->Write(args)) {
       gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
     }
@@ -439,7 +439,7 @@ std::unique_ptr RunScenario(
     result->add_client_success(s.ok());
     if (!s.ok()) {
       gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
-             s.error_message().c_str());
+              s.error_message().c_str());
     }
   }
   delete[] clients;
@@ -475,7 +475,7 @@ std::unique_ptr RunScenario(
     result->add_server_success(s.ok());
     if (!s.ok()) {
       gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
-             s.error_message().c_str());
+              s.error_message().c_str());
     }
   }
@@ -497,8 +497,8 @@ bool RunQuit() {
     ctx.set_fail_fast(false);
     Status s = stub->QuitWorker(&ctx, dummy, &dummy);
     if (!s.ok()) {
-      gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s",
-              i, s.error_message().c_str());
+      gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
+              s.error_message().c_str());
       result = false;
     }
   }
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index e147734f7a..d3e53fe14a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -222,7 +222,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       }
       *status.mutable_stats() = client->Mark(args.mark().reset());
       if (!stream->Write(status)) {
-        return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+        return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
       }
       gpr_log(GPR_INFO, "RunClientBody: Mark response given");
     }
@@ -267,7 +267,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
       }
       *status.mutable_stats() = server->Mark(args.mark().reset());
       if (!stream->Write(status)) {
-        return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+        return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
       }
       gpr_log(GPR_INFO, "RunServerBody: Mark response given");
     }
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index da1a289e02..7e663ee0c2 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -132,8 +132,7 @@ class AsyncQpsServerTest : public Server {
       (*ss)->shutdown = true;
     }
     // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
-    auto deadline =
-        std::chrono::system_clock::now() + std::chrono::seconds(3);
+    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
     server_->Shutdown(deadline);
     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
       (*cq)->Shutdown();
     }
@@ -164,7 +163,9 @@ class AsyncQpsServerTest : public Server {
       // Proceed while holding a lock to make sure that
      // this thread isn't supposed to shut down
       std::lock_guard l(shutdown_state_[thread_idx]->mutex);
-      if (shutdown_state_[thread_idx]->shutdown) { return; }
+      if (shutdown_state_[thread_idx]->shutdown) {
+        return;
+      }
       const bool still_going = ctx->RunNextState(ok);
       // if this RPC context is done, refresh it
       if (!still_going) {
--
cgit v1.2.3

From ad94fdfe51f7faa4be4d6eb40779802e7d56f780 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 15 Jul 2016 06:42:29 -0700
Subject: Fix compile error

---
 test/cpp/qps/client_async.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 43f037ce48..5d9cb4bd0c 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -265,6 +265,7 @@ class AsyncClient : public ClientImpl {
       // done
       return true;
     }
+    GPR_UNREACHABLE_CODE(return true);
   }

   std::vector> cli_cqs_;
--
cgit v1.2.3
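The final one-line fix addresses a "control reaches end of non-void function" class of compile error: both branches of the if/else in ThreadFunc return, but some compilers cannot prove that and demand a trailing return path. GPR_UNREACHABLE_CODE is gRPC's macro for marking such paths as impossible while keeping every compiler satisfied. A tiny illustration of the shape of the problem; the include path and the sample function are assumptions, not taken from the patch:

    // GPR_UNREACHABLE_CODE is assumed here to come from gRPC's
    // port_platform header; verify the path against your gRPC version.
    #include <grpc/impl/codegen/port_platform.h>

    bool PollOnce(bool have_event) {
      if (have_event) {
        return true;
      } else {
        return true;
      }
      GPR_UNREACHABLE_CODE(return true);  // silences missing-return warnings
    }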