aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client_async.cc1
-rw-r--r--test/cpp/qps/client_sync.cc12
-rw-r--r--test/cpp/qps/driver.cc131
-rw-r--r--test/cpp/qps/qps_worker.cc35
4 files changed, 122 insertions, 57 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..2f987fc80d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..686c8d750c 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@@ -128,11 +127,16 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
~SynchronousStreamingClient() {
EndThreads();
- for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
- stream++) {
+ for (size_t i = 0; i < num_threads_; i++) {
+ auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().ok());
+ Status s = (*stream)->Finish();
+ EXPECT_TRUE(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+ s.error_message().c_str());
+ }
}
}
delete[] stream_;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf045883..ba38de76f3 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
- assert(s.ok());
+ GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
- GPR_ASSERT(servers[i].stream->Write(args));
+ if (!servers[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+ }
ServerStatus init_status;
- GPR_ASSERT(servers[i].stream->Read(&init_status));
+ if (!servers[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+ }
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@@ -344,10 +348,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
ClientArgs args;
*args.mutable_setup() = per_client_config;
clients[i].stream =
- clients[i].stub->RunClient(runsc::AllocContext(&contexts));
- GPR_ASSERT(clients[i].stream->Write(args));
+ clients[i].stub->RunClient(runsc::AllocContext(&contexts));
+ if (!clients[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+ }
ClientStatus init_status;
- GPR_ASSERT(clients[i].stream->Read(&init_status));
+ if (!clients[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+ }
}
// Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
}
ServerStatus server_status;
ClientStatus client_status;
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Read(&server_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Read(&client_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+ }
}
// Wait some time
@@ -390,37 +410,71 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
- GPR_ASSERT(client->stream->WritesDone());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
+ if (!client->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
- const auto& stats = client_status.stats();
- merged_latencies.MergeProto(stats.latencies());
- result->add_client_stats()->CopyFrom(stats);
- GPR_ASSERT(!client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ // Read the client final status
+ if (client->stream->Read(&client_status)) {
+ gpr_log(GPR_INFO, "Received final status from client %zu", i);
+ const auto& stats = client_status.stats();
+ merged_latencies.MergeProto(stats.latencies());
+ result->add_client_stats()->CopyFrom(stats);
+ // That final status should be the last message on the client stream
+ GPR_ASSERT(!client->stream->Read(&client_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Finish().ok());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ Status s = client->stream->Finish();
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- GPR_ASSERT(server->stream->WritesDone());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
+ if (!server->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- result->add_server_stats()->CopyFrom(server_status.stats());
- result->add_server_cores(server_status.cores());
- GPR_ASSERT(!server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ // Read the server final status
+ if (server->stream->Read(&server_status)) {
+ gpr_log(GPR_INFO, "Received final status from server %zu", i);
+ result->add_server_stats()->CopyFrom(server_status.stats());
+ result->add_server_cores(server_status.cores());
+ // That final status should be the last message on the server stream
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Finish().ok());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ Status s = server->stream->Finish();
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] servers;
@@ -438,7 +492,12 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
- GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s",
+ i, s.error_message().c_str());
+ GPR_ASSERT(false);
+ }
}
}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85..8456fde0ed 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
-#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@@ -124,7 +123,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
@@ -137,7 +136,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
@@ -154,7 +153,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@@ -197,30 +196,32 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
@@ -232,10 +233,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@@ -243,24 +244,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}