diff options
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client.h | 57 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 11 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 7 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 31 | ||||
-rw-r--r-- | test/cpp/qps/report.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/server_sync.cc | 6 |
7 files changed, 102 insertions, 22 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 5fb87b2782..9983c8a7b0 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -36,6 +36,7 @@ #include <condition_variable> #include <mutex> +#include <unordered_map> #include <vector> #include <grpc++/channel.h> @@ -114,19 +115,37 @@ class ClientRequestCreator<ByteBuffer> { class HistogramEntry GRPC_FINAL { public: - HistogramEntry() : used_(false) {} - bool used() const { return used_; } + HistogramEntry() : value_used_(false), status_used_(false) {} + bool value_used() const { return value_used_; } double value() const { return value_; } void set_value(double v) { - used_ = true; + value_used_ = true; value_ = v; } + bool status_used() const { return status_used_; } + int status() const { return status_; } + void set_status(int status) { + status_used_ = true; + status_ = status; + } private: - bool used_; + bool value_used_; double value_; + bool status_used_; + int status_; }; +typedef std::unordered_map<int, int64_t> StatusHistogram; + +inline void MergeStatusHistogram(const StatusHistogram& from, + StatusHistogram* to) { + for (StatusHistogram::const_iterator it = from.begin(); it != from.end(); + ++it) { + (*to)[it->first] += it->second; + } +} + class Client { public: Client() @@ -139,6 +158,7 @@ class Client { ClientStats Mark(bool reset) { Histogram latencies; + StatusHistogram statuses; UsageTimer::Result timer_result; MaybeStartRequests(); @@ -146,27 +166,36 @@ class Client { // avoid std::vector for old compilers that expect a copy constructor if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; + StatusHistogram* to_merge_status = new StatusHistogram[threads_.size()]; + for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]); } std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); latencies.Merge(to_merge[i]); + MergeStatusHistogram(to_merge_status[i], &statuses); } delete[] to_merge; + delete[] to_merge_status; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->MergeStatsInto(&latencies); + threads_[i]->MergeStatsInto(&latencies, &statuses); } timer_result = timer_->Mark(); } ClientStats stats; latencies.FillProto(stats.mutable_latencies()); + for (StatusHistogram::const_iterator it = statuses.begin(); + it != statuses.end(); ++it) { + RequestResultCount* rrc = stats.add_request_results(); + rrc->set_status_code(it->first); + rrc->set_count(it->second); + } stats.set_time_elapsed(timer_result.wall); stats.set_time_system(timer_result.system); stats.set_time_user(timer_result.user); @@ -258,16 +287,16 @@ class Client { ~Thread() { impl_.join(); } - void BeginSwap(Histogram* n) { + void BeginSwap(Histogram* n, StatusHistogram* s) { std::lock_guard<std::mutex> g(mu_); n->Swap(&histogram_); + s->swap(statuses_); } - void EndSwap() {} - - void MergeStatsInto(Histogram* hist) { + void MergeStatsInto(Histogram* hist, StatusHistogram* s) { std::unique_lock<std::mutex> g(mu_); hist->Merge(histogram_); + MergeStatusHistogram(statuses_, s); } private: @@ -288,9 +317,12 @@ class Client { const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); // lock, update histogram if needed and see if we're done std::lock_guard<std::mutex> g(mu_); - if (entry.used()) { + if (entry.value_used()) { histogram_.Add(entry.value()); } + if (entry.status_used()) { + statuses_[entry.status()]++; + } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); } @@ -304,6 +336,7 @@ class Client { std::mutex mu_; Histogram histogram_; + StatusHistogram statuses_; Client* client_; const size_t idx_; std::thread impl_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 081114859c..4d36a6ba42 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -83,7 +83,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req, - std::function<void(grpc::Status, ResponseType*)> on_done) + std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done) : context_(), stub_(stub), cq_(nullptr), @@ -113,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return true; case State::RESP_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); - callback_(status_, &response_); + callback_(status_, &response_, entry); next_state_ = State::INVALID; return false; default: @@ -135,7 +135,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ResponseType response_; enum State { INVALID, READY, RESP_DONE }; State next_state_; - std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_; std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, @@ -290,7 +290,10 @@ class AsyncUnaryClient GRPC_FINAL ~AsyncUnaryClient() GRPC_OVERRIDE {} private: - static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static void CheckDone(grpc::Status s, SimpleResponse* response, + HistogramEntry* entry) { + entry->set_status(s.error_code()); + } static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request, CompletionQueue* cq) { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 0ccf4e270b..f61e80d76b 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -130,11 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); entry->set_value((UsageTimer::Now() - start) * 1e9); - if (!s.ok()) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); - } - return s.ok(); + entry->set_status(s.error_code()); + return true; } }; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 6c675e3483..a440341ccf 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -132,7 +132,8 @@ static void postprocess_scenario_result(ScenarioResult* result) { Histogram histogram; histogram.MergeProto(result->latencies()); - auto qps = histogram.Count() / average(result->client_stats(), WallTime); + auto time_estimate = average(result->client_stats(), WallTime); + auto qps = histogram.Count() / time_estimate; auto qps_per_server_core = qps / sum(result->server_cores(), Cores); result->mutable_summary()->set_qps(qps); @@ -157,6 +158,23 @@ static void postprocess_scenario_result(ScenarioResult* result) { result->mutable_summary()->set_server_user_time(server_user_time); result->mutable_summary()->set_client_system_time(client_system_time); result->mutable_summary()->set_client_user_time(client_user_time); + + if (result->request_results_size() > 0) { + int64_t successes = 0; + int64_t failures = 0; + for (int i = 0; i < result->request_results_size(); i++) { + RequestResultCount rrc = result->request_results(i); + if (rrc.status_code() == 0) { + successes += rrc.count(); + } else { + failures += rrc.count(); + } + } + result->mutable_summary()->set_successful_requests_per_second( + successes / time_estimate); + result->mutable_summary()->set_failed_requests_per_second(failures / + time_estimate); + } } // Namespace for classes and functions used only in RunScenario @@ -444,6 +462,7 @@ std::unique_ptr<ScenarioResult> RunScenario( // Finish a run std::unique_ptr<ScenarioResult> result(new ScenarioResult); Histogram merged_latencies; + std::unordered_map<int, int64_t> merged_statuses; gpr_log(GPR_INFO, "Finishing clients"); for (size_t i = 0; i < num_clients; i++) { @@ -462,6 +481,10 @@ std::unique_ptr<ScenarioResult> RunScenario( gpr_log(GPR_INFO, "Received final status from client %zu", i); const auto& stats = client_status.stats(); merged_latencies.MergeProto(stats.latencies()); + for (int i = 0; i < stats.request_results_size(); i++) { + merged_statuses[stats.request_results(i).status_code()] += + stats.request_results(i).count(); + } result->add_client_stats()->CopyFrom(stats); // That final status should be the last message on the client stream GPR_ASSERT(!client->stream->Read(&client_status)); @@ -481,6 +504,12 @@ std::unique_ptr<ScenarioResult> RunScenario( delete[] clients; merged_latencies.FillProto(result->mutable_latencies()); + for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin(); + it != merged_statuses.end(); ++it) { + RequestResultCount* rrc = result->add_request_results(); + rrc->set_status_code(it->first); + rrc->set_count(it->second); + } gpr_log(GPR_INFO, "Finishing servers"); for (size_t i = 0; i < num_servers; i++) { diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index 2ec7d8676c..41617e968a 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -73,6 +73,12 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) { void GprLogReporter::ReportQPS(const ScenarioResult& result) { gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps()); + if (result.summary().failed_requests_per_second() > 0) { + gpr_log(GPR_INFO, "failed requests/second: %.1f", + result.summary().failed_requests_per_second()); + gpr_log(GPR_INFO, "successful requests/second: %.1f", + result.summary().successful_requests_per_second()); + } } void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 2fcc64819b..bc4c896d83 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -38,6 +38,7 @@ #include <thread> #include <grpc++/generic/async_generic_service.h> +#include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> @@ -95,6 +96,11 @@ class AsyncQpsServerTest GRPC_FINAL : public grpc::testing::Server { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } + if (config.resource_quota_size() > 0) { + builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") + .Resize(config.resource_quota_size())); + } + server_ = builder.BuildAndStart(); using namespace std::placeholders; diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 0caed0ab49..07f48e2644 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -31,6 +31,7 @@ * */ +#include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> @@ -91,6 +92,11 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { Server::CreateServerCredentials(config)); gpr_free(server_address); + if (config.resource_quota_size() > 0) { + builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") + .Resize(config.resource_quota_size())); + } + builder.RegisterService(&service_); impl_ = builder.BuildAndStart(); |