aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-12-04 10:15:19 -0800
committerGravatar Vijay Pai <vpai@google.com>2017-12-04 10:15:19 -0800
commitc6587ca11b3b769957f670ab08bbbc1036116523 (patch)
tree05bbb1de0c5e1915b00af63f019e0afb497e3b21
parentf230ffd476fb095f0143a1b673320d93dfab8f6a (diff)
Reintroduce lambdas
-rw-r--r--test/cpp/qps/client_sync.cc60
1 files changed, 24 insertions, 36 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 69ba866233..82a3f0042d 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -154,7 +154,12 @@ class SynchronousStreamingClient : public SynchronousClient {
messages_issued_(num_threads_) {
StartThreads(num_threads_);
}
- virtual ~SynchronousStreamingClient() {}
+ virtual ~SynchronousStreamingClient() {
+ CleanupAllStreams([this](size_t thread_idx) {
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ });
+ }
protected:
std::vector<grpc::ClientContext> context_;
@@ -187,18 +192,14 @@ class SynchronousStreamingClient : public SynchronousClient {
new (&context_[thread_idx]) ClientContext();
}
- virtual void CleanStream(size_t thread_idx) {
- context_[thread_idx].TryCancel();
- }
-
- void CleanupAllStreams() {
+ void CleanupAllStreams(std::function<void(size_t)> cleaner) {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i] {
+ cleanup_threads.emplace_back([this, i, cleaner] {
std::lock_guard<std::mutex> l(stream_mu_[i]);
shutdown_[i].val = true;
if (stream_[i]) {
- CleanStream(i);
+ cleaner(i);
}
});
}
@@ -209,7 +210,8 @@ class SynchronousStreamingClient : public SynchronousClient {
private:
void DestroyMultithreading() override final {
- CleanupAllStreams();
+ CleanupAllStreams(
+ [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
EndThreads();
}
};
@@ -220,7 +222,10 @@ class SynchronousStreamingPingPongClient final
public:
SynchronousStreamingPingPongClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
- ~SynchronousStreamingPingPongClient() { CleanupAllStreams(); }
+ ~SynchronousStreamingPingPongClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
private:
bool InitThreadFuncImpl(size_t thread_idx) override {
@@ -267,12 +272,6 @@ class SynchronousStreamingPingPongClient final
messages_issued_[thread_idx] = 0;
return true;
}
-
- void CleanStream(size_t thread_idx) override {
- stream_[thread_idx]->WritesDone();
- // Don't log any kind of error since we may have canceled this
- stream_[thread_idx]->Finish().IgnoreError();
- }
};
class SynchronousStreamingFromClientClient final
@@ -280,7 +279,10 @@ class SynchronousStreamingFromClientClient final
public:
SynchronousStreamingFromClientClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_issue_(num_threads_) {}
- ~SynchronousStreamingFromClientClient() { CleanupAllStreams(); }
+ ~SynchronousStreamingFromClientClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
private:
std::vector<double> last_issue_;
@@ -323,12 +325,6 @@ class SynchronousStreamingFromClientClient final
}
return true;
}
-
- void CleanStream(size_t thread_idx) override {
- stream_[thread_idx]->WritesDone();
- // Don't log any kind of error since we may have canceled this
- stream_[thread_idx]->Finish().IgnoreError();
- }
};
class SynchronousStreamingFromServerClient final
@@ -336,7 +332,7 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- ~SynchronousStreamingFromServerClient() { CleanupAllStreams(); }
+ ~SynchronousStreamingFromServerClient() {}
private:
std::vector<double> last_recv_;
@@ -374,11 +370,6 @@ class SynchronousStreamingFromServerClient final
}
return true;
}
-
- void CleanStream(size_t thread_idx) override {
- // Don't log any kind of error since we may have canceled this
- stream_[thread_idx]->Finish().IgnoreError();
- }
};
class SynchronousStreamingBothWaysClient final
@@ -387,7 +378,10 @@ class SynchronousStreamingBothWaysClient final
public:
SynchronousStreamingBothWaysClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
- ~SynchronousStreamingBothWaysClient() { CleanupAllStreams(); }
+ ~SynchronousStreamingBothWaysClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
private:
bool InitThreadFuncImpl(size_t thread_idx) override {
@@ -405,12 +399,6 @@ class SynchronousStreamingBothWaysClient final
// TODO (vjpai): Do this
return true;
}
-
- void CleanStream(size_t thread_idx) override {
- stream_[thread_idx]->WritesDone();
- // Don't log any kind of error since we may have canceled this
- stream_[thread_idx]->Finish().IgnoreError();
- }
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {