diff options
author | David Garcia Quintas <dgq@google.com> | 2016-06-16 18:43:19 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-06-16 18:43:19 -0700 |
commit | 8375c0daeabcb1aa0b47da1d82fa71fd6eafcb33 (patch) | |
tree | 3f8a688302621aa527a253675a488cbf8335659e /test/cpp | |
parent | 909dd6ec8bf41d16073d98b891c8178d5cbd0936 (diff) | |
parent | 69170755b8f16d7f2540e0bc060f67940e4b4158 (diff) |
Merge branch 'master' of github.com:grpc/grpc into simplify_compression_interop
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/interop/interop_server.cc | 18 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 35 |
2 files changed, 43 insertions, 10 deletions
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 8c5c0e24e1..f0a182f230 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -214,7 +214,15 @@ class TestServiceImpl : public TestService::Service { wopts.set_no_compression(); } // else, compression is already enabled via the context. } - write_success = writer->Write(response, wopts); + int time_us; + if ((time_us = request->response_parameters(i).interval_us()) > 0) { + // Sleep before response if needed + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(time_us, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); + } + write_success = writer->Write(response); } if (write_success) { return Status::OK; @@ -255,6 +263,14 @@ class TestServiceImpl : public TestService::Service { response.mutable_payload()->set_type(request.payload().type()); response.mutable_payload()->set_body( grpc::string(request.response_parameters(0).size(), '\0')); + int time_us; + if ((time_us = request.response_parameters(0).interval_us()) > 0) { + // Sleep before response if needed + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(time_us, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); + } write_success = stream->Write(response); } } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2a89eb8018..047bd16408 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -125,13 +125,15 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Swap(&to_merge[i]); - latencies.Merge(to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i]); } - delete[] to_merge; - std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(to_merge[i]); + } + delete[] to_merge; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -213,6 +215,7 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), + new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -225,9 +228,16 @@ class Client { impl_.join(); } - void Swap(Histogram* n) { + void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - n->Swap(&histogram_); + new_stats_ = n; + } + + void EndSwap() { + std::unique_lock<std::mutex> g(mu_); + while (new_stats_ != nullptr) { + cv_.wait(g); + }; } void MergeStatsInto(Histogram* hist) { @@ -241,11 +251,10 @@ class Client { void ThreadFunc() { for (;;) { - // lock since the thread should only be doing one thing at a time - std::lock_guard<std::mutex> g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // see if we're done + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -253,11 +262,19 @@ class Client { if (done_) { return; } + // check if we're resetting stats, swap out the histogram if so + if (new_stats_) { + new_stats_->Swap(&histogram_); + new_stats_ = nullptr; + cv_.notify_one(); + } } } std::mutex mu_; + std::condition_variable cv_; bool done_; + Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; |