diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/client.h | 62 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 12 | ||||
-rw-r--r-- | test/cpp/qps/interarrival.h | 149 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 41 | ||||
-rw-r--r-- | test/cpp/qps/qps_interarrival_test.cc | 77 | ||||
-rw-r--r-- | test/cpp/qps/qpstest.proto | 37 |
6 files changed, 377 insertions, 1 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2dc5b3860f..45481a3918 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,6 +35,7 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -46,7 +47,8 @@ namespace testing { class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer) { + explicit Client(const ClientConfig& config) : timer_(new Timer), + interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -105,7 +107,60 @@ class Client { void EndThreads() { threads_.clear(); } virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + if (config.load_type() == CLOSED_LOOP) { + closed_loop_ = true; + } + else { + closed_loop_ = false; + + std::unique_ptr<RandomDist> random_dist; + auto& load = config.load_params(); + switch (config.load_type()) { + case POISSON: + random_dist.reset + (new ExpDist(load.poisson().offered_load()/num_threads)); + break; + case UNIFORM: + random_dist.reset + (new UniformDist(load.uniform().interarrival_lo()*num_threads, + load.uniform().interarrival_hi()*num_threads)); + break; + case DETERMINISTIC: + random_dist.reset + (new DetDist(num_threads/load.determ().offered_load())); + break; + case PARETO: + random_dist.reset + (new ParetoDist(load.pareto().interarrival_base()*num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + break; + } + interarrival_timer_.init(*random_dist, num_threads); + for (size_t i = 0; i<num_threads; i++) { + next_time_.push_back(std::chrono::high_resolution_clock::now() + + interarrival_timer_(i)); + } + } + } + template<class Timepoint> + bool NextIssueTime(int thread_idx, Timepoint *time_delay) { + if (closed_loop_) { + return false; + } + else { + *time_delay = next_time_[thread_idx]; + next_time_[thread_idx] += interarrival_timer_(thread_idx); + return true; + } + } + private: class Thread { public: @@ -166,6 +221,11 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; + + bool closed_loop_; + InterarrivalTimer interarrival_timer_; + std::vector<std::chrono::time_point + <std::chrono::high_resolution_clock>> next_time_; }; std::unique_ptr<Client> diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 264293561d..6a89c5acc2 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -32,6 +32,7 @@ */ #include <cassert> +#include <chrono> #include <memory> #include <mutex> #include <string> @@ -57,6 +58,7 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" namespace grpc { @@ -68,11 +70,19 @@ class SynchronousClient : public Client { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); + SetupLoadTest(config, num_threads_); } virtual ~SynchronousClient(){}; protected: + void WaitToIssue(int thread_idx) { + std::chrono::time_point<std::chrono::high_resolution_clock> next_time; + if (NextIssueTime(thread_idx, &next_time)) { + std::this_thread::sleep_until(next_time); + } + } + size_t num_threads_; std::vector<SimpleResponse> responses_; }; @@ -86,6 +96,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() { EndThreads(); } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; @@ -117,6 +128,7 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); double start = Timer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h new file mode 100644 index 0000000000..98f4def1f2 --- /dev/null +++ b/test/cpp/qps/interarrival.h @@ -0,0 +1,149 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_INTERARRIVAL_H +#define TEST_QPS_INTERARRIVAL_H + +#include <chrono> +#include <cmath> +#include <random> + +#include <grpc++/config.h> + +namespace grpc { +namespace testing { + +// First create classes that define a random distribution +// Note that this code does not include C++-specific random distribution +// features supported in std::random. Although this would make this code easier, +// this code is required to serve as the template code for other language +// stacks. Thus, this code only uses a uniform distribution of doubles [0,1) +// and then provides the distribution functions itself. + +class RandomDist { + public: + RandomDist() {} + virtual ~RandomDist() = 0; + // Argument to operator() is a uniform double in the range [0,1) + virtual double operator() (double uni) const = 0; +}; + +inline RandomDist::~RandomDist() {} + +class UniformDist GRPC_FINAL: public RandomDist { +public: + UniformDist(double lo, double hi): lo_(lo), range_(hi-lo) {} + ~UniformDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE {return uni*range_+lo_;} +private: + double lo_; + double range_; +}; + +class ExpDist GRPC_FINAL : public RandomDist { +public: + explicit ExpDist(double lambda): lambda_recip_(1.0/lambda) {} + ~ExpDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid NaN if uni is 0 + return lambda_recip_ * (-log(1.0-uni)); + } +private: + double lambda_recip_; +}; + +class DetDist GRPC_FINAL : public RandomDist { +public: + explicit DetDist(double val): val_(val) {} + ~DetDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE {return val_;} +private: + double val_; +}; + +class ParetoDist GRPC_FINAL : public RandomDist { +public: + ParetoDist(double base, double alpha): base_(base), alpha_recip_(1.0/alpha) {} + ~ParetoDist() GRPC_OVERRIDE {} + double operator() (double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid div by zero if uni is 0 + return base_ / pow(1.0-uni, alpha_recip_); + } +private: + double base_; + double alpha_recip_; +}; + +// A class library for generating pseudo-random interarrival times +// in an efficient re-entrant way. The random table is built at construction +// time, and each call must include the thread id of the invoker + +using qps_random_engine = std::default_random_engine; + +class InterarrivalTimer { +public: + InterarrivalTimer() {} + InterarrivalTimer(const RandomDist& r, int threads, int entries=1000000) { + init(r, threads, entries); + } + void init(const RandomDist& r, int threads, int entries=1000000) { + qps_random_engine gen; + std::uniform_real_distribution<double> uniform(0.0,1.0); + for (int i=0; i<entries; i++) { + random_table_.push_back( + std::chrono::nanoseconds( + static_cast<int64_t>(1e9*r(uniform(gen))))); + } + // Now set up the thread positions + for (int i=0; i<threads; i++) { + thread_posns_.push_back(random_table_.begin() + (entries * i)/threads); + } + } + virtual ~InterarrivalTimer() {}; + + std::chrono::nanoseconds operator() (int thread_num) { + auto ret = *(thread_posns_[thread_num]++); + if (thread_posns_[thread_num] == random_table_.end()) + thread_posns_[thread_num] = random_table_.begin(); + return ret; + } + private: + typedef std::vector<std::chrono::nanoseconds> time_table; + std::vector<time_table::const_iterator> thread_posns_; + time_table random_table_; +}; + +} +} + +#endif diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index fc8e04201c..dfa3f06753 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -60,11 +60,15 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); +DEFINE_string(load_type, "CLOSED_LOOP", "Load type"); +DEFINE_double(load_param_1, 0.0, "Load parameter 1"); +DEFINE_double(load_param_2, 0.0, "Load parameter 2"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; using grpc::testing::ServerType; +using grpc::testing::LoadType; using grpc::testing::RpcType; using grpc::testing::ResourceUsage; @@ -76,11 +80,14 @@ int main(int argc, char** argv) { ClientType client_type; ServerType server_type; + LoadType load_type; GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); + GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type)); ClientConfig client_config; client_config.set_client_type(client_type); + client_config.set_load_type(load_type); client_config.set_enable_ssl(FLAGS_enable_ssl); client_config.set_outstanding_rpcs_per_channel( FLAGS_outstanding_rpcs_per_channel); @@ -89,6 +96,40 @@ int main(int argc, char** argv) { client_config.set_async_client_threads(FLAGS_async_client_threads); client_config.set_rpc_type(rpc_type); + // set up the load parameters + switch (load_type) { + case grpc::testing::CLOSED_LOOP: + break; + case grpc::testing::POISSON: { + auto poisson = client_config.mutable_load_params()->mutable_poisson(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + poisson->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::UNIFORM: { + auto uniform = client_config.mutable_load_params()->mutable_uniform(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6); + uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6); + break; + } + case grpc::testing::DETERMINISTIC: { + auto determ = client_config.mutable_load_params()->mutable_determ(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + determ->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::PARETO: { + auto pareto = client_config.mutable_load_params()->mutable_pareto(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6); + pareto->set_alpha(FLAGS_load_param_2); + break; + } + } + ServerConfig server_config; server_config.set_server_type(server_type); server_config.set_threads(FLAGS_server_threads); diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc new file mode 100644 index 0000000000..14af4c6506 --- /dev/null +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -0,0 +1,77 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/interarrival.h" +#include <chrono> +#include <iostream> + +// Use the C histogram rather than C++ to avoid depending on proto +#include <grpc/support/histogram.h> +#include <grpc++/config.h> + +using grpc::testing::ExpDist; +using grpc::testing::InterarrivalTimer; + +void RunTest(InterarrivalTimer&& timer, std::string title) { + gpr_histogram *h(gpr_histogram_create(0.01,60e9)); + + for (int i=0; i<10000000; i++) { + for (int j=0; j<5; j++) { + gpr_histogram_add(h, timer(j).count()); + } + } + + std::cout << title << " Distribution" << std::endl; + std::cout << "Value, Percentile" << std::endl; + for (double pct = 0.0; pct < 100.0; pct += 1.0) { + std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; + } + + gpr_histogram_destroy(h); +} + +using grpc::testing::ExpDist; +using grpc::testing::DetDist; +using grpc::testing::UniformDist; +using grpc::testing::ParetoDist; + +int main(int argc, char **argv) { + RunTest(InterarrivalTimer(ExpDist(10.0), 5), std::string("Exponential(10)")); + RunTest(InterarrivalTimer(DetDist(5.0), 5), std::string("Det(5)")); + RunTest(InterarrivalTimer(UniformDist(0.0,10.0), 5), + std::string("Uniform(1,10)")); + RunTest(InterarrivalTimer(ParetoDist(1.0,1.0), 5), + std::string("Pareto(1,1)")); + + return 0; +} diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 1553ef5f07..6e710e7598 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -92,6 +92,41 @@ enum RpcType { STREAMING = 2; } +enum LoadType { + CLOSED_LOOP = 1; + POISSON = 2; + UNIFORM = 3; + DETERMINISTIC = 4; + PARETO = 5; +} + +message PoissonParams { + optional double offered_load = 1; +} + +message UniformParams { + optional double interarrival_lo = 1; + optional double interarrival_hi = 2; +} + +message DeterministicParams { + optional double offered_load = 1; +} + +message ParetoParams { + optional double interarrival_base = 1; + optional double alpha = 2; +} + +message LoadParams { + oneof load { + PoissonParams poisson = 1; + UniformParams uniform = 2; + DeterministicParams determ = 3; + ParetoParams pareto = 4; + }; +} + message ClientConfig { repeated string server_targets = 1; required ClientType client_type = 2; @@ -102,6 +137,8 @@ message ClientConfig { // only for async client: optional int32 async_client_threads = 7; optional RpcType rpc_type = 8 [default=UNARY]; + optional LoadType load_type = 9 [default=CLOSED_LOOP]; + optional LoadParams load_params = 10; } // Request current stats |