aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_sync.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r--test/cpp/qps/client_sync.cc18
1 files changed, 15 insertions, 3 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index a944c45496..a020adde51 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -142,10 +142,13 @@ class SynchronousStreamingClient final : public SynchronousClient {
SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config),
context_(num_threads_),
- stream_(num_threads_) {
+ stream_(num_threads_),
+ messages_per_stream_(config.messages_per_stream()),
+ messages_issued_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
}
StartThreads(num_threads_);
}
@@ -173,11 +176,17 @@ class SynchronousStreamingClient final : public SynchronousClient {
stream_[thread_idx]->Read(&responses_[thread_idx])) {
entry->set_value((UsageTimer::Now() - start) * 1e9);
// don't set the status since there isn't one yet
- return true;
+ if ((messages_per_stream_ != 0) &&
+ (++messages_issued_[thread_idx] < messages_per_stream_)) {
+ return true;
+ } else {
+ // Fall through to the below resetting code after finish
+ }
}
stream_[thread_idx]->WritesDone();
Status s = stream_[thread_idx]->Finish();
- // don't set the value since the stream is failed and shouldn't be timed
+ // don't set the value since this is either a failure (shouldn't be timed)
+ // or a stream-end (already has been timed)
entry->set_status(s.error_code());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
@@ -187,6 +196,7 @@ class SynchronousStreamingClient final : public SynchronousClient {
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
return true;
}
@@ -197,6 +207,8 @@ class SynchronousStreamingClient final : public SynchronousClient {
std::vector<
std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>>
stream_;
+ const int messages_per_stream_;
+ std::vector<int> messages_issued_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(