aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client.h
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r--test/cpp/qps/client.h82
1 files changed, 81 insertions, 1 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index dc3a9f2ac5..28cd32a197 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -35,6 +35,7 @@
#define TEST_QPS_CLIENT_H
#include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@@ -42,11 +43,31 @@
#include <mutex>
namespace grpc {
+
+#if defined(__APPLE__)
+// Specialize Timepoint for high res clock as we need that
+template <>
+class TimePoint<std::chrono::high_resolution_clock::time_point> {
+ public:
+ TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
+ TimepointHR2Timespec(time, &time_);
+ }
+ gpr_timespec raw_time() const { return time_; }
+
+ private:
+ gpr_timespec time_;
+};
+#endif
+
namespace testing {
+typedef std::chrono::high_resolution_clock grpc_time_source;
+typedef std::chrono::time_point<grpc_time_source> grpc_time;
+
class Client {
public:
- explicit Client(const ClientConfig& config) : timer_(new Timer) {
+ explicit Client(const ClientConfig& config)
+ : timer_(new Timer), interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
@@ -81,6 +102,7 @@ class Client {
protected:
SimpleRequest request_;
+ bool closed_loop_;
class ClientChannelInfo {
public:
@@ -106,6 +128,61 @@ class Client {
virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
+ // Set up the load distribution based on the number of threads
+ if (config.load_type() == CLOSED_LOOP) {
+ closed_loop_ = true;
+ } else {
+ closed_loop_ = false;
+
+ std::unique_ptr<RandomDist> random_dist;
+ const auto& load = config.load_params();
+ switch (config.load_type()) {
+ case POISSON:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ case UNIFORM:
+ random_dist.reset(
+ new UniformDist(load.uniform().interarrival_lo() * num_threads,
+ load.uniform().interarrival_hi() * num_threads));
+ break;
+ case DETERMINISTIC:
+ random_dist.reset(
+ new DetDist(num_threads / load.determ().offered_load()));
+ break;
+ case PARETO:
+ random_dist.reset(
+ new ParetoDist(load.pareto().interarrival_base() * num_threads,
+ load.pareto().alpha()));
+ break;
+ default:
+ GPR_ASSERT(false);
+ break;
+ }
+
+ interarrival_timer_.init(*random_dist, num_threads);
+ for (size_t i = 0; i < num_threads; i++) {
+ next_time_.push_back(
+ grpc_time_source::now() +
+ std::chrono::duration_cast<grpc_time_source::duration>(
+ interarrival_timer_(i)));
+ }
+ }
+ }
+
+ bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
+ if (closed_loop_) {
+ return false;
+ } else {
+ *time_delay = next_time_[thread_idx];
+ next_time_[thread_idx] +=
+ std::chrono::duration_cast<grpc_time_source::duration>(
+ interarrival_timer_(thread_idx));
+ return true;
+ }
+ }
+
private:
class Thread {
public:
@@ -168,6 +245,9 @@ class Client {
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
+
+ InterarrivalTimer interarrival_timer_;
+ std::vector<grpc_time> next_time_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);