diff options
author | Vijay Pai <vpai@google.com> | 2017-12-04 09:56:28 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-12-04 09:56:28 -0800 |
commit | 083b9be1a214f2ff67581f186684313c8d6b75fe (patch) | |
tree | a23f2d7afb385802788f70d2fb24708c623a05db /test/cpp/qps | |
parent | c8dd4c513908568c68c2b30d49d0303bb764bf17 (diff) |
Make all-streams op about cleanup only and replace a lambda with a virtual
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/client_sync.cc | 89 |
1 files changed, 50 insertions, 39 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c61e621dc4..20aa5f5435 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,7 +60,7 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; @@ -154,13 +154,7 @@ class SynchronousStreamingClient : public SynchronousClient { messages_issued_(num_threads_) { StartThreads(num_threads_); } - virtual ~SynchronousStreamingClient() { - OnAllStreams([](ClientContext* ctx, StreamType* s) -> bool { - // don't log any kind of error since we might have canceled it - s->Finish().IgnoreError(); - return true; - }); - } + virtual ~SynchronousStreamingClient() {} protected: std::vector<grpc::ClientContext> context_; @@ -192,13 +186,19 @@ class SynchronousStreamingClient : public SynchronousClient { context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } - void OnAllStreams(std::function<bool(ClientContext*, StreamType*)> cleaner) { + + virtual void CleanStream(size_t thread_idx) { + context_[thread_idx].TryCancel(); + } + + void CleanupAllStreams() { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i, cleaner]() { + cleanup_threads.emplace_back([this, i] { std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; if (stream_[i]) { - shutdown_[i].val = cleaner(&context_[i], stream_[i].get()); + CleanStream(i); } }); } @@ -206,13 +206,9 @@ class SynchronousStreamingClient : public SynchronousClient { th.join(); } } - private: void DestroyMultithreading() override final { - OnAllStreams([](ClientContext* ctx, StreamType* s) -> bool { - ctx->TryCancel(); - return true; - }); + CleanupAllStreams(); EndThreads(); } }; @@ -224,14 +220,9 @@ class SynchronousStreamingPingPongClient final SynchronousStreamingPingPongClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingPingPongClient() { - OnAllStreams( - [](ClientContext* ctx, - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>* s) -> bool { - s->WritesDone(); - return true; - }); + CleanupAllStreams(); } - + private: bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); @@ -276,6 +267,12 @@ class SynchronousStreamingPingPongClient final messages_issued_[thread_idx] = 0; return true; } + + void CleanStream(size_t thread_idx) override { + stream_[thread_idx]->WritesDone(); + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + } }; class SynchronousStreamingFromClientClient final @@ -284,13 +281,12 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - OnAllStreams( - [](ClientContext* ctx, grpc::ClientWriter<SimpleRequest>* s) -> bool { - s->WritesDone(); - return true; - }); + CleanupAllStreams(); } + private: + std::vector<double> last_issue_; + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); @@ -330,8 +326,11 @@ class SynchronousStreamingFromClientClient final return true; } - private: - std::vector<double> last_issue_; + void CleanStream(size_t thread_idx) override { + stream_[thread_idx]->WritesDone(); + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + } }; class SynchronousStreamingFromServerClient final @@ -339,6 +338,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} + ~SynchronousStreamingFromServerClient() { + CleanupAllStreams(); + } + + private: + std::vector<double> last_recv_; + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); @@ -351,6 +357,7 @@ class SynchronousStreamingFromServerClient final last_recv_[thread_idx] = UsageTimer::Now(); return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -372,8 +379,10 @@ class SynchronousStreamingFromServerClient final return true; } - private: - std::vector<double> last_recv_; + void CleanStream(size_t thread_idx) override { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + } }; class SynchronousStreamingBothWaysClient final @@ -383,14 +392,9 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - OnAllStreams( - [](ClientContext* ctx, - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>* s) -> bool { - s->WritesDone(); - return true; - }); + CleanupAllStreams(); } - + private: bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); @@ -401,10 +405,17 @@ class SynchronousStreamingBothWaysClient final } return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } + + void CleanStream(size_t thread_idx) override { + stream_[thread_idx]->WritesDone(); + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + } }; std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { |