diff options
Diffstat (limited to 'test/cpp/qps/client.h')
-rw-r--r-- | test/cpp/qps/client.h | 82 |
1 files changed, 81 insertions, 1 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index dc3a9f2ac5..28cd32a197 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,6 +35,7 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -42,11 +43,31 @@ #include <mutex> namespace grpc { + +#if defined(__APPLE__) +// Specialize Timepoint for high res clock as we need that +template <> +class TimePoint<std::chrono::high_resolution_clock::time_point> { + public: + TimePoint(const std::chrono::high_resolution_clock::time_point& time) { + TimepointHR2Timespec(time, &time_); + } + gpr_timespec raw_time() const { return time_; } + + private: + gpr_timespec time_; +}; +#endif + namespace testing { +typedef std::chrono::high_resolution_clock grpc_time_source; +typedef std::chrono::time_point<grpc_time_source> grpc_time; + class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer) { + explicit Client(const ClientConfig& config) + : timer_(new Timer), interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -81,6 +102,7 @@ class Client { protected: SimpleRequest request_; + bool closed_loop_; class ClientChannelInfo { public: @@ -106,6 +128,61 @@ class Client { virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + if (config.load_type() == CLOSED_LOOP) { + closed_loop_ = true; + } else { + closed_loop_ = false; + + std::unique_ptr<RandomDist> random_dist; + const auto& load = config.load_params(); + switch (config.load_type()) { + case POISSON: + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); + break; + case UNIFORM: + random_dist.reset( + new UniformDist(load.uniform().interarrival_lo() * num_threads, + load.uniform().interarrival_hi() * num_threads)); + break; + case DETERMINISTIC: + random_dist.reset( + new DetDist(num_threads / load.determ().offered_load())); + break; + case PARETO: + random_dist.reset( + new ParetoDist(load.pareto().interarrival_base() * num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + break; + } + + interarrival_timer_.init(*random_dist, num_threads); + for (size_t i = 0; i < num_threads; i++) { + next_time_.push_back( + grpc_time_source::now() + + std::chrono::duration_cast<grpc_time_source::duration>( + interarrival_timer_(i))); + } + } + } + + bool NextIssueTime(int thread_idx, grpc_time* time_delay) { + if (closed_loop_) { + return false; + } else { + *time_delay = next_time_[thread_idx]; + next_time_[thread_idx] += + std::chrono::duration_cast<grpc_time_source::duration>( + interarrival_timer_(thread_idx)); + return true; + } + } + private: class Thread { public: @@ -168,6 +245,9 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; + + InterarrivalTimer interarrival_timer_; + std::vector<grpc_time> next_time_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); |