diff options
author | 2017-12-04 12:14:18 -0800 | |
---|---|---|
committer | 2017-12-04 12:14:18 -0800 | |
commit | 9a434371e2f582675880239c1d0d5d85081761d8 (patch) | |
tree | 935e23c511274413add36f5c175a6c2bee8afe1d /test/cpp/qps | |
parent | 4ca35636fe3c5d1e936d4cc03d18efb4be2824b8 (diff) | |
parent | c3b1e55a3c6af7c39ed1a6d7dea3463ba6194449 (diff) |
Merge branch 'master' into rq-fix
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/BUILD | 2 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 212 | ||||
-rw-r--r-- | test/cpp/qps/histogram.h | 36 | ||||
-rw-r--r-- | test/cpp/qps/qps_interarrival_test.cc | 10 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 2 |
5 files changed, 153 insertions, 109 deletions
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 0d91d52f22..f1abb19e64 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -106,7 +106,7 @@ grpc_cc_library( "histogram.h", "stats.h", ], - deps = ["//:gpr"], + deps = ["//test/core/util:grpc_test_util"], ) grpc_cc_test( diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9f20b148eb..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,21 +60,20 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} - virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; void ThreadFunc(size_t thread_idx, Thread* t) override { - InitThreadFuncImpl(thread_idx); + if (!InitThreadFuncImpl(thread_idx)) { + return; + } for (;;) { // run the loop body HistogramEntry entry; const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); t->UpdateHistogram(&entry); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } if (!thread_still_ok || ThreadCompleted()) { return; } @@ -109,9 +108,6 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; - - private: - void DestroyMultithreading() override final { EndThreads(); } }; class SynchronousUnaryClient final : public SynchronousClient { @@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFuncImpl(size_t thread_idx) override {} + bool InitThreadFuncImpl(size_t thread_idx) override { return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { @@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient { entry->set_status(s.error_code()); return true; } + + private: + void DestroyMultithreading() override final { EndThreads(); } }; template <class StreamType> @@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_), + stream_mu_(num_threads_), + shutdown_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { StartThreads(num_threads_); } virtual ~SynchronousStreamingClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - // forcibly cancel the streams, then finish - context_[i].TryCancel(); - (*stream)->Finish().IgnoreError(); - // don't log any error message on !ok since this was canceled - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); } protected: std::vector<grpc::ClientContext> context_; std::vector<std::unique_ptr<StreamType>> stream_; + // stream_mu_ is only needed when changing an element of stream_ or context_ + std::vector<std::mutex> stream_mu_; + // use struct Bool rather than bool because vector<bool> is not concurrent + struct Bool { + bool val; + Bool() : val(false) {} + }; + std::vector<Bool> shutdown_; const int messages_per_stream_; std::vector<int> messages_issued_; @@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient { // don't set the value since the stream is failed and shouldn't be timed entry->set_status(s.error_code()); if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", + thread_idx, s.error_message().c_str()); + } } + // Lock the stream_mu_ now because the client context could change + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } -}; -class SynchronousStreamingPingPongClient final - : public SynchronousStreamingClient< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { - public: - SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); + cleanup_threads.emplace_back([this, i, cleaner] { + std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; + if (stream_[i]) { + cleaner(i); } }); } @@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + void DestroyMultithreading() override final { + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); + EndThreads(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) {} + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } + + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + return false; + } messages_issued_[thread_idx] = 0; + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } messages_issued_[thread_idx] = 0; return true; } @@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + std::vector<double> last_issue_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + return false; + } last_issue_[thread_idx] = UsageTimer::Now(); + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_issue_; }; class SynchronousStreamingFromServerClient final @@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFuncImpl(size_t thread_idx) override { + ~SynchronousStreamingFromServerClient() {} + + private: + std::vector<double> last_recv_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + return false; + } last_recv_[thread_idx] = UsageTimer::Now(); + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final } FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_recv_; }; class SynchronousStreamingBothWaysClient final @@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } else { + return false; + } + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index e31d5d78a8..ba72b5b332 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -19,8 +19,8 @@ #ifndef TEST_QPS_HISTOGRAM_H #define TEST_QPS_HISTOGRAM_H -#include <grpc/support/histogram.h> #include "src/proto/grpc/testing/stats.pb.h" +#include "test/core/util/histogram.h" namespace grpc { namespace testing { @@ -29,36 +29,36 @@ class Histogram { public: // TODO: look into making histogram params not hardcoded for C++ Histogram() - : impl_(gpr_histogram_create(default_resolution(), - default_max_possible())) {} + : impl_(grpc_histogram_create(default_resolution(), + default_max_possible())) {} ~Histogram() { - if (impl_) gpr_histogram_destroy(impl_); + if (impl_) grpc_histogram_destroy(impl_); } Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } - void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); } - void Add(double value) { gpr_histogram_add(impl_, value); } + void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); } + void Add(double value) { grpc_histogram_add(impl_, value); } double Percentile(double pctile) const { - return gpr_histogram_percentile(impl_, pctile); + return grpc_histogram_percentile(impl_, pctile); } - double Count() const { return gpr_histogram_count(impl_); } + double Count() const { return grpc_histogram_count(impl_); } void Swap(Histogram* other) { std::swap(impl_, other->impl_); } void FillProto(HistogramData* p) { size_t n; - const auto* data = gpr_histogram_get_contents(impl_, &n); + const auto* data = grpc_histogram_get_contents(impl_, &n); for (size_t i = 0; i < n; i++) { p->add_bucket(data[i]); } - p->set_min_seen(gpr_histogram_minimum(impl_)); - p->set_max_seen(gpr_histogram_maximum(impl_)); - p->set_sum(gpr_histogram_sum(impl_)); - p->set_sum_of_squares(gpr_histogram_sum_of_squares(impl_)); - p->set_count(gpr_histogram_count(impl_)); + p->set_min_seen(grpc_histogram_minimum(impl_)); + p->set_max_seen(grpc_histogram_maximum(impl_)); + p->set_sum(grpc_histogram_sum(impl_)); + p->set_sum_of_squares(grpc_histogram_sum_of_squares(impl_)); + p->set_count(grpc_histogram_count(impl_)); } void MergeProto(const HistogramData& p) { - gpr_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(), - p.min_seen(), p.max_seen(), p.sum(), - p.sum_of_squares(), p.count()); + grpc_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(), + p.min_seen(), p.max_seen(), p.sum(), + p.sum_of_squares(), p.count()); } static double default_resolution() { return 0.01; } @@ -68,7 +68,7 @@ class Histogram { Histogram(const Histogram&); Histogram& operator=(const Histogram&); - gpr_histogram* impl_; + grpc_histogram* impl_; }; } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc index 461bf624ce..625b7db426 100644 --- a/test/cpp/qps/qps_interarrival_test.cc +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -20,7 +20,7 @@ #include <iostream> // Use the C histogram rather than C++ to avoid depending on proto -#include <grpc/support/histogram.h> +#include "test/core/util/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/util/test_config.h" @@ -31,21 +31,21 @@ using grpc::testing::RandomDistInterface; static void RunTest(RandomDistInterface&& r, int threads, std::string title) { InterarrivalTimer timer; timer.init(r, threads); - gpr_histogram* h(gpr_histogram_create(0.01, 60e9)); + grpc_histogram* h(grpc_histogram_create(0.01, 60e9)); for (int i = 0; i < 10000000; i++) { for (int j = 0; j < threads; j++) { - gpr_histogram_add(h, timer.next(j)); + grpc_histogram_add(h, timer.next(j)); } } std::cout << title << " Distribution" << std::endl; std::cout << "Value, Percentile" << std::endl; for (double pct = 0.0; pct < 100.0; pct += 1.0) { - std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; + std::cout << grpc_histogram_percentile(h, pct) << "," << pct << std::endl; } - gpr_histogram_destroy(h); + grpc_histogram_destroy(h); } using grpc::testing::ExpDist; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index c288b03ec5..4c9ab0ea43 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -32,12 +32,12 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/cpu.h> -#include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include "src/proto/grpc/testing/services.pb.h" #include "test/core/util/grpc_profiler.h" +#include "test/core/util/histogram.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" |