aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Moiz Haidry <mhaidry@google.com>2018-12-21 22:40:38 -0800
committerGravatar Moiz Haidry <mhaidry@google.com>2018-12-21 22:40:38 -0800
commit7bb853ebdd0b6e057de447147ad60ebf42e0903d (patch)
treefeb0a10ca58703f827a8265238ad937c73ae06d9 /test
parent5ec78a286d7be61aec929b133c031a7a1af262df (diff)
Addressed PR comments. Made Client::Thread public and removed use of void ptr to refer it. Avoided overloading of NextIssue TIme by renaming it NextRPCIssueTime
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client.h116
-rw-r--r--test/cpp/qps/client_callback.cc18
2 files changed, 67 insertions, 67 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 0b9837660b..4b8ac9bd94 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -251,64 +251,6 @@ class Client {
return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
- protected:
- bool closed_loop_;
- gpr_atm thread_pool_done_;
- double median_latency_collection_interval_seconds_; // In seconds
-
- void StartThreads(size_t num_threads) {
- gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
- threads_remaining_ = num_threads;
- for (size_t i = 0; i < num_threads; i++) {
- threads_.emplace_back(new Thread(this, i));
- }
- }
-
- void EndThreads() {
- MaybeStartRequests();
- threads_.clear();
- }
-
- virtual void DestroyMultithreading() = 0;
-
- void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
- // Set up the load distribution based on the number of threads
- const auto& load = config.load_params();
-
- std::unique_ptr<RandomDistInterface> random_dist;
- switch (load.load_case()) {
- case LoadParams::kClosedLoop:
- // Closed-loop doesn't use random dist at all
- break;
- case LoadParams::kPoisson:
- random_dist.reset(
- new ExpDist(load.poisson().offered_load() / num_threads));
- break;
- default:
- GPR_ASSERT(false);
- }
-
- // Set closed_loop_ based on whether or not random_dist is set
- if (!random_dist) {
- closed_loop_ = true;
- } else {
- closed_loop_ = false;
- // set up interarrival timer according to random dist
- interarrival_timer_.init(*random_dist, num_threads);
- const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
- for (size_t i = 0; i < num_threads; i++) {
- next_time_.push_back(gpr_time_add(
- now,
- gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
- }
- }
- }
-
- std::function<gpr_timespec()> NextIssuer(int thread_idx) {
- return closed_loop_ ? std::function<gpr_timespec()>()
- : std::bind(&Client::NextIssueTime, this, thread_idx);
- }
-
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -387,6 +329,64 @@ class Client {
double interval_start_time_;
};
+ protected:
+ bool closed_loop_;
+ gpr_atm thread_pool_done_;
+ double median_latency_collection_interval_seconds_; // In seconds
+
+ void StartThreads(size_t num_threads) {
+ gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
+ threads_remaining_ = num_threads;
+ for (size_t i = 0; i < num_threads; i++) {
+ threads_.emplace_back(new Thread(this, i));
+ }
+ }
+
+ void EndThreads() {
+ MaybeStartRequests();
+ threads_.clear();
+ }
+
+ virtual void DestroyMultithreading() = 0;
+
+ void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+ // Set up the load distribution based on the number of threads
+ const auto& load = config.load_params();
+
+ std::unique_ptr<RandomDistInterface> random_dist;
+ switch (load.load_case()) {
+ case LoadParams::kClosedLoop:
+ // Closed-loop doesn't use random dist at all
+ break;
+ case LoadParams::kPoisson:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Set closed_loop_ based on whether or not random_dist is set
+ if (!random_dist) {
+ closed_loop_ = true;
+ } else {
+ closed_loop_ = false;
+ // set up interarrival timer according to random dist
+ interarrival_timer_.init(*random_dist, num_threads);
+ const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (size_t i = 0; i < num_threads; i++) {
+ next_time_.push_back(gpr_time_add(
+ now,
+ gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
+ }
+ }
+ }
+
+ std::function<gpr_timespec()> NextIssuer(int thread_idx) {
+ return closed_loop_ ? std::function<gpr_timespec()>()
+ : std::bind(&Client::NextIssueTime, this, thread_idx);
+ }
+
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
std::vector<std::unique_ptr<Thread>> threads_;
diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc
index 1880f46d43..4a06325f2b 100644
--- a/test/cpp/qps/client_callback.cc
+++ b/test/cpp/qps/client_callback.cc
@@ -90,7 +90,7 @@ class CallbackClient
}
}
- gpr_timespec NextIssueTime() {
+ gpr_timespec NextRPCIssueTime() {
std::lock_guard<std::mutex> l(next_issue_time_mu_);
return Client::NextIssueTime(0);
}
@@ -166,7 +166,7 @@ class CallbackUnaryClient final : public CallbackClient {
private:
void ScheduleRpc(Thread* t, size_t vector_idx) {
if (!closed_loop_) {
- gpr_timespec next_issue_time = NextIssueTime();
+ gpr_timespec next_issue_time = NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_[vector_idx]->alarm_.experimental().Set(
@@ -221,13 +221,13 @@ class CallbackStreamingClient : public CallbackClient {
}
~CallbackStreamingClient() {}
- void AddHistogramEntry(double start_, bool ok, void* thread_ptr) {
+ void AddHistogramEntry(double start_, bool ok, Thread* thread_ptr) {
// Update Histogram with data from the callback run
HistogramEntry entry;
if (ok) {
entry.set_value((UsageTimer::Now() - start_) * 1e9);
}
- ((Client::Thread*)thread_ptr)->UpdateHistogram(&entry);
+ thread_ptr->UpdateHistogram(&entry);
}
int messages_per_stream() { return messages_per_stream_; }
@@ -297,7 +297,7 @@ class CallbackStreamingPingPongReactor final
if (client_->ThreadCompleted()) return;
if (!client_->IsClosedLoop()) {
- gpr_timespec next_issue_time = client_->NextIssueTime();
+ gpr_timespec next_issue_time = client_->NextRPCIssueTime();
// Start an alarm callback to run the internal callback after
// next_issue_time
ctx_->alarm_.experimental().Set(next_issue_time,
@@ -307,13 +307,13 @@ class CallbackStreamingPingPongReactor final
}
}
- void set_thread_ptr(void* ptr) { thread_ptr_ = ptr; }
+ void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
CallbackStreamingPingPongClient* client_;
std::unique_ptr<CallbackClientRpcContext> ctx_;
- void* thread_ptr_; // Needed to update histogram entries
- double start_; // Track message start time
- int messages_issued_; // Messages issued by this stream
+ Client::Thread* thread_ptr_; // Needed to update histogram entries
+ double start_; // Track message start time
+ int messages_issued_; // Messages issued by this stream
};
class CallbackStreamingPingPongClientImpl final