aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_sync.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client_sync.cc')
-rw-r--r--test/cpp/qps/client_sync.cc42
1 files changed, 31 insertions, 11 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index a944c45496..f8ce2cccbe 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -142,24 +142,33 @@ class SynchronousStreamingClient final : public SynchronousClient {
SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config),
context_(num_threads_),
- stream_(num_threads_) {
+ stream_(num_threads_),
+ messages_per_stream_(config.messages_per_stream()),
+ messages_issued_(num_threads_) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
}
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
+ std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- Status s = (*stream)->Finish();
- if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i,
- s.error_message().c_str());
+ cleanup_threads.emplace_back([this, i]() {
+ auto stream = &stream_[i];
+ if (*stream) {
+ (*stream)->WritesDone();
+ Status s = (*stream)->Finish();
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i,
+ s.error_message().c_str());
+ }
}
- }
+ });
+ }
+ for (size_t i = 0; i < num_threads_; i++) {
+ cleanup_threads[i].join();
}
}
@@ -173,11 +182,19 @@ class SynchronousStreamingClient final : public SynchronousClient {
stream_[thread_idx]->Read(&responses_[thread_idx])) {
entry->set_value((UsageTimer::Now() - start) * 1e9);
// don't set the status since there isn't one yet
- return true;
+ if ((messages_per_stream_ != 0) &&
+ (++messages_issued_[thread_idx] < messages_per_stream_)) {
+ return true;
+ } else if (messages_per_stream_ == 0) {
+ return true;
+ } else {
+ // Fall through to the below resetting code after finish
+ }
}
stream_[thread_idx]->WritesDone();
Status s = stream_[thread_idx]->Finish();
- // don't set the value since the stream is failed and shouldn't be timed
+ // don't set the value since this is either a failure (shouldn't be timed)
+ // or a stream-end (already has been timed)
entry->set_status(s.error_code());
if (!s.ok()) {
gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
@@ -187,6 +204,7 @@ class SynchronousStreamingClient final : public SynchronousClient {
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
return true;
}
@@ -197,6 +215,8 @@ class SynchronousStreamingClient final : public SynchronousClient {
std::vector<
std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>>
stream_;
+ const int messages_per_stream_;
+ std::vector<int> messages_issued_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(