aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r--test/cpp/qps/client.h68
1 files changed, 49 insertions, 19 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408..4045e13460 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};
+class HistogramEntry GRPC_FINAL {
+ public:
+ HistogramEntry() : used_(false) {}
+ bool used() const { return used_; }
+ double value() const { return value_; }
+ void set_value(double v) {
+ used_ = true;
+ value_ = v;
+ }
+
+ private:
+ bool used_;
+ double value_;
+};
+
class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,10 +166,21 @@ class Client {
return stats;
}
+ // Must call AwaitThreadsCompletion before destructor to avoid a race
+ // between destructor and invocation of virtual ThreadFunc
+ void AwaitThreadsCompletion() {
+ DestroyMultithreading();
+ std::unique_lock<std::mutex> g(thread_completion_mu_);
+ while (threads_remaining_ != 0) {
+ threads_complete_.wait(g);
+ }
+ }
+
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
+ threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -162,7 +188,8 @@ class Client {
void EndThreads() { threads_.clear(); }
- virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ virtual void DestroyMultithreading() = 0;
+ virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
+ n->Swap(&histogram_);
}
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
- }
+ void EndSwap() {}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
- const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
+ HistogramEntry entry;
+ const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+ // lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
+ if (entry.used()) {
+ histogram_.Add(entry.value());
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
+ client_->CompleteThread();
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -286,6 +304,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
+
+ std::mutex thread_completion_mu_;
+ size_t threads_remaining_;
+ std::condition_variable threads_complete_;
+
+ void CompleteThread() {
+ std::lock_guard<std::mutex> g(thread_completion_mu_);
+ threads_remaining_--;
+ if (threads_remaining_ == 0) {
+ threads_complete_.notify_all();
+ }
+ }
};
template <class StubType, class RequestType>