diff options
author | Vijay Pai <vpai@google.com> | 2017-12-04 10:15:19 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2017-12-04 10:15:19 -0800 |
commit | c6587ca11b3b769957f670ab08bbbc1036116523 (patch) | |
tree | 05bbb1de0c5e1915b00af63f019e0afb497e3b21 | |
parent | f230ffd476fb095f0143a1b673320d93dfab8f6a (diff) |
Reintroduce lambdas
-rw-r--r-- | test/cpp/qps/client_sync.cc | 60 |
1 files changed, 24 insertions, 36 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 69ba866233..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -154,7 +154,12 @@ class SynchronousStreamingClient : public SynchronousClient { messages_issued_(num_threads_) { StartThreads(num_threads_); } - virtual ~SynchronousStreamingClient() {} + virtual ~SynchronousStreamingClient() { + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); + } protected: std::vector<grpc::ClientContext> context_; @@ -187,18 +192,14 @@ class SynchronousStreamingClient : public SynchronousClient { new (&context_[thread_idx]) ClientContext(); } - virtual void CleanStream(size_t thread_idx) { - context_[thread_idx].TryCancel(); - } - - void CleanupAllStreams() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i] { + cleanup_threads.emplace_back([this, i, cleaner] { std::lock_guard<std::mutex> l(stream_mu_[i]); shutdown_[i].val = true; if (stream_[i]) { - CleanStream(i); + cleaner(i); } }); } @@ -209,7 +210,8 @@ class SynchronousStreamingClient : public SynchronousClient { private: void DestroyMultithreading() override final { - CleanupAllStreams(); + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); EndThreads(); } }; @@ -220,7 +222,10 @@ class SynchronousStreamingPingPongClient final public: SynchronousStreamingPingPongClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { CleanupAllStreams(); } + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: bool InitThreadFuncImpl(size_t thread_idx) override { @@ -267,12 +272,6 @@ class SynchronousStreamingPingPongClient final messages_issued_[thread_idx] = 0; return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingFromClientClient final @@ -280,7 +279,10 @@ class SynchronousStreamingFromClientClient final public: SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} - ~SynchronousStreamingFromClientClient() { CleanupAllStreams(); } + ~SynchronousStreamingFromClientClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: std::vector<double> last_issue_; @@ -323,12 +325,6 @@ class SynchronousStreamingFromClientClient final } return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingFromServerClient final @@ -336,7 +332,7 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - ~SynchronousStreamingFromServerClient() { CleanupAllStreams(); } + ~SynchronousStreamingFromServerClient() {} private: std::vector<double> last_recv_; @@ -374,11 +370,6 @@ class SynchronousStreamingFromServerClient final } return true; } - - void CleanStream(size_t thread_idx) override { - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; class SynchronousStreamingBothWaysClient final @@ -387,7 +378,10 @@ class SynchronousStreamingBothWaysClient final public: SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} - ~SynchronousStreamingBothWaysClient() { CleanupAllStreams(); } + ~SynchronousStreamingBothWaysClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } private: bool InitThreadFuncImpl(size_t thread_idx) override { @@ -405,12 +399,6 @@ class SynchronousStreamingBothWaysClient final // TODO (vjpai): Do this return true; } - - void CleanStream(size_t thread_idx) override { - stream_[thread_idx]->WritesDone(); - // Don't log any kind of error since we may have canceled this - stream_[thread_idx]->Finish().IgnoreError(); - } }; std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { |