Diffstat (limited to 'test/cpp/qps')
-rw-r--r--  test/cpp/qps/client.h        | 57
-rw-r--r--  test/cpp/qps/client_async.cc | 11
-rw-r--r--  test/cpp/qps/client_sync.cc  |  3
-rw-r--r--  test/cpp/qps/driver.cc       | 34
-rw-r--r--  test/cpp/qps/report.cc       |  6
-rw-r--r--  test/cpp/qps/server_async.cc | 19
-rw-r--r--  test/cpp/qps/server_sync.cc  |  8
7 files changed, 114 insertions(+), 24 deletions(-)
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 5fb87b2782..9983c8a7b0 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -36,6 +36,7 @@
#include <condition_variable>
#include <mutex>
+#include <unordered_map>
#include <vector>
#include <grpc++/channel.h>
@@ -114,19 +115,37 @@ class ClientRequestCreator<ByteBuffer> {
class HistogramEntry GRPC_FINAL {
public:
- HistogramEntry() : used_(false) {}
- bool used() const { return used_; }
+ HistogramEntry() : value_used_(false), status_used_(false) {}
+ bool value_used() const { return value_used_; }
double value() const { return value_; }
void set_value(double v) {
- used_ = true;
+ value_used_ = true;
value_ = v;
}
+ bool status_used() const { return status_used_; }
+ int status() const { return status_; }
+ void set_status(int status) {
+ status_used_ = true;
+ status_ = status;
+ }
private:
- bool used_;
+ bool value_used_;
double value_;
+ bool status_used_;
+ int status_;
};
+typedef std::unordered_map<int, int64_t> StatusHistogram;
+
+inline void MergeStatusHistogram(const StatusHistogram& from,
+ StatusHistogram* to) {
+ for (StatusHistogram::const_iterator it = from.begin(); it != from.end();
+ ++it) {
+ (*to)[it->first] += it->second;
+ }
+}
+
class Client {
public:
Client()
@@ -139,6 +158,7 @@ class Client {
ClientStats Mark(bool reset) {
Histogram latencies;
+ StatusHistogram statuses;
UsageTimer::Result timer_result;
MaybeStartRequests();
@@ -146,27 +166,36 @@ class Client {
// avoid std::vector for old compilers that expect a copy constructor
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
+ StatusHistogram* to_merge_status = new StatusHistogram[threads_.size()];
+
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
+ threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
}
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
latencies.Merge(to_merge[i]);
+ MergeStatusHistogram(to_merge_status[i], &statuses);
}
delete[] to_merge;
+ delete[] to_merge_status;
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->MergeStatsInto(&latencies);
+ threads_[i]->MergeStatsInto(&latencies, &statuses);
}
timer_result = timer_->Mark();
}
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
+ for (StatusHistogram::const_iterator it = statuses.begin();
+ it != statuses.end(); ++it) {
+ RequestResultCount* rrc = stats.add_request_results();
+ rrc->set_status_code(it->first);
+ rrc->set_count(it->second);
+ }
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
@@ -258,16 +287,16 @@ class Client {
~Thread() { impl_.join(); }
- void BeginSwap(Histogram* n) {
+ void BeginSwap(Histogram* n, StatusHistogram* s) {
std::lock_guard<std::mutex> g(mu_);
n->Swap(&histogram_);
+ s->swap(statuses_);
}
- void EndSwap() {}
-
- void MergeStatsInto(Histogram* hist) {
+ void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
std::unique_lock<std::mutex> g(mu_);
hist->Merge(histogram_);
+ MergeStatusHistogram(statuses_, s);
}
private:
@@ -288,9 +317,12 @@ class Client {
const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
// lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
- if (entry.used()) {
+ if (entry.value_used()) {
histogram_.Add(entry.value());
}
+ if (entry.status_used()) {
+ statuses_[entry.status()]++;
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
}
@@ -304,6 +336,7 @@ class Client {
std::mutex mu_;
Histogram histogram_;
+ StatusHistogram statuses_;
Client* client_;
const size_t idx_;
std::thread impl_;
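Note on the client.h hunk above: the per-thread status counts live in a plain unordered_map keyed by gRPC status code, and MergeStatusHistogram simply sums the counts key by key, relying on operator[] to default-construct missing entries to zero. Below is a minimal standalone sketch of that merge behaviour; the typedef and helper mirror the patch, while the main() driver and the example counts are purely illustrative.

#include <cstdint>
#include <cstdio>
#include <unordered_map>

typedef std::unordered_map<int, int64_t> StatusHistogram;

// Sum counts from 'from' into 'to'; keys absent from 'to' start at zero.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  for (StatusHistogram::const_iterator it = from.begin(); it != from.end();
       ++it) {
    (*to)[it->first] += it->second;
  }
}

int main() {
  StatusHistogram thread_a = {{0, 100}, {14, 2}};  // 100 OK, 2 UNAVAILABLE
  StatusHistogram thread_b = {{0, 250}};           // 250 OK
  StatusHistogram merged;
  MergeStatusHistogram(thread_a, &merged);
  MergeStatusHistogram(thread_b, &merged);
  for (const auto& kv : merged) {
    std::printf("status %d: %lld\n", kv.first, (long long)kv.second);
  }
  return 0;
}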
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 081114859c..4d36a6ba42 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -83,7 +83,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)>
start_req,
- std::function<void(grpc::Status, ResponseType*)> on_done)
+ std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
: context_(),
stub_(stub),
cq_(nullptr),
@@ -113,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true;
case State::RESP_DONE:
entry->set_value((UsageTimer::Now() - start_) * 1e9);
- callback_(status_, &response_);
+ callback_(status_, &response_, entry);
next_state_ = State::INVALID;
return false;
default:
@@ -135,7 +135,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ResponseType response_;
enum State { INVALID, READY, RESP_DONE };
State next_state_;
- std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
@@ -290,7 +290,10 @@ class AsyncUnaryClient GRPC_FINAL
~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
- static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static void CheckDone(grpc::Status s, SimpleResponse* response,
+ HistogramEntry* entry) {
+ entry->set_status(s.error_code());
+ }
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
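Note on the client_async.cc hunk above: the unary completion callback is widened from (Status, Response*) to (Status, Response*, HistogramEntry*) so that CheckDone can record the RPC's status code next to its latency. A small sketch of that callback shape follows; FakeStatus, FakeResponse, and EntrySketch are illustrative stand-ins, not gRPC types.

#include <functional>

struct FakeStatus {
  int code;
  int error_code() const { return code; }  // stand-in for grpc::Status
};
struct FakeResponse {};                    // stand-in for SimpleResponse

class EntrySketch {                        // stand-in for HistogramEntry
 public:
  void set_status(int status) { status_used_ = true; status_ = status; }
  bool status_used() const { return status_used_; }
  int status() const { return status_; }
 private:
  bool status_used_ = false;
  int status_ = 0;
};

// The widened callback shape: status, response, and the entry to fill in.
using DoneCallback =
    std::function<void(FakeStatus, FakeResponse*, EntrySketch*)>;

int main() {
  DoneCallback check_done = [](FakeStatus s, FakeResponse*,
                               EntrySketch* entry) {
    entry->set_status(s.error_code());
  };
  FakeResponse resp;
  EntrySketch entry;
  check_done(FakeStatus{0}, &resp, &entry);
  return entry.status_used() ? 0 : 1;
}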
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 8062424a1f..f61e80d76b 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -130,7 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
entry->set_value((UsageTimer::Now() - start) * 1e9);
- return s.ok();
+ entry->set_status(s.error_code());
+ return true;
}
};
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 7460bb526a..a440341ccf 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -45,6 +45,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/core/util/port.h"
@@ -131,7 +132,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
Histogram histogram;
histogram.MergeProto(result->latencies());
- auto qps = histogram.Count() / average(result->client_stats(), WallTime);
+ auto time_estimate = average(result->client_stats(), WallTime);
+ auto qps = histogram.Count() / time_estimate;
auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
result->mutable_summary()->set_qps(qps);
@@ -156,6 +158,23 @@ static void postprocess_scenario_result(ScenarioResult* result) {
result->mutable_summary()->set_server_user_time(server_user_time);
result->mutable_summary()->set_client_system_time(client_system_time);
result->mutable_summary()->set_client_user_time(client_user_time);
+
+ if (result->request_results_size() > 0) {
+ int64_t successes = 0;
+ int64_t failures = 0;
+ for (int i = 0; i < result->request_results_size(); i++) {
+ RequestResultCount rrc = result->request_results(i);
+ if (rrc.status_code() == 0) {
+ successes += rrc.count();
+ } else {
+ failures += rrc.count();
+ }
+ }
+ result->mutable_summary()->set_successful_requests_per_second(
+ successes / time_estimate);
+ result->mutable_summary()->set_failed_requests_per_second(failures /
+ time_estimate);
+ }
}
// Namespace for classes and functions used only in RunScenario
@@ -438,9 +457,12 @@ std::unique_ptr<ScenarioResult> RunScenario(
start,
gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
+ gpr_timer_set_enabled(0);
+
// Finish a run
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
Histogram merged_latencies;
+ std::unordered_map<int, int64_t> merged_statuses;
gpr_log(GPR_INFO, "Finishing clients");
for (size_t i = 0; i < num_clients; i++) {
@@ -459,6 +481,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
+ for (int i = 0; i < stats.request_results_size(); i++) {
+ merged_statuses[stats.request_results(i).status_code()] +=
+ stats.request_results(i).count();
+ }
result->add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
@@ -478,6 +504,12 @@ std::unique_ptr<ScenarioResult> RunScenario(
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
+ for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
+ it != merged_statuses.end(); ++it) {
+ RequestResultCount* rrc = result->add_request_results();
+ rrc->set_status_code(it->first);
+ rrc->set_count(it->second);
+ }
gpr_log(GPR_INFO, "Finishing servers");
for (size_t i = 0; i < num_servers; i++) {
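Note on the driver.cc hunk above: the new summary fields treat status code 0 (OK) as a success and everything else as a failure, then divide both totals by the same averaged wall-time estimate already used for QPS. A minimal sketch of that arithmetic, with a plain map standing in for the RequestResultCount protos (the counts and the 10-second window are made-up numbers):

#include <cstdint>
#include <cstdio>
#include <unordered_map>

int main() {
  std::unordered_map<int, int64_t> merged_statuses = {{0, 9500}, {14, 500}};
  double time_estimate = 10.0;  // averaged client wall time, in seconds

  int64_t successes = 0, failures = 0;
  for (const auto& kv : merged_statuses) {
    if (kv.first == 0) {
      successes += kv.second;  // status OK
    } else {
      failures += kv.second;   // any non-OK status
    }
  }
  std::printf("successful requests/second: %.1f\n", successes / time_estimate);
  std::printf("failed requests/second: %.1f\n", failures / time_estimate);
  return 0;
}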
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 2ec7d8676c..41617e968a 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -73,6 +73,12 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
+ if (result.summary().failed_requests_per_second() > 0) {
+ gpr_log(GPR_INFO, "failed requests/second: %.1f",
+ result.summary().failed_requests_per_second());
+ gpr_log(GPR_INFO, "successful requests/second: %.1f",
+ result.summary().successful_requests_per_second());
+ }
}
void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 082b4bc72f..bc4c896d83 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -38,6 +38,7 @@
#include <thread>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@@ -57,7 +58,7 @@ namespace testing {
template <class RequestType, class ResponseType, class ServiceType,
class ServerContextType>
-class AsyncQpsServerTest : public Server {
+class AsyncQpsServerTest GRPC_FINAL : public grpc::testing::Server {
public:
AsyncQpsServerTest(
const ServerConfig &config,
@@ -95,6 +96,11 @@ class AsyncQpsServerTest : public Server {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
+ if (config.resource_quota_size() > 0) {
+ builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
+ .Resize(config.resource_quota_size()));
+ }
+
server_ = builder.BuildAndStart();
using namespace std::placeholders;
@@ -131,9 +137,7 @@ class AsyncQpsServerTest : public Server {
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
- // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
- auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
- server_->Shutdown(deadline);
+ std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this);
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
(*cq)->Shutdown();
}
@@ -146,9 +150,16 @@ class AsyncQpsServerTest : public Server {
while ((*cq)->Next(&got_tag, &ok))
;
}
+ shutdown_thread.join();
}
private:
+ void ShutdownThreadFunc() {
+ // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+ auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+ server_->Shutdown(deadline);
+ }
+
void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
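Note on the server_async.cc hunk above: two independent changes are bundled here. First, when the scenario sets resource_quota_size, the builder gets a grpc::ResourceQuota resized to that byte limit (ServerBuilder::SetResourceQuota and ResourceQuota::Resize are the public grpc++ APIs the patch relies on). Second, the deadline-bounded Shutdown now runs on a helper thread so the completion queues can be shut down and drained concurrently rather than only after the deadline expires. A rough sketch of both pieces; the listening address, quota name, and helper function names are chosen only for illustration.

#include <chrono>
#include <memory>
#include <thread>

#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

std::unique_ptr<grpc::Server> BuildServer(size_t quota_bytes) {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051",
                           grpc::InsecureServerCredentials());
  if (quota_bytes > 0) {
    // Cap the memory the server may use for buffering, as in the patch.
    builder.SetResourceQuota(
        grpc::ResourceQuota("ExampleQuota").Resize(quota_bytes));
  }
  return builder.BuildAndStart();
}

void ShutdownWithDeadline(grpc::Server* server) {
  // Counterpart of ShutdownThreadFunc: give in-flight RPCs a few seconds,
  // then force shutdown. In AsyncQpsServerTest the completion queues are
  // shut down and drained between starting this thread and joining it.
  std::thread shutdown_thread([server] {
    auto deadline =
        std::chrono::system_clock::now() + std::chrono::seconds(3);
    server->Shutdown(deadline);
  });
  shutdown_thread.join();
}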
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index c774985bfa..07f48e2644 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -31,8 +31,7 @@
*
*/
-#include <thread>
-
+#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@@ -93,6 +92,11 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
Server::CreateServerCredentials(config));
gpr_free(server_address);
+ if (config.resource_quota_size() > 0) {
+ builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
+ .Resize(config.resource_quota_size()));
+ }
+
builder.RegisterService(&service_);
impl_ = builder.BuildAndStart();