diff options
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r-- | test/cpp/qps/client.h | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 175529f01b..047bd16408 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -38,6 +38,7 @@ #include <mutex> #include <vector> +#include <grpc++/channel.h> #include <grpc++/support/byte_buffer.h> #include <grpc++/support/channel_arguments.h> #include <grpc++/support/slice.h> @@ -124,13 +125,15 @@ class Client { if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Swap(&to_merge[i]); - latencies.Merge(to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i]); } - delete[] to_merge; - std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(to_merge[i]); + } + delete[] to_merge; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram @@ -212,6 +215,7 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), + new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -224,9 +228,16 @@ class Client { impl_.join(); } - void Swap(Histogram* n) { + void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - n->Swap(&histogram_); + new_stats_ = n; + } + + void EndSwap() { + std::unique_lock<std::mutex> g(mu_); + while (new_stats_ != nullptr) { + cv_.wait(g); + }; } void MergeStatsInto(Histogram* hist) { @@ -240,11 +251,10 @@ class Client { void ThreadFunc() { for (;;) { - // lock since the thread should only be doing one thing at a time - std::lock_guard<std::mutex> g(mu_); // run the loop body const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // see if we're done + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; @@ -252,11 +262,19 @@ class Client { if (done_) { return; } + // check if we're resetting stats, swap out the histogram if so + if (new_stats_) { + new_stats_->Swap(&histogram_); + new_stats_ = nullptr; + cv_.notify_one(); + } } } std::mutex mu_; + std::condition_variable cv_; bool done_; + Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -315,6 +333,10 @@ class ClientImpl : public Client { target, config.security_params().server_host_override(), config.has_security_params(), !config.security_params().use_test_ca(), std::shared_ptr<CallCredentials>(), args); + gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); + GPR_ASSERT(channel_->WaitForConnected( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(30, GPR_TIMESPAN)))); stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } |