aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-15 07:21:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-07-19 09:26:04 -0700
commit77c7f9fd621148ed63f938c21ad3364bba65445e (patch)
treed17544ed7c0ab2ab5e4085cb098271d390d178fc /test/cpp
parent5ca7e47493899b69126a2ef331936bcba37ee545 (diff)
Merge pull request #7407 from ctiller/delayed-write
Benchmark fixes
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/qps/client.h68
-rw-r--r--test/cpp/qps/client_async.cc92
-rw-r--r--test/cpp/qps/client_sync.cc28
-rw-r--r--test/cpp/qps/driver.cc135
-rw-r--r--test/cpp/qps/driver.h2
-rw-r--r--test/cpp/qps/qps_json_driver.cc18
-rw-r--r--test/cpp/qps/qps_worker.cc40
-rw-r--r--test/cpp/qps/server_async.cc56
8 files changed, 284 insertions, 155 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408..4045e13460 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};
+class HistogramEntry GRPC_FINAL {
+ public:
+ HistogramEntry() : used_(false) {}
+ bool used() const { return used_; }
+ double value() const { return value_; }
+ void set_value(double v) {
+ used_ = true;
+ value_ = v;
+ }
+
+ private:
+ bool used_;
+ double value_;
+};
+
class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,10 +166,21 @@ class Client {
return stats;
}
+ // Must call AwaitThreadsCompletion before destructor to avoid a race
+ // between destructor and invocation of virtual ThreadFunc
+ void AwaitThreadsCompletion() {
+ DestroyMultithreading();
+ std::unique_lock<std::mutex> g(thread_completion_mu_);
+ while (threads_remaining_ != 0) {
+ threads_complete_.wait(g);
+ }
+ }
+
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
+ threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -162,7 +188,8 @@ class Client {
void EndThreads() { threads_.clear(); }
- virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ virtual void DestroyMultithreading() = 0;
+ virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
+ n->Swap(&histogram_);
}
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
- }
+ void EndSwap() {}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
- const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
+ HistogramEntry entry;
+ const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+ // lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
+ if (entry.used()) {
+ histogram_.Add(entry.value());
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
+ client_->CompleteThread();
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -286,6 +304,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
+
+ std::mutex thread_completion_mu_;
+ size_t threads_remaining_;
+ std::condition_variable threads_complete_;
+
+ void CompleteThread() {
+ std::lock_guard<std::mutex> g(thread_completion_mu_);
+ threads_remaining_--;
+ if (threads_remaining_ == 0) {
+ threads_complete_.notify_all();
+ }
+ }
};
template <class StubType, class RequestType>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 5dd0bd8533..5d9cb4bd0c 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
@@ -48,7 +47,6 @@
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
- virtual bool RunNextState(bool, Histogram* hist) = 0;
+ virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
virtual ClientRpcContext* StartNewClone() = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
next_issuers_.emplace_back(NextIssuer(i));
+ shutdown_state_.emplace_back(new PerThreadShutdownState());
}
using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
void* got_tag;
bool ok;
while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,7 +199,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
- bool ThreadFunc(Histogram* histogram,
+ protected:
+ const int num_async_threads_;
+
+ private:
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
+ };
+
+ int NumThreads(const ClientConfig& config) {
+ int num_threads = config.async_client_threads();
+ if (num_threads <= 0) { // Use dynamic sizing
+ num_threads = cores_;
+ gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
+ }
+ return num_threads;
+ }
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ (*cq)->Shutdown();
+ }
+ this->EndThreads(); // this needed for resolution
+ }
+
+ bool ThreadFunc(HistogramEntry* entry,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
@@ -209,12 +236,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
switch (cli_cqs_[thread_idx]->AsyncNext(
&got_tag, &ok,
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
- case CompletionQueue::SHUTDOWN:
- return false;
case CompletionQueue::GOT_EVENT: {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (!ctx->RunNextState(ok, histogram)) {
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
// The RPC and callback are done, so clone the ctx
// and kickstart the new one
auto clone = ctx->StartNewClone();
@@ -224,29 +254,23 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
return true;
}
- case CompletionQueue::TIMEOUT:
- // TODO(ctiller): do something here to track how frequently we pass
- // through this codepath.
+ case CompletionQueue::TIMEOUT: {
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ }
+ return true;
+ }
+ case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
+ // done
return true;
}
- GPR_UNREACHABLE_CODE(return false);
- }
-
- protected:
- const int num_async_threads_;
-
- private:
- int NumThreads(const ClientConfig& config) {
- int num_threads = config.async_client_threads();
- if (num_threads <= 0) { // Use dynamic sizing
- num_threads = cores_;
- gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
- }
- return num_threads;
+ GPR_UNREACHABLE_CODE(return true);
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
+ std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -262,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -307,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -339,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -391,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -439,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -471,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -527,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..25c7823553 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@@ -46,7 +45,6 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -55,7 +53,6 @@
#include "src/core/lib/profiling/timers.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
@@ -90,6 +87,9 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
+
+ private:
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
- ~SynchronousUnaryClient() { EndThreads(); }
+ ~SynchronousUnaryClient() {}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = UsageTimer::Now();
@@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return s.ok();
}
};
@@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
- EndThreads();
- for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
- stream++) {
+ for (size_t i = 0; i < num_threads_; i++) {
+ auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().ok());
+ Status s = (*stream)->Finish();
+ EXPECT_TRUE(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+ s.error_message().c_str());
+ }
}
}
delete[] stream_;
delete[] context_;
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return true;
}
return false;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf045883..2aeaea51f2 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
- assert(s.ok());
+ GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
- GPR_ASSERT(servers[i].stream->Write(args));
+ if (!servers[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+ }
ServerStatus init_status;
- GPR_ASSERT(servers[i].stream->Read(&init_status));
+ if (!servers[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+ }
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
- GPR_ASSERT(clients[i].stream->Write(args));
+ if (!clients[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+ }
ClientStatus init_status;
- GPR_ASSERT(clients[i].stream->Read(&init_status));
+ if (!clients[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+ }
}
// Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
}
ServerStatus server_status;
ClientStatus client_status;
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Read(&server_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Read(&client_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+ }
}
// Wait some time
@@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
- GPR_ASSERT(client->stream->WritesDone());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
+ if (!client->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
- const auto& stats = client_status.stats();
- merged_latencies.MergeProto(stats.latencies());
- result->add_client_stats()->CopyFrom(stats);
- GPR_ASSERT(!client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ // Read the client final status
+ if (client->stream->Read(&client_status)) {
+ gpr_log(GPR_INFO, "Received final status from client %zu", i);
+ const auto& stats = client_status.stats();
+ merged_latencies.MergeProto(stats.latencies());
+ result->add_client_stats()->CopyFrom(stats);
+ // That final status should be the last message on the client stream
+ GPR_ASSERT(!client->stream->Read(&client_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Finish().ok());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ Status s = client->stream->Finish();
+ result->add_client_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- GPR_ASSERT(server->stream->WritesDone());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
+ if (!server->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- result->add_server_stats()->CopyFrom(server_status.stats());
- result->add_server_cores(server_status.cores());
- GPR_ASSERT(!server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ // Read the server final status
+ if (server->stream->Read(&server_status)) {
+ gpr_log(GPR_INFO, "Received final status from server %zu", i);
+ result->add_server_stats()->CopyFrom(server_status.stats());
+ result->add_server_cores(server_status.cores());
+ // That final status should be the last message on the server stream
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Finish().ok());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ Status s = server->stream->Finish();
+ result->add_server_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] servers;
@@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
-void RunQuit() {
+bool RunQuit() {
// Get client, server lists
+ bool result = true;
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
@@ -438,8 +495,14 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
- GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
+ s.error_message().c_str());
+ result = false;
+ }
}
+ return result;
}
} // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 3a5cf138f1..93f4370caf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
-void RunQuit();
+bool RunQuit();
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index f5d739f893..1524ebbc38 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
namespace grpc {
namespace testing {
-static void QpsDriver() {
+static bool QpsDriver() {
grpc::string json;
bool scfile = (FLAGS_scenarios_file != "");
@@ -81,13 +81,13 @@ static void QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
- RunQuit();
- return;
+ return RunQuit();
}
// Parse into an array of scenarios
Scenarios scenarios;
ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
+ bool success = true;
// Make sure that there is at least some valid scenario here
GPR_ASSERT(scenarios.scenarios_size() > 0);
@@ -109,7 +109,15 @@ static void QpsDriver() {
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
+
+ for (int i = 0; success && i < result->client_success_size(); i++) {
+ success = result->client_success(i);
+ }
+ for (int i = 0; success && i < result->server_success_size(); i++) {
+ success = result->server_success(i);
+ }
}
+ return success;
}
} // namespace testing
@@ -118,7 +126,7 @@ static void QpsDriver() {
int main(int argc, char **argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- grpc::testing::QpsDriver();
+ bool ok = grpc::testing::QpsDriver();
- return 0;
+ return ok ? 0 : 1;
}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85..d3e53fe14a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
-#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
Status ret = RunClientBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunClient: Returning");
return ret;
}
@@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
Status ret = RunServerBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunServer: Returning");
return ret;
}
@@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+ client->AwaitThreadsCompletion();
+
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
@@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 73ca19148b..dea8746331 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -123,22 +123,24 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
- }
- for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- (*ss)->set_shutdown();
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+ auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+ server_->Shutdown(deadline);
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
}
- server_->Shutdown(std::chrono::system_clock::now() +
- std::chrono::seconds(3));
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
- (*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server {
}
private:
- void ThreadFunc(int rank) {
+ void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- const bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[rank]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ const bool still_going = ctx->RunNextState(ok);
+ // if this RPC context is done, refresh it
+ if (!still_going) {
+ ctx->Reset();
+ }
}
return;
}
@@ -334,24 +338,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- class PerThreadShutdownState {
- public:
- PerThreadShutdownState() : shutdown_(false) {}
-
- bool shutdown() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return shutdown_;
- }
-
- void set_shutdown() {
- std::lock_guard<std::mutex> lock(mutex_);
- shutdown_ = true;
- }
-
- private:
- mutable std::mutex mutex_;
- bool shutdown_;
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
};
+
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};