aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-12-04 12:14:18 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-12-04 12:14:18 -0800
commit9a434371e2f582675880239c1d0d5d85081761d8 (patch)
tree935e23c511274413add36f5c175a6c2bee8afe1d /test/cpp/qps
parent4ca35636fe3c5d1e936d4cc03d18efb4be2824b8 (diff)
parentc3b1e55a3c6af7c39ed1a6d7dea3463ba6194449 (diff)
Merge branch 'master' into rq-fix
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/BUILD2
-rw-r--r--test/cpp/qps/client_sync.cc212
-rw-r--r--test/cpp/qps/histogram.h36
-rw-r--r--test/cpp/qps/qps_interarrival_test.cc10
-rw-r--r--test/cpp/qps/qps_worker.cc2
5 files changed, 153 insertions, 109 deletions
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 0d91d52f22..f1abb19e64 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -106,7 +106,7 @@ grpc_cc_library(
"histogram.h",
"stats.h",
],
- deps = ["//:gpr"],
+ deps = ["//test/core/util:grpc_test_util"],
)
grpc_cc_test(
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 9f20b148eb..82a3f0042d 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -60,21 +60,20 @@ class SynchronousClient
SetupLoadTest(config, num_threads_);
}
- virtual ~SynchronousClient(){};
+ virtual ~SynchronousClient() {}
- virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
void ThreadFunc(size_t thread_idx, Thread* t) override {
- InitThreadFuncImpl(thread_idx);
+ if (!InitThreadFuncImpl(thread_idx)) {
+ return;
+ }
for (;;) {
// run the loop body
HistogramEntry entry;
const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
t->UpdateHistogram(&entry);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
if (!thread_still_ok || ThreadCompleted()) {
return;
}
@@ -109,9 +108,6 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
-
- private:
- void DestroyMultithreading() override final { EndThreads(); }
};
class SynchronousUnaryClient final : public SynchronousClient {
@@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFuncImpl(size_t thread_idx) override {}
+ bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
@@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
entry->set_status(s.error_code());
return true;
}
+
+ private:
+ void DestroyMultithreading() override final { EndThreads(); }
};
template <class StreamType>
@@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient {
: SynchronousClient(config),
context_(num_threads_),
stream_(num_threads_),
+ stream_mu_(num_threads_),
+ shutdown_(num_threads_),
messages_per_stream_(config.messages_per_stream()),
messages_issued_(num_threads_) {
StartThreads(num_threads_);
}
virtual ~SynchronousStreamingClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- // forcibly cancel the streams, then finish
- context_[i].TryCancel();
- (*stream)->Finish().IgnoreError();
- // don't log any error message on !ok since this was canceled
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams([this](size_t thread_idx) {
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ });
}
protected:
std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<StreamType>> stream_;
+ // stream_mu_ is only needed when changing an element of stream_ or context_
+ std::vector<std::mutex> stream_mu_;
+ // use struct Bool rather than bool because vector<bool> is not concurrent
+ struct Bool {
+ bool val;
+ Bool() : val(false) {}
+ };
+ std::vector<Bool> shutdown_;
const int messages_per_stream_;
std::vector<int> messages_issued_;
@@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient {
// don't set the value since the stream is failed and shouldn't be timed
entry->set_status(s.error_code());
if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
- s.error_message().c_str());
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
+ thread_idx, s.error_message().c_str());
+ }
}
+ // Lock the stream_mu_ now because the client context could change
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
}
-};
-class SynchronousStreamingPingPongClient final
- : public SynchronousStreamingClient<
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
- public:
- SynchronousStreamingPingPongClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {}
- ~SynchronousStreamingPingPongClient() {
+ void CleanupAllStreams(std::function<void(size_t)> cleaner) {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
+ cleanup_threads.emplace_back([this, i, cleaner] {
+ std::lock_guard<std::mutex> l(stream_mu_[i]);
+ shutdown_[i].val = true;
+ if (stream_[i]) {
+ cleaner(i);
}
});
}
@@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ void DestroyMultithreading() override final {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
+ EndThreads();
+ }
+};
+
+class SynchronousStreamingPingPongClient final
+ : public SynchronousStreamingClient<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
+ public:
+ SynchronousStreamingPingPongClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config) {}
+ ~SynchronousStreamingPingPongClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
+
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ return false;
+ }
messages_issued_[thread_idx] = 0;
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
messages_issued_[thread_idx] = 0;
return true;
}
@@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final
SynchronousStreamingFromClientClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ std::vector<double> last_issue_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ return false;
+ }
last_issue_[thread_idx] = UsageTimer::Now();
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_issue_;
};
class SynchronousStreamingFromServerClient final
@@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ ~SynchronousStreamingFromServerClient() {}
+
+ private:
+ std::vector<double> last_recv_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ return false;
+ }
last_recv_[thread_idx] = UsageTimer::Now();
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
@@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final
}
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_recv_;
};
class SynchronousStreamingBothWaysClient final
@@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final
SynchronousStreamingBothWaysClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ } else {
+ return false;
+ }
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
index e31d5d78a8..ba72b5b332 100644
--- a/test/cpp/qps/histogram.h
+++ b/test/cpp/qps/histogram.h
@@ -19,8 +19,8 @@
#ifndef TEST_QPS_HISTOGRAM_H
#define TEST_QPS_HISTOGRAM_H
-#include <grpc/support/histogram.h>
#include "src/proto/grpc/testing/stats.pb.h"
+#include "test/core/util/histogram.h"
namespace grpc {
namespace testing {
@@ -29,36 +29,36 @@ class Histogram {
public:
// TODO: look into making histogram params not hardcoded for C++
Histogram()
- : impl_(gpr_histogram_create(default_resolution(),
- default_max_possible())) {}
+ : impl_(grpc_histogram_create(default_resolution(),
+ default_max_possible())) {}
~Histogram() {
- if (impl_) gpr_histogram_destroy(impl_);
+ if (impl_) grpc_histogram_destroy(impl_);
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
- void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); }
- void Add(double value) { gpr_histogram_add(impl_, value); }
+ void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); }
+ void Add(double value) { grpc_histogram_add(impl_, value); }
double Percentile(double pctile) const {
- return gpr_histogram_percentile(impl_, pctile);
+ return grpc_histogram_percentile(impl_, pctile);
}
- double Count() const { return gpr_histogram_count(impl_); }
+ double Count() const { return grpc_histogram_count(impl_); }
void Swap(Histogram* other) { std::swap(impl_, other->impl_); }
void FillProto(HistogramData* p) {
size_t n;
- const auto* data = gpr_histogram_get_contents(impl_, &n);
+ const auto* data = grpc_histogram_get_contents(impl_, &n);
for (size_t i = 0; i < n; i++) {
p->add_bucket(data[i]);
}
- p->set_min_seen(gpr_histogram_minimum(impl_));
- p->set_max_seen(gpr_histogram_maximum(impl_));
- p->set_sum(gpr_histogram_sum(impl_));
- p->set_sum_of_squares(gpr_histogram_sum_of_squares(impl_));
- p->set_count(gpr_histogram_count(impl_));
+ p->set_min_seen(grpc_histogram_minimum(impl_));
+ p->set_max_seen(grpc_histogram_maximum(impl_));
+ p->set_sum(grpc_histogram_sum(impl_));
+ p->set_sum_of_squares(grpc_histogram_sum_of_squares(impl_));
+ p->set_count(grpc_histogram_count(impl_));
}
void MergeProto(const HistogramData& p) {
- gpr_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(),
- p.min_seen(), p.max_seen(), p.sum(),
- p.sum_of_squares(), p.count());
+ grpc_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(),
+ p.min_seen(), p.max_seen(), p.sum(),
+ p.sum_of_squares(), p.count());
}
static double default_resolution() { return 0.01; }
@@ -68,7 +68,7 @@ class Histogram {
Histogram(const Histogram&);
Histogram& operator=(const Histogram&);
- gpr_histogram* impl_;
+ grpc_histogram* impl_;
};
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index 461bf624ce..625b7db426 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -20,7 +20,7 @@
#include <iostream>
// Use the C histogram rather than C++ to avoid depending on proto
-#include <grpc/support/histogram.h>
+#include "test/core/util/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/util/test_config.h"
@@ -31,21 +31,21 @@ using grpc::testing::RandomDistInterface;
static void RunTest(RandomDistInterface&& r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
- gpr_histogram* h(gpr_histogram_create(0.01, 60e9));
+ grpc_histogram* h(grpc_histogram_create(0.01, 60e9));
for (int i = 0; i < 10000000; i++) {
for (int j = 0; j < threads; j++) {
- gpr_histogram_add(h, timer.next(j));
+ grpc_histogram_add(h, timer.next(j));
}
}
std::cout << title << " Distribution" << std::endl;
std::cout << "Value, Percentile" << std::endl;
for (double pct = 0.0; pct < 100.0; pct += 1.0) {
- std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl;
+ std::cout << grpc_histogram_percentile(h, pct) << "," << pct << std::endl;
}
- gpr_histogram_destroy(h);
+ grpc_histogram_destroy(h);
}
using grpc::testing::ExpDist;
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index c288b03ec5..4c9ab0ea43 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -32,12 +32,12 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/services.pb.h"
#include "test/core/util/grpc_profiler.h"
+#include "test/core/util/histogram.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"