aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_sync.cc
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-09 13:44:10 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-10-09 13:44:10 -0700
commitc18ad11837f44b3eb2de788306a142b454873d09 (patch)
treed7f78d342897b7666b87fbde5a8c091bffe38c00 /test/cpp/qps/client_sync.cc
parentf6cd77c48d6d3a9579e8d27b8c140d6d674060f3 (diff)
Wait until all clients connected before starting streams
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r--test/cpp/qps/client_sync.cc56
1 files changed, 28 insertions, 28 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 5d212f1acc..94554a46b2 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -103,6 +103,8 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
+ void InitThreadFunc(size_t thread_idx) override {}
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
@@ -174,13 +176,7 @@ class SynchronousStreamingPingPongClient final
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
public:
SynchronousStreamingPingPongClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
- messages_issued_[thread_idx] = 0;
- }
- }
+ : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -196,6 +192,12 @@ class SynchronousStreamingPingPongClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
+ }
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
@@ -228,14 +230,7 @@ class SynchronousStreamingFromClientClient final
: public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
public:
SynchronousStreamingFromClientClient(const ClientConfig& config)
- : SynchronousStreamingClient(config), last_issue_(num_threads_) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
- last_issue_[thread_idx] = UsageTimer::Now();
- }
- }
+ : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -251,6 +246,13 @@ class SynchronousStreamingFromClientClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ last_issue_[thread_idx] = UsageTimer::Now();
+ }
+
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
@@ -279,13 +281,12 @@ class SynchronousStreamingFromServerClient final
: public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
- : SynchronousStreamingClient(config), last_recv_(num_threads_) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
- last_recv_[thread_idx] = UsageTimer::Now();
- }
+ : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ last_recv_[thread_idx] = UsageTimer::Now();
}
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
@@ -311,12 +312,7 @@ class SynchronousStreamingBothWaysClient final
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
public:
SynchronousStreamingBothWaysClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {
- for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
- auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
- }
- }
+ : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
@@ -332,6 +328,10 @@ class SynchronousStreamingBothWaysClient final
}
}
+ void InitThreadFunc(size_t thread_idx) override {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ }
bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;