aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_sync.cc
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-12-04 09:56:28 -0800
committerGravatar Vijay Pai <vpai@google.com>2017-12-04 09:56:28 -0800
commit083b9be1a214f2ff67581f186684313c8d6b75fe (patch)
treea23f2d7afb385802788f70d2fb24708c623a05db /test/cpp/qps/client_sync.cc
parentc8dd4c513908568c68c2b30d49d0303bb764bf17 (diff)
Make all-streams op about cleanup only and replace a lambda with a virtual
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r--test/cpp/qps/client_sync.cc89
1 files changed, 50 insertions, 39 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c61e621dc4..20aa5f5435 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -60,7 +60,7 @@ class SynchronousClient
SetupLoadTest(config, num_threads_);
}
- virtual ~SynchronousClient(){};
+ virtual ~SynchronousClient() {}
virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
@@ -154,13 +154,7 @@ class SynchronousStreamingClient : public SynchronousClient {
messages_issued_(num_threads_) {
StartThreads(num_threads_);
}
- virtual ~SynchronousStreamingClient() {
- OnAllStreams([](ClientContext* ctx, StreamType* s) -> bool {
- // don't log any kind of error since we might have canceled it
- s->Finish().IgnoreError();
- return true;
- });
- }
+ virtual ~SynchronousStreamingClient() {}
protected:
std::vector<grpc::ClientContext> context_;
@@ -192,13 +186,19 @@ class SynchronousStreamingClient : public SynchronousClient {
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
}
- void OnAllStreams(std::function<bool(ClientContext*, StreamType*)> cleaner) {
+
+ virtual void CleanStream(size_t thread_idx) {
+ context_[thread_idx].TryCancel();
+ }
+
+ void CleanupAllStreams() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i, cleaner]() {
+ cleanup_threads.emplace_back([this, i] {
std::lock_guard<std::mutex> l(stream_mu_[i]);
+ shutdown_[i].val = true;
if (stream_[i]) {
- shutdown_[i].val = cleaner(&context_[i], stream_[i].get());
+ CleanStream(i);
}
});
}
@@ -206,13 +206,9 @@ class SynchronousStreamingClient : public SynchronousClient {
th.join();
}
}
-
private:
void DestroyMultithreading() override final {
- OnAllStreams([](ClientContext* ctx, StreamType* s) -> bool {
- ctx->TryCancel();
- return true;
- });
+ CleanupAllStreams();
EndThreads();
}
};
@@ -224,14 +220,9 @@ class SynchronousStreamingPingPongClient final
SynchronousStreamingPingPongClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient() {
- OnAllStreams(
- [](ClientContext* ctx,
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>* s) -> bool {
- s->WritesDone();
- return true;
- });
+ CleanupAllStreams();
}
-
+ private:
bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
@@ -276,6 +267,12 @@ class SynchronousStreamingPingPongClient final
messages_issued_[thread_idx] = 0;
return true;
}
+
+ void CleanStream(size_t thread_idx) override {
+ stream_[thread_idx]->WritesDone();
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ }
};
class SynchronousStreamingFromClientClient final
@@ -284,13 +281,12 @@ class SynchronousStreamingFromClientClient final
SynchronousStreamingFromClientClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
- OnAllStreams(
- [](ClientContext* ctx, grpc::ClientWriter<SimpleRequest>* s) -> bool {
- s->WritesDone();
- return true;
- });
+ CleanupAllStreams();
}
+ private:
+ std::vector<double> last_issue_;
+
bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
@@ -330,8 +326,11 @@ class SynchronousStreamingFromClientClient final
return true;
}
- private:
- std::vector<double> last_issue_;
+ void CleanStream(size_t thread_idx) override {
+ stream_[thread_idx]->WritesDone();
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ }
};
class SynchronousStreamingFromServerClient final
@@ -339,6 +338,13 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
+ ~SynchronousStreamingFromServerClient() {
+ CleanupAllStreams();
+ }
+
+ private:
+ std::vector<double> last_recv_;
+
bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
@@ -351,6 +357,7 @@ class SynchronousStreamingFromServerClient final
last_recv_[thread_idx] = UsageTimer::Now();
return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
@@ -372,8 +379,10 @@ class SynchronousStreamingFromServerClient final
return true;
}
- private:
- std::vector<double> last_recv_;
+ void CleanStream(size_t thread_idx) override {
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ }
};
class SynchronousStreamingBothWaysClient final
@@ -383,14 +392,9 @@ class SynchronousStreamingBothWaysClient final
SynchronousStreamingBothWaysClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
- OnAllStreams(
- [](ClientContext* ctx,
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>* s) -> bool {
- s->WritesDone();
- return true;
- });
+ CleanupAllStreams();
}
-
+ private:
bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
@@ -401,10 +405,17 @@ class SynchronousStreamingBothWaysClient final
}
return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
}
+
+ void CleanStream(size_t thread_idx) override {
+ stream_[thread_idx]->WritesDone();
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ }
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {